スレッドキューからすべてのアイテムを取得する
-
03-07-2019 - |
質問
結果をキューに書き込むスレッドが1つあります。
別のスレッド(GUI)で、次のように、キューに結果があるかどうかを定期的に(IDLEイベントで)確認します。
def queue_get_all(q):
items = []
while 1:
try:
items.append(q.get_nowait())
except Empty, e:
break
return items
これは良い方法ですか?
編集:
私が尋ねているのは、時々 待機中のスレッドが数回スタックする 新しく出さずに数秒 結果。
「スタック」推奨されているように、 wx.WakeUpIdle
を呼び出すことによってそのようなイベントが実際に生成されることを確認せずに、アイドルイベントハンドラーで処理を行っていたために問題が判明しました。
解決
get_nowait()
呼び出しがリストが空の場合に戻らないことで一時停止を引き起こした場合、私は非常に驚いたでしょう。
チェックの間に多数の(おそらく大きい?)アイテムを投稿しているということでしょうか?これは、受信スレッドが Queue
から引き出す大量のデータを持っていることを意味しますか? 1つのバッチで取得する数を制限してみてください:
def queue_get_all(q):
items = []
maxItemsToRetreive = 10
for numOfItemsRetrieved in range(0, maxItemsToRetreive):
try:
if numOfItemsRetrieved == maxItemsToRetreive:
break
items.append(q.get_nowait())
except Empty, e:
break
return items
これにより、受信スレッドは一度に最大10個のアイテムをプルするように制限されます。
他のヒント
使用可能なすべてのアイテムを常にキューから取り出す場合、ロックのあるリストだけでなく、キューを使用することに本当の意味はありますか?例:
from __future__ import with_statement
import threading
class ItemStore(object):
def __init__(self):
self.lock = threading.Lock()
self.items = []
def add(self, item):
with self.lock:
self.items.append(item)
def getAll(self):
with self.lock:
items, self.items = self.items, []
return items
それらを個別にプルし、空のキューのブロック動作を利用する場合は、Queueを使用する必要がありますが、ユースケースははるかに単純に見え、上記のアプローチの方が適している可能性があります。
[Edit2] アイドルループからキューをポーリングしているという事実を見逃していました。更新から、問題は競合とは関係がないことがわかりました。以下のアプローチは、あなたの問題に実際には関係ありません。誰かがこの便利なブロッキングバリアントを見つけた場合に備えて、これを残しました。
少なくとも1つの結果が得られるまでブロックする場合は、上記のコードを変更して、プロデューサースレッドからのシグナルが送られてデータが利用可能になるのを待つことができます。例:
class ItemStore(object):
def __init__(self):
self.cond = threading.Condition()
self.items = []
def add(self, item):
with self.cond:
self.items.append(item)
self.cond.notify() # Wake 1 thread waiting on cond (if any)
def getAll(self, blocking=False):
with self.cond:
# If blocking is true, always return at least 1 item
while blocking and len(self.items) == 0:
self.cond.wait()
items, self.items = self.items, []
return items
すべてのアイテムをキューから取り出す最も簡単な方法は次のとおりだと思います:
def get_all_queue_result(queue):
result_list = []
while not queue.empty():
result_list.append(queue.get())
return result_list
ドキュメントによると、get_nowait()を使用していることがわかります。これは、アイテムがすぐに使用可能な場合は「return [s]、そうでない場合は空の例外を発生させます」
今、Empty例外がスローされると、ループから抜け出します。したがって、キューにすぐに使用できる結果がない場合、関数は空のアイテムリストを返します。
代わりにget()メソッドを使用しない理由はありますか?キューが同じ瞬間にput()リクエストを処理しているためにget_nowait()が失敗する場合があります。
キューへの書き込みが完了したら、qsizeは各反復のキューをチェックする必要なくトリックを実行する必要があります。
responseList = []
for items in range(0, q.qsize()):
responseList.append(q.get_nowait())
最も簡単な方法は、リスト内包表記を使用することです:
items = [q.get() for _ in range(q.qsize())]
range
関数の使用は一般的に嫌われていますが、より簡単な方法はまだ見つかりません。