Question

I recently wrote a program that used a simple producer/consumer pattern. It initially had a bug related to improper use of threading.Lock that I eventually fixed. But it made me think whether it's possible to implement producer/consumer pattern in a lockless manner.

Requirements in my case were simple:

  • One producer thread.
  • One consumer thread.
  • Queue has place for only one item.
  • Producer can produce next item before the current one is consumed. The current item is therefore lost, but that's OK for me.
  • Consumer can consume current item before the next one is produced. The current item is therefore consumed twice (or more), but that's OK for me.

So I wrote this:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

My question is: Is this code thread-safe?

Immediate comment: this code isn't really lockless - I use CPython and it has GIL.

I tested the code a little and it seems to work. It translates to some LOAD and STORE ops which are atomic because of GIL. But I also know that del x operation isn't atomic when x implements __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break. Or not?

Another question is: What kind of restrictions (for example on produced items' type) do I have to impose to make the above code work fine?

My questions are only about theoretical possibility to exploit CPython's and GIL's quirks in order to come up with lockless (i.e. no locks like threading.Lock explicitly in code) solution.

Was it helpful?

Solution

Yes this will work in the way that you described:

  1. That the producer may produce a skippable element.
  2. That the consumer may consume the same element.

But I also know that del x operation isn't atomic when x implements del method. So if my item has a del method and some nasty scheduling happens, things may break.

I don't see a "del" here. If a del happens in consume_item then the del may occur in the producer thread. I don't think this would be a "problem".

Don't bother using this though. You will end up using up CPU on pointless polling cycles, and it is not any faster than using a queue with locks since Python already has a global lock.

OTHER TIPS

Trickery will bite you. Just use Queue to communicate between threads.

This is not really thread safe because producer could overwrite QUEUE_ITEM before consumer has consumed it and consumer could consume QUEUE_ITEM twice. As you mentioned, you're OK with that but most people aren't.

Someone with more knowledge of cpython internals will have to answer you more theoretical questions.

I think it's possible that a thread is interrupted while producing/consuming, especially if the items are big objects. Edit: this is just a wild guess. I'm no expert.

Also the threads may produce/consume any number of items before the other one starts running.

You can use a list as the queue as long as you stick to append/pop since both are atomic.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

In a class scope like below, you can even clear the queue.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

You'll have to find out which list operations are atomic by looking at the bytecode generated.

The __del__ could be a problem as You said. It could be avoided, if only there was a way to prevent the garbage collector from invoking the __del__ method on the old object before We finish assigning the new one to the QUEUE_ITEM. We would need something like:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

I'm afraid, I don't know if it is possible, though.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top