«Выберите» в нескольких очереди Python Multiprocessing?
-
13-09-2019 - |
Вопрос
Как лучше всего подождать (без вращения), пока что -то не будет доступно ни в одном из двух (многопроцессорная) Очереди, где оба проживают в одной и той же системе?
Решение
Похоже, что есть официальный способ справиться с этим. Или, по крайней мере, не основано на этом:
Вы можете попробовать что -то вроде того, что делает этот пост - доступ к базовым файлам труб:
а затем используйте Select.
Другие советы
На самом деле вы можете использовать Multiprocessing. т.е.
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
Выберите Que, только если он будет готов к чтению.
Никакой документации об этом хотя. Я читал исходный код мультипроцессы.
С queue.queue я не нашел никакого умного способа сделать это (и я бы очень хотел).
Похоже, что использование потоков, которые переправляют входящие элементы в одну очередь, которую вы затем ждете, является практическим выбором при использовании многопроцессы в независимой форме платформы.
Избегание потоков требует либо обработки низкоуровневых труб/FDS, которые являются как специфичными для платформы, так и нелегко обрабатывать API более высокого уровня.
Или вам понадобятся очереди с возможностью устанавливать обратные вызовы, которые, я думаю, являются правильным интерфейсом более высокого уровня. Т.е. вы бы написали что -то вроде:
singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
Может быть, многопроцессный пакет может вырастить этот API, но его еще нет. Концепция хорошо работает с py.execnet, который использует термин «канал» вместо «очереди», см. Здесь http://tinyurl.com/nmtr4w
Вы можете использовать что -то вроде Наблюдатель Образец, в которой подписчики очереди уведомляются об изменениях состояния.
В этом случае вы можете назначить свой рабочая нить в качестве слушателя в каждой очереди, и всякий раз, когда он получает готовую сигнал, он может работать над новым предметом, в противном случае сон.
Не уверен, насколько хорошо выберите в многопрофильной очереди работает в Windows. Как выберите в Windows Sicking для сокетов, а не ручки файлов, я подозреваю, что могут возникнуть проблемы.
Мой ответ состоит в том, чтобы сделать ветку, чтобы прослушать каждую очередь блокирующим способом, и вставить все результаты в одну очередь, прослушанный основным потоком, по сути, мультиплексируя отдельные очереди в одну.
Мой код для этого:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Следующая процедура тестирования показывает, как ее использовать:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
Надеюсь это поможет.
Тони Уоллес
Новая версия вышеупомянутого кода ...
Не уверен, насколько хорошо выберите в многопрофильной очереди работает в Windows. Как выберите в Windows Sicking для сокетов, а не ручки файлов, я подозреваю, что могут возникнуть проблемы.
Мой ответ состоит в том, чтобы сделать ветку, чтобы прослушать каждую очередь блокирующим способом, и вставить все результаты в одну очередь, прослушанный основным потоком, по сути, мультиплексируя отдельные очереди в одну.
Мой код для этого:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Следующий код - моя тестовая процедура, чтобы показать, как он работает:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
Начиная с Python 3.3 вы можете использовать Multiprocessing.connection.Wait Чтобы подождать несколько Queue._reader
объекты одновременно.
Не делай этого.
Поместите заголовок на сообщения и отправьте их в общую очередь. Это упрощает код и будет более чище в целом.