これはPythonの生産者-消費者locklessアプローチスレッド安全ですか?
-
21-08-2019 - |
質問
私は最近書いたプログラムを用いた簡単なプロデューサー/消費者のパターンです。では最初にバグ関連の不正使用のネジ.ロックというのを修正。ものとされていましたかどうかをプロデューサー/消費者のパターンlocklessます。
要求事項ったシンプルです:
- の一つ。
- 一消費者ます。
- キューの場所のために一つだけます。
- 生産者でクラスタ間のレプリケーションの一つ消費しているかが見えてきます。現在の項目に従って失われたが、大丈夫でした。
- 消費者が消費する電流項目の一つが生産されています。現在の商品が消費に回以上)ですが、大丈夫でした。
でも書きました:
QUEUE_ITEM = None
# this is executed in one threading.Thread object
def producer():
global QUEUE_ITEM
while True:
i = produce_item()
QUEUE_ITEM = i
# this is executed in another threading.Thread object
def consumer():
global QUEUE_ITEM
while True:
i = QUEUE_ITEM
consume_item(i)
私の質問はこのコードはスレッド安全ですか?
即時のコメント:このコードの最初がやりたかっただけでlocklessを使っていCPythonでGIL.
いたしましのコードを少しでも生かされております。その一部に負荷保opsる原子で吉.がいることも分かっていることで、こ del x
運転な原子がxの実施 __del__
方法。なので私のアイテム __del__
方法および一部のバケジューリングが起こるものが破断することになります。はありませんか?
他の質問はどのような制限(例えば製品タイプ)をもっているのコード作。
私の質問のみでの理論的可能性をCPythonやGILの癖にはまっていく様子を見ることができlockless(なロックのようにネジ.ロックを明示的にコード)。
解決
ありその作業方を述:
- このプロデューサーを演出しskippable要素となります。
- ることで、消費者が消費する同じ要素になります。
がいることも分かっていることで、こdel x操作な原子がxの実施 del 方法。なので私のアイテム del 方法および一部のバケジューリングが起こるものが破断することになります。
と思いますか、"del"です。場合デルが起きconsume_itemの del がプロデューサーのスレです。とは思わないその一環として行われた"問題です。
かわいを利用すものです。ま使用のCPUは、意味のポーリングサイクルでは高速を使用キューとロックをPythonでは、グローバルロックになっています。
他のヒント
欺き、あなたをかむます。ただ、スレッド間の通信にキューを使用します。
QUEUE_ITEM
を消費することができる前にプロデューサーがQUEUE_ITEM
を上書きする可能性があるため、これはの本当にのスレッドセーフではありません。あなたが述べたように、あなたはそれでOKですが、ほとんどの人はそうではありません。
はCPythonの内部のより多くの知識を持つ誰かがあなたに多くの理論的な質問に回答する必要があります。
私はそれはアイテムが大きなオブジェクトである場合は特に、消費する/生成しながら、スレッドが中断されている可能性だと思います。 編集:これはちょうど野生の推測です。私は専門家だ。
他の一つは実行を開始する前に、またスレッドが消費/アイテムの任意の数を生成することができる。
あなたは両方がアトミックであるため、/ポップを追加するために固執する限り、キューとしてリストを使用することができます。
QUEUE = []
# this is executed in one threading.Thread object
def producer():
global QUEUE
while True:
i = produce_item()
QUEUE.append(i)
# this is executed in another threading.Thread object
def consumer():
global QUEUE
while True:
try:
i = QUEUE.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
以下のようなクラススコープでは、あなたも、キューをクリアすることができます。
class Atomic(object):
def __init__(self):
self.queue = []
# this is executed in one threading.Thread object
def producer(self):
while True:
i = produce_item()
self.queue.append(i)
# this is executed in another threading.Thread object
def consumer(self):
while True:
try:
i = self.queue.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
# There's the possibility producer is still working on it's current item.
def clear_queue(self):
self.queue = []
あなたは、生成されたバイトコードを見て、リスト操作がアトミックである見つける必要があります。
__del__
は問題がある可能性があります。唯一の私たちは、__del__
に新しいものを割り当てる終える前に、古いオブジェクトにQUEUE_ITEM
メソッドを呼び出すことから、ガベージコレクタを防ぐ方法があった場合には、避けることができます。我々のようなものが必要になります:
increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object
私はそれが、可能であるならば、私は知らない、怖います。