Question

I have an iterator which contains a lot of data (larger than memory), and I want to be able to perform some actions on this data. To do this quickly I am using the multiprocessing module.

from multiprocessing import Process, Queue

def __init__(self, poolSize, spaceTimeTweetCollection=None):
    super().__init__()
    self.tagFreq = {}

    if spaceTimeTweetCollection is not None:
        q = Queue()

        # spawn poolSize worker processes that consume from the queue
        processes = [Process(target=self.worker, args=(q,)) for i in range(poolSize)]

        for p in processes:
            p.start()

        for tweet in spaceTimeTweetCollection:
            q.put(tweet)

        for p in processes:
            p.join()

The aim is that I create some processes which listen on the queue:

def worker(self, queue):
    tweet = queue.get()
    self.append(tweet) #performs some actions on data

I then loop over the iterator and add the data to the queue. Since the queue.get() in the worker method is blocking, the workers should start performing actions on the data as they receive it from the queue.

However, instead each worker on each processor is run once and that's it! So if poolSize is 8, it will read the first 8 items in the queue, perform the actions on 8 different processes, and then finish! Does anyone know why this is happening? I am running this on Windows.

Edit: I wanted to mention that even though this is all being done in a class, the class is called in __main__ like so:

if __name__ == '__main__':
    tweetDatabase = Database()
    dataSet = tweetDatabase.read2dBoundingBox(boundaryBox)
    freq = TweetCounter(8, dataSet) # this is where the multiprocessing is done

Solution

Your worker is to blame, I believe: it just does one thing and then dies. Try:

def worker(self, queue):
    while True:             # keep consuming instead of exiting after one item
        tweet = queue.get()
        self.append(tweet)
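
Note that with an endless loop like this, the p.join() calls in __init__ will never return, because the workers never exit. A common remedy is to push one sentinel value per worker once the producer is done. Below is a minimal, self-contained sketch of that pattern; the SENTINEL value, the run() helper, and the print() stand-in for the real per-tweet work are all assumptions for illustration, not part of the original code:

from multiprocessing import Process, Queue

SENTINEL = None  # assumed marker; any value the real data never contains works

def worker(q):
    while True:
        tweet = q.get()
        if tweet is SENTINEL:  # producer signals there is no more work
            break
        print(tweet)           # stand-in for the real per-tweet action

def run(pool_size, tweets):
    q = Queue()
    processes = [Process(target=worker, args=(q,)) for _ in range(pool_size)]
    for p in processes:
        p.start()
    for tweet in tweets:
        q.put(tweet)
    for _ in processes:        # one sentinel per worker so every loop exits
        q.put(SENTINEL)
    for p in processes:
        p.join()               # join() now returns once the queue drains

if __name__ == '__main__':     # required on Windows (spawn start method)
    run(8, ['tweet %d' % i for i in range(100)])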

(I'd take a look at multiprocessing.Pool though.)
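
For completeness, here is a rough Pool-based equivalent; a sketch assuming the per-tweet work can be expressed as a plain top-level function (process_tweet here is a hypothetical stand-in). imap_unordered pulls items from the iterator lazily, which matches the larger-than-memory requirement:

from multiprocessing import Pool

def process_tweet(tweet):
    # hypothetical stand-in for the real per-tweet action;
    # it must be a module-level function so it can be pickled
    return len(tweet)

if __name__ == '__main__':
    tweets = ('tweet %d' % i for i in range(100))  # any iterator works
    with Pool(processes=8) as pool:
        # imap_unordered consumes the iterator lazily, so the whole
        # dataset never has to fit in memory at once
        for result in pool.imap_unordered(process_tweet, tweets, chunksize=16):
            pass  # aggregate results here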

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow