“Seleccionar” en múltiples colas de multiprocesamiento Python?
-
13-09-2019 - |
Pregunta
¿Cuál es la mejor manera de esperar (sin girar) hasta que algo está disponible en cualquiera de dos (multiprocesamiento) colas , donde ambos residen en el mismo sistema?
Solución
No parece que haya una manera oficial para manejar esto todavía. O al menos, no se basa en lo siguiente:
Se podría intentar algo así como lo que está haciendo este post - acceder a los controladores de archivo tubo subyacente:
y luego seleccione utilizar.
Otros consejos
En realidad se puede utilizar objetos multiprocessing.Queue en select.select. es decir
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
seleccionaría Que sólo si está listo para ser leído a partir.
No hay documentación sobre ello. Estaba leyendo el código fuente de la biblioteca multiprocessing.queue (en Linux por lo general es STH como /usr/lib/python2.6/multiprocessing/queue.py) para averiguarlo.
Con Queue.Queue yo no he encontrado ninguna forma inteligente de hacer esto (y realmente me gustaría a).
Parece que el uso de hilos que los elementos entrantes hacia adelante a una sola cola que luego se espera en es una opción práctica cuando se utiliza el multiprocesamiento de una manera independiente de la plataforma.
Evitar los hilos requiere o bien el manejo de tubos de bajo nivel / FDS que es tanto específica de la plataforma y no es fácil de manejar consistentemente con la API de nivel superior.
O que se necesita colas con la capacidad de establecer las devoluciones de llamada que creo que son la interfaz de nivel superior adecuado para ir. Es decir. que iba a escribir algo como:
singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
Tal vez el paquete de multiprocesamiento podría crecer esta API pero no es allí todavía. El concepto funciona bien con py.execnet que utiliza el término "canal" en lugar de "colas", ver aquí http: // tinyurl .com / nmtr4w
Usted podría utilizar algo así como el Observador patrón, en el que los suscriptores de la cola son notificados de los cambios de estado .
En este caso, usted podría tener su subproceso de trabajo designado como oyente en cada cola, y cada vez que recibe una señal de preparado, se puede trabajar en el nuevo artículo, de lo contrario dormir.
No está seguro de lo bien que la selección en una cola de multiprocesamiento funciona en Windows. Como en las ventanas de selección escucha los zócalos y no los identificadores de archivo, sospecho que podría haber problemas.
Mi respuesta es hacer un hilo para escuchar a cada cola de un modo de bloqueo, y poner los resultados a todos en una sola cola escuchado por el hilo principal, la multiplexación esencialmente las colas individuales en una sola.
Mi código para hacer esto es:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
La siguiente rutina de prueba muestra cómo usarlo:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
Espero que esto ayude.
Tony Wallace
La nueva versión del código ...
No está seguro de lo bien que la selección en una cola de multiprocesamiento funciona en Windows. Como en las ventanas de selección escucha los zócalos y no los identificadores de archivo, sospecho que podría haber problemas.
Mi respuesta es hacer un hilo para escuchar a cada cola de un modo de bloqueo, y poner los resultados a todos en una sola cola escuchado por el hilo principal, la multiplexación esencialmente las colas individuales en una sola.
Mi código para hacer esto es:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
El código de seguimiento es mi rutina de prueba para mostrar cómo funciona:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
A partir de Python 3.3 se puede utilizar multiprocessing.connection. esperar que esperar en varios objetos a la vez Queue._reader
.
No hacerlo.
Ponga un encabezado en los mensajes y enviarlos a una cola común. Esto simplifica el código y será más limpio en general.