Вопрос

У меня есть один поток, который записывает результаты в очередь.

В другом потоке (GUI) я периодически (в событии IDLE) проверяю, есть ли результаты в очереди, например:

def queue_get_all(q):
    items = []
    while 1:
        try:
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

Хороший ли это способ сделать это ?

Редактировать:

Я спрашиваю, потому что иногда ожидающий поток застревает на несколько секунд без получения новых результатов.

Оказалось, что проблема "зависания" заключалась в том, что я выполнял обработку в обработчике событий idle, не убедившись, что такие события действительно генерируются вызовом wx.WakeUpIdle, как это рекомендуется.

Это было полезно?

Решение

Я был бы очень удивлен, если бы get_nowait() вызов вызвал паузу, не вернувшись, если список был пуст.

Может ли быть так, что вы размещаете большое количество (возможно, больших?) элементов между проверками, что означает, что принимающий поток имеет большой объем данных для извлечения из Queue?Вы могли бы попробовать ограничить количество, которое вы извлекаете в одном пакете:

def queue_get_all(q):
    items = []
    maxItemsToRetreive = 10
    for numOfItemsRetrieved in range(0, maxItemsToRetreive):
        try:
            if numOfItemsRetrieved == maxItemsToRetreive:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

Это ограничило бы приемный поток возможностью извлечения до 10 элементов одновременно.

Другие советы

Если вы всегда извлекаете все доступные элементы из очереди, есть ли какой-то реальный смысл в использовании очереди, а не просто списка с блокировкой?т. е.:

from __future__ import with_statement
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

Если вы также извлекаете их по отдельности и используете поведение блокировки для пустых очередей, тогда вам следует использовать Queue , но ваш вариант использования выглядит намного проще, и, возможно, описанный выше подход лучше подходит.

[Правка2] Я пропустил тот факт, что вы опрашиваете очередь из цикла ожидания, и из вашего обновления я вижу, что проблема не связана с конфликтом, поэтому приведенный ниже подход на самом деле не имеет отношения к вашей проблеме.Я оставил это на случай, если кто-нибудь сочтет блокирующий вариант этого полезным:

В случаях, когда вы действительно хотите заблокировать, пока не получите хотя бы один результат, вы можете изменить приведенный выше код, чтобы дождаться, пока данные станут доступны по сигналу потока-производителя.Например.

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items

Я думаю, что самый простой способ вывести все элементы из очереди заключается в следующем:

def get_all_queue_result(queue):

    result_list = []
    while not queue.empty():
        result_list.append(queue.get())

    return result_list

Я вижу, что вы используете get_nowait(), который, согласно документации, "возвращает элемент, если он доступен немедленно, иначе вызывает пустое исключение".

Теперь вы случайно выходите из цикла, когда генерируется пустое исключение.Таким образом, если в очереди нет результата, доступного немедленно, ваша функция возвращает пустой список элементов.

Есть ли причина, по которой вы не используете метод get() вместо этого?Может случиться так, что функция get_nowait() завершается с ошибкой, потому что очередь обслуживает запрос put() в тот же момент.

Если вы закончили запись в очередь, qsize должен выполнить это задание без необходимости проверять очередь для каждой итерации.

responseList = []
for items in range(0, q.qsize()):
    responseList.append(q.get_nowait())

Самый простой метод - это использование понимания списка:

items = [q.get() for _ in range(q.qsize())]

Использование range к функции обычно относятся неодобрительно, но я пока не нашел более простого метода.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top