Pregunta

¿Cuál es la mejor manera de esperar (sin girar) hasta que algo está disponible en cualquiera de dos (multiprocesamiento) colas , donde ambos residen en el mismo sistema?

¿Fue útil?

Solución

No parece que haya una manera oficial para manejar esto todavía. O al menos, no se basa en lo siguiente:

Se podría intentar algo así como lo que está haciendo este post - acceder a los controladores de archivo tubo subyacente:

y luego seleccione utilizar.

Otros consejos

En realidad se puede utilizar objetos multiprocessing.Queue en select.select. es decir

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

seleccionaría Que sólo si está listo para ser leído a partir.

No hay documentación sobre ello. Estaba leyendo el código fuente de la biblioteca multiprocessing.queue (en Linux por lo general es STH como /usr/lib/python2.6/multiprocessing/queue.py) para averiguarlo.

Con Queue.Queue yo no he encontrado ninguna forma inteligente de hacer esto (y realmente me gustaría a).

Parece que el uso de hilos que los elementos entrantes hacia adelante a una sola cola que luego se espera en es una opción práctica cuando se utiliza el multiprocesamiento de una manera independiente de la plataforma.

Evitar los hilos requiere o bien el manejo de tubos de bajo nivel / FDS que es tanto específica de la plataforma y no es fácil de manejar consistentemente con la API de nivel superior.

O que se necesita colas con la capacidad de establecer las devoluciones de llamada que creo que son la interfaz de nivel superior adecuado para ir. Es decir. que iba a escribir algo como:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Tal vez el paquete de multiprocesamiento podría crecer esta API pero no es allí todavía. El concepto funciona bien con py.execnet que utiliza el término "canal" en lugar de "colas", ver aquí http: // tinyurl .com / nmtr4w

Usted podría utilizar algo así como el Observador patrón, en el que los suscriptores de la cola son notificados de los cambios de estado .

En este caso, usted podría tener su subproceso de trabajo designado como oyente en cada cola, y cada vez que recibe una señal de preparado, se puede trabajar en el nuevo artículo, de lo contrario dormir.

No está seguro de lo bien que la selección en una cola de multiprocesamiento funciona en Windows. Como en las ventanas de selección escucha los zócalos y no los identificadores de archivo, sospecho que podría haber problemas.

Mi respuesta es hacer un hilo para escuchar a cada cola de un modo de bloqueo, y poner los resultados a todos en una sola cola escuchado por el hilo principal, la multiplexación esencialmente las colas individuales en una sola.

Mi código para hacer esto es:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

La siguiente rutina de prueba muestra cómo usarlo:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Espero que esto ayude.

Tony Wallace

La nueva versión del código ...

No está seguro de lo bien que la selección en una cola de multiprocesamiento funciona en Windows. Como en las ventanas de selección escucha los zócalos y no los identificadores de archivo, sospecho que podría haber problemas.

Mi respuesta es hacer un hilo para escuchar a cada cola de un modo de bloqueo, y poner los resultados a todos en una sola cola escuchado por el hilo principal, la multiplexación esencialmente las colas individuales en una sola.

Mi código para hacer esto es:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

El código de seguimiento es mi rutina de prueba para mostrar cómo funciona:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)

A partir de Python 3.3 se puede utilizar multiprocessing.connection. esperar que esperar en varios objetos a la vez Queue._reader.

No hacerlo.

Ponga un encabezado en los mensajes y enviarlos a una cola común. Esto simplifica el código y será más limpio en general.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top