我最近编写了一个使用简单的生产者/消费者模式的程序。它最初有一个与 threading.Lock 使用不当相关的错误,我最终修复了该错误。但这让我思考是否可以以无锁的方式实现生产者/消费者模式。

我的情况的要求很简单:

  • 一个生产者线程。
  • 一个消费者线程。
  • 队列只能容纳一件物品。
  • 生产者可以在当前项目被消耗之前生产下一个项目。因此当前的项目丢失了,但这对我来说没关系。
  • 消费者可以在生产下一件物品之前消费当前物品。因此,当前物品被消耗两次(或更多),但这对我来说没关系。

所以我写了这个:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

我的问题是:这段代码是线程安全的吗?

立即评论:这段代码并不是真正无锁的——我使用 CPython 并且它有 GIL。

我对代码进行了一些测试,它似乎可以工作。它转换为一些 LOAD 和 STORE 操作,由于 GIL,这些操作是原子的。但我也知道 del x 当 x 实现时,操作不是原子的 __del__ 方法。所以如果我的物品有 __del__ 方法和一些令人讨厌的调度发生,事情可能会中断。或不?

另一个问题是:我必须施加什么样的限制(例如对生产项目的类型)才能使上述代码正常工作?

我的问题只是关于利用 CPython 和 GIL 的怪癖来实现无锁的理论可能性(即没有像 threading.Lock 这样在代码中显式锁定)的解决方案。

有帮助吗?

解决方案

是的,这将以您描述的方式工作:

  1. 生产者可以产生可跳过的元素。
  2. 消费者可能会消费相同的元素。

但我也知道当 x 实现时 del x 操作不是原子的 德尔 方法。所以如果我的物品有 德尔 方法和一些令人讨厌的调度发生,事情可能会中断。

我在这里没有看到“del”。如果del发生在consume_item中,那么 德尔 可能发生在生产者线程中。我不认为这会是一个“问题”。

不过不用费心使用这个。你最终会在毫无意义的轮询周期上耗尽 CPU,而且它并不比使用带锁的队列快,因为 Python 已经有了全局锁。

其他提示

诡计会咬你。线程间通信只需使用Queue即可。

这不是 真的 线程安全,因为生产者可以覆盖 QUEUE_ITEM 在消费者消费它并且消费者可以消费之前 QUEUE_ITEM 两次。正如您提到的,您对此表示同意,但大多数人则不然。

对 cpython 内部结构有更多了解的人将不得不回答你更多的理论问题。

我认为线程在生产/消费时可能会被中断,特别是当项目是大对象时。编辑:这只是一个疯狂的猜测。我不是专家。

此外,线程可以在另一个线程开始运行之前生成/消耗任意数量的项目。

只要坚持追加/弹出,您就可以使用列表作为队列,因为两者都是原子的。

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

在如下所示的类范围中,您甚至可以清除队列。

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

您必须通过查看生成的字节码来找出哪些列表操作是原子的。

__del__ 正如你所说,可能是个问题。如果有一种方法可以阻止垃圾收集器调用 __del__ 在我们完成将新对象分配给旧对象之前,对旧对象执行方法 QUEUE_ITEM. 。我们需要类似的东西:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

恐怕,我不知道这是否可能。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top