Является ли этот потокобезопасный подход без блокировки производителя и потребителя Python?

StackOverflow https://stackoverflow.com/questions/854906

Вопрос

Недавно я написал программу, которая использовала простой шаблон производитель / потребитель.Изначально в нем была ошибка, связанная с неправильным использованием потоков.Блокировка, которую я в конечном итоге исправил.Но это заставило меня задуматься, возможно ли реализовать шаблон производитель / потребитель без блокировки.

Требования в моем случае были простыми:

  • Один поток производителя.
  • Один потребительский поток.
  • В очереди есть место только для одного элемента.
  • Производитель может произвести следующий товар до того, как будет потреблен текущий.Таким образом, текущий элемент утерян, но для меня это нормально.
  • Потребитель может потребить текущий товар до того, как будет произведен следующий.Таким образом, текущий продукт расходуется дважды (или больше), но для меня это нормально.

Итак, я написал это:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

Мой вопрос заключается в следующем:Является ли этот код потокобезопасным?

Немедленный комментарий:этот код на самом деле не является безблокировочным - я использую CPython, и у него есть GIL.

Я немного протестировал код, и, кажется, он работает.Это приводит к некоторым операциям ЗАГРУЗКИ и хранения, которые являются атомарными из-за GIL.Но я также знаю, что del x операция не является атомарной, когда x реализует __del__ способ.Так что, если мой товар имеет __del__ метод и происходит какое-то неприятное планирование, все может сломаться.Или нет?

Другой вопрос заключается в:Какие ограничения (например, на тип производимых элементов) я должен наложить, чтобы приведенный выше код работал нормально?

Мои вопросы касаются только теоретической возможности использовать причуды CPython и GIL для создания lockless (т.е.никаких блокировок, подобных потоковой передаче.Блокировка явно в коде) решение.

Это было полезно?

Решение

Да, это будет работать так, как вы описали:

  1. Что производитель может создать пропускаемый элемент.
  2. Что потребитель может потреблять один и тот же элемент.

Но я также знаю, что операция del x не является атомарной, когда x реализует del способ.Так что, если мой товар имеет del метод и происходит какое-то неприятное планирование, все может сломаться.

Я не вижу здесь буквы "del".Если del происходит в consume_item , то del может возникать в потоке производителя.Я не думаю, что это было бы "проблемой".

Однако не утруждайте себя этим использованием.В конечном итоге вы будете расходовать процессор на бессмысленные циклы опроса, и это ничуть не быстрее, чем использование очереди с блокировками, поскольку Python уже имеет глобальную блокировку.

Другие советы

Обман укусит вас.Просто используйте очередь для связи между потоками.

Это не действительно потокобезопасный, потому что производитель может перезаписать QUEUE_ITEM до того, как потребитель потребил его, и потребитель мог бы потреблять QUEUE_ITEM дважды.Как вы упомянули, вас это устраивает, но большинство людей - нет.

Кто-то, обладающий большими знаниями о внутренних компонентах cpython, должен будет ответить вам на дополнительные теоретические вопросы.

Я думаю, возможно, что поток прерывается во время производства / потребления, особенно если элементы являются большими объектами.Редактировать:это всего лишь дикое предположение.Я не эксперт.

Также потоки могут создавать / потреблять любое количество элементов до того, как другой начнет выполняться.

Вы можете использовать список в качестве очереди до тех пор, пока вы придерживаетесь append / pop, поскольку оба они являются атомарными.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

В области видимости класса, подобной приведенной ниже, вы даже можете очистить очередь.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Вам нужно будет выяснить, какие операции со списком являются атомарными, просмотрев сгенерированный байт-код.

В __del__ может возникнуть проблема, как Вы и сказали.Этого можно было бы избежать, если бы только существовал способ предотвратить вызов сборщиком мусора __del__ метод на старом объекте, прежде чем мы закончим присваивать новый объекту QUEUE_ITEM.Нам нужно было бы что-то вроде:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

Боюсь, однако, я не знаю, возможно ли это.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top