这种Python生产者-消费者无锁方法是线程安全的吗?
-
21-08-2019 - |
题
我最近编写了一个使用简单的生产者/消费者模式的程序。它最初有一个与 threading.Lock 使用不当相关的错误,我最终修复了该错误。但这让我思考是否可以以无锁的方式实现生产者/消费者模式。
我的情况的要求很简单:
- 一个生产者线程。
- 一个消费者线程。
- 队列只能容纳一件物品。
- 生产者可以在当前项目被消耗之前生产下一个项目。因此当前的项目丢失了,但这对我来说没关系。
- 消费者可以在生产下一件物品之前消费当前物品。因此,当前物品被消耗两次(或更多),但这对我来说没关系。
所以我写了这个:
QUEUE_ITEM = None
# this is executed in one threading.Thread object
def producer():
global QUEUE_ITEM
while True:
i = produce_item()
QUEUE_ITEM = i
# this is executed in another threading.Thread object
def consumer():
global QUEUE_ITEM
while True:
i = QUEUE_ITEM
consume_item(i)
我的问题是:这段代码是线程安全的吗?
立即评论:这段代码并不是真正无锁的——我使用 CPython 并且它有 GIL。
我对代码进行了一些测试,它似乎可以工作。它转换为一些 LOAD 和 STORE 操作,由于 GIL,这些操作是原子的。但我也知道 del x
当 x 实现时,操作不是原子的 __del__
方法。所以如果我的物品有 __del__
方法和一些令人讨厌的调度发生,事情可能会中断。或不?
另一个问题是:我必须施加什么样的限制(例如对生产项目的类型)才能使上述代码正常工作?
我的问题只是关于利用 CPython 和 GIL 的怪癖来实现无锁的理论可能性(即没有像 threading.Lock 这样在代码中显式锁定)的解决方案。
解决方案
是的,这将以您描述的方式工作:
- 生产者可以产生可跳过的元素。
- 消费者可能会消费相同的元素。
但我也知道当 x 实现时 del x 操作不是原子的 德尔 方法。所以如果我的物品有 德尔 方法和一些令人讨厌的调度发生,事情可能会中断。
我在这里没有看到“del”。如果del发生在consume_item中,那么 德尔 可能发生在生产者线程中。我不认为这会是一个“问题”。
不过不用费心使用这个。你最终会在毫无意义的轮询周期上耗尽 CPU,而且它并不比使用带锁的队列快,因为 Python 已经有了全局锁。
其他提示
诡计会咬你。线程间通信只需使用Queue即可。
这不是 真的 线程安全,因为生产者可以覆盖 QUEUE_ITEM
在消费者消费它并且消费者可以消费之前 QUEUE_ITEM
两次。正如您提到的,您对此表示同意,但大多数人则不然。
对 cpython 内部结构有更多了解的人将不得不回答你更多的理论问题。
我认为线程在生产/消费时可能会被中断,特别是当项目是大对象时。编辑:这只是一个疯狂的猜测。我不是专家。
此外,线程可以在另一个线程开始运行之前生成/消耗任意数量的项目。
只要坚持追加/弹出,您就可以使用列表作为队列,因为两者都是原子的。
QUEUE = []
# this is executed in one threading.Thread object
def producer():
global QUEUE
while True:
i = produce_item()
QUEUE.append(i)
# this is executed in another threading.Thread object
def consumer():
global QUEUE
while True:
try:
i = QUEUE.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
在如下所示的类范围中,您甚至可以清除队列。
class Atomic(object):
def __init__(self):
self.queue = []
# this is executed in one threading.Thread object
def producer(self):
while True:
i = produce_item()
self.queue.append(i)
# this is executed in another threading.Thread object
def consumer(self):
while True:
try:
i = self.queue.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
# There's the possibility producer is still working on it's current item.
def clear_queue(self):
self.queue = []
您必须通过查看生成的字节码来找出哪些列表操作是原子的。
这 __del__
正如你所说,可能是个问题。如果有一种方法可以阻止垃圾收集器调用 __del__
在我们完成将新对象分配给旧对象之前,对旧对象执行方法 QUEUE_ITEM
. 。我们需要类似的东西:
increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object
恐怕,我不知道这是否可能。