Question

Here's an example. I have one producer and several consumers.

#!/usr/bin/env python2

from multiprocessing import Process, Queue
import time

def counter(low, high):
    current = low 
    while current <= high:
        yield current
        current += 1

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks') 

def work(id, q):
    while True:
        task = q.get()
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id)  # never reached: the loop above has no exit

if __name__ == '__main__':
    q = Queue(2)
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)] 

    task_gen.start()
    for p in processes:
        p.start()
    for p in processes:
        p.join()

counter is just a number generator for put_tasks. Typically, I would have several thousand tasks instead of just 10 as in this example. The point of this code is to feed the queue with tasks incrementally.

The problem is that the consumers cannot know in advance how many tasks they will have to process, but the put_tasks function does know when it's done (it then prints no more tasks).

Sample output:

process 2: 0
process 0: 1
process 1: 2
process 2: 3
process 0: 4
process 1: 5
process 2: 6
process 0: 7
process 1: 8
process 2: 9
put_tasks: no more tasks

All tasks get processed, but the program then hangs (each process gets stuck on q.get()). I would like it to terminate when all tasks have been processed without sacrificing speed or safety (no ugly timeouts).

Any ideas?


Solution

I suggest putting a sentinel value on the end of the queue:

def put_tasks(q):
    ...

    print('put_tasks: no more tasks')
    q.put(end_of_queue)

def work(id, q):
    while True:
        task = q.get()

        if task == end_of_queue:
            q.put(task)  # put the sentinel back so the other consumers see it too
            print('process %d: done' % id)
            return

        print('process %d: %s' % (id, task))
        time.sleep(.1)

class Sentinel:
    def __init__(self, id):
        self.id = id

    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id

        return NotImplemented

if __name__ == '__main__':
    q = Queue(2)
    end_of_queue = Sentinel("end of queue")
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
    ...

I don't seem to be able to use a bare object() as the sentinel: each worker process receives its own unpickled copy from the queue, so the copies don't compare equal (the default equality for object() is identity).
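
A quick way to see why: everything that goes through a multiprocessing.Queue is pickled, and unpickling a plain object() produces a brand-new instance. A minimal sketch using only the standard pickle module:

import pickle

sentinel = object()
copy = pickle.loads(pickle.dumps(sentinel))  # what a Queue round-trip does

print(copy is sentinel)  # False: unpickling creates a new instance
print(copy == sentinel)  # False: object() equality falls back to identity

The Sentinel class above side-steps this by comparing ids instead of identities.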

If you ever wish to generate random sentinels you can use the uuid module to generate random ids:

import uuid

class Sentinel:
    def __init__(self):
        self.id = uuid.uuid4()

    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id

        return NotImplemented
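
One caveat: because each Sentinel() draws its own random id, the sentinel must be created once and shared with every process; two independently constructed sentinels will never compare equal:

end_a = Sentinel()
end_b = Sentinel()
print(end_a == end_b)  # False: each instance has a distinct uuid4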

Finally, zch used None for a sentinel, which is perfectly adequate as long as None can never appear in the queue as a real task. The sentinel method works for mostly-arbitrary arguments.

OTHER TIPS

The simplest way is to add something to the queue that tells consumers all the work is done.

number_of_consumers = 3

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks')
    for i in range(number_of_consumers):
        q.put(None)

def work(id, q): 
    while True:
        task = q.get()
        if task is None:
            break
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id) 
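
Note that with one sentinel per consumer, each worker can simply break on the first None it sees and never needs to put the sentinel back, unlike the single shared sentinel in the accepted answer.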

I recently looked into the same question and found an alternative answer in the Python documentation.

It looks like the "correct" way to do this is with the Queue.task_done() method (the example below comes from the thread-based queue module docs), i.e.:

from Queue import Queue   # the queue module in Python 3
from threading import Thread

def worker():
    while True:
        item = q.get()
        do_work(item)      # do_work and source are placeholders from the docs
        q.task_done()      # tell the queue this item is fully processed

q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True        # daemon threads won't block interpreter exit
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
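
Since the question uses processes rather than threads, the same pattern carries over. A minimal sketch, assuming multiprocessing.JoinableQueue (which provides the same task_done()/join() pair) and workers that are daemonized and simply abandoned once q.join() returns:

from multiprocessing import JoinableQueue, Process
import time

def worker(id, q):
    while True:
        task = q.get()
        print('process %d: %s' % (id, task))
        time.sleep(.3)
        q.task_done()              # mark this task as finished

if __name__ == '__main__':
    q = JoinableQueue(2)
    workers = [Process(target=worker, args=(id, q)) for id in range(3)]
    for w in workers:
        w.daemon = True            # daemonic workers are terminated when main exits
        w.start()
    for task in range(10):
        q.put(task)
    q.join()                       # returns once every task has been task_done()'d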