Вопрос

Как лучше всего подождать (без вращения), пока что -то не будет доступно ни в одном из двух (многопроцессорная) Очереди, где оба проживают в одной и той же системе?

Это было полезно?

Решение

Похоже, что есть официальный способ справиться с этим. Или, по крайней мере, не основано на этом:

Вы можете попробовать что -то вроде того, что делает этот пост - доступ к базовым файлам труб:

а затем используйте Select.

Другие советы

На самом деле вы можете использовать Multiprocessing. т.е.

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

Выберите Que, только если он будет готов к чтению.

Никакой документации об этом хотя. Я читал исходный код мультипроцессы.

С queue.queue я не нашел никакого умного способа сделать это (и я бы очень хотел).

Похоже, что использование потоков, которые переправляют входящие элементы в одну очередь, которую вы затем ждете, является практическим выбором при использовании многопроцессы в независимой форме платформы.

Избегание потоков требует либо обработки низкоуровневых труб/FDS, которые являются как специфичными для платформы, так и нелегко обрабатывать API более высокого уровня.

Или вам понадобятся очереди с возможностью устанавливать обратные вызовы, которые, я думаю, являются правильным интерфейсом более высокого уровня. Т.е. вы бы написали что -то вроде:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Может быть, многопроцессный пакет может вырастить этот API, но его еще нет. Концепция хорошо работает с py.execnet, который использует термин «канал» вместо «очереди», см. Здесь http://tinyurl.com/nmtr4w

Вы можете использовать что -то вроде Наблюдатель Образец, в которой подписчики очереди уведомляются об изменениях состояния.

В этом случае вы можете назначить свой рабочая нить в качестве слушателя в каждой очереди, и всякий раз, когда он получает готовую сигнал, он может работать над новым предметом, в противном случае сон.

Не уверен, насколько хорошо выберите в многопрофильной очереди работает в Windows. Как выберите в Windows Sicking для сокетов, а не ручки файлов, я подозреваю, что могут возникнуть проблемы.

Мой ответ состоит в том, чтобы сделать ветку, чтобы прослушать каждую очередь блокирующим способом, и вставить все результаты в одну очередь, прослушанный основным потоком, по сути, мультиплексируя отдельные очереди в одну.

Мой код для этого:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

Следующая процедура тестирования показывает, как ее использовать:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Надеюсь это поможет.

Тони Уоллес

Новая версия вышеупомянутого кода ...

Не уверен, насколько хорошо выберите в многопрофильной очереди работает в Windows. Как выберите в Windows Sicking для сокетов, а не ручки файлов, я подозреваю, что могут возникнуть проблемы.

Мой ответ состоит в том, чтобы сделать ветку, чтобы прослушать каждую очередь блокирующим способом, и вставить все результаты в одну очередь, прослушанный основным потоком, по сути, мультиплексируя отдельные очереди в одну.

Мой код для этого:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

Следующий код - моя тестовая процедура, чтобы показать, как он работает:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)

Начиная с Python 3.3 вы можете использовать Multiprocessing.connection.Wait Чтобы подождать несколько Queue._reader объекты одновременно.

Не делай этого.

Поместите заголовок на сообщения и отправьте их в общую очередь. Это упрощает код и будет более чище в целом.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top