Question

What's the best way to wait (without spinning) until something is available in either one of two (multiprocessing) Queues, where both reside on the same system?

Was it helpful?

Solution

It doesn't look like there's an official way to handle this yet. Or at least, not based on this:

You could try something like what this post is doing -- accessing the underlying pipe filehandles:

and then use select.

OTHER TIPS

Actually you can use multiprocessing.Queue objects in select.select. i.e.

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

would select que only if it is ready to be read from.

No documentation about it though. I was reading the source code of the multiprocessing.queue library (at linux it's usually sth like /usr/lib/python2.6/multiprocessing/queue.py) to find it out.

With Queue.Queue I didn't have found any smart way to do this (and I would really love to).

Seems like using threads which forward incoming items to a single Queue which you then wait on is a practical choice when using multiprocessing in a platform independent manner.

Avoiding the threads requires either handling low-level pipes/FDs which is both platform specific and not easy to handle consistently with the higher-level API.

Or you would need Queues with the ability to set callbacks which i think are the proper higher level interface to go for. I.e. you would write something like:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Maybe the multiprocessing package could grow this API but it's not there yet. The concept works well with py.execnet which uses the term "channel" instead of "queues", see here http://tinyurl.com/nmtr4w

You could use something like the Observer pattern, wherein Queue subscribers are notified of state changes.

In this case, you could have your worker thread designated as a listener on each queue, and whenever it receives a ready signal, it can work on the new item, otherwise sleep.

Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.

My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

My code for doing this is:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

The following test routine shows how to use it:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Hope this helps.

Tony Wallace

New version of above code...

Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.

My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

My code for doing this is:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

The follow code is my test routine to show how it works:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)

As of Python 3.3 you can use multiprocessing.connection.wait to wait on multiple Queue._reader objects at once.

Don't do it.

Put a header on the messages and send them to a common queue. This simplifies the code and will be cleaner overall.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top