Является ли этот потокобезопасный подход без блокировки производителя и потребителя Python?
-
21-08-2019 - |
Вопрос
Недавно я написал программу, которая использовала простой шаблон производитель / потребитель.Изначально в нем была ошибка, связанная с неправильным использованием потоков.Блокировка, которую я в конечном итоге исправил.Но это заставило меня задуматься, возможно ли реализовать шаблон производитель / потребитель без блокировки.
Требования в моем случае были простыми:
- Один поток производителя.
- Один потребительский поток.
- В очереди есть место только для одного элемента.
- Производитель может произвести следующий товар до того, как будет потреблен текущий.Таким образом, текущий элемент утерян, но для меня это нормально.
- Потребитель может потребить текущий товар до того, как будет произведен следующий.Таким образом, текущий продукт расходуется дважды (или больше), но для меня это нормально.
Итак, я написал это:
QUEUE_ITEM = None
# this is executed in one threading.Thread object
def producer():
global QUEUE_ITEM
while True:
i = produce_item()
QUEUE_ITEM = i
# this is executed in another threading.Thread object
def consumer():
global QUEUE_ITEM
while True:
i = QUEUE_ITEM
consume_item(i)
Мой вопрос заключается в следующем:Является ли этот код потокобезопасным?
Немедленный комментарий:этот код на самом деле не является безблокировочным - я использую CPython, и у него есть GIL.
Я немного протестировал код, и, кажется, он работает.Это приводит к некоторым операциям ЗАГРУЗКИ и хранения, которые являются атомарными из-за GIL.Но я также знаю, что del x
операция не является атомарной, когда x реализует __del__
способ.Так что, если мой товар имеет __del__
метод и происходит какое-то неприятное планирование, все может сломаться.Или нет?
Другой вопрос заключается в:Какие ограничения (например, на тип производимых элементов) я должен наложить, чтобы приведенный выше код работал нормально?
Мои вопросы касаются только теоретической возможности использовать причуды CPython и GIL для создания lockless (т.е.никаких блокировок, подобных потоковой передаче.Блокировка явно в коде) решение.
Решение
Да, это будет работать так, как вы описали:
- Что производитель может создать пропускаемый элемент.
- Что потребитель может потреблять один и тот же элемент.
Но я также знаю, что операция del x не является атомарной, когда x реализует del способ.Так что, если мой товар имеет del метод и происходит какое-то неприятное планирование, все может сломаться.
Я не вижу здесь буквы "del".Если del происходит в consume_item , то del может возникать в потоке производителя.Я не думаю, что это было бы "проблемой".
Однако не утруждайте себя этим использованием.В конечном итоге вы будете расходовать процессор на бессмысленные циклы опроса, и это ничуть не быстрее, чем использование очереди с блокировками, поскольку Python уже имеет глобальную блокировку.
Другие советы
Обман укусит вас.Просто используйте очередь для связи между потоками.
Это не действительно потокобезопасный, потому что производитель может перезаписать QUEUE_ITEM
до того, как потребитель потребил его, и потребитель мог бы потреблять QUEUE_ITEM
дважды.Как вы упомянули, вас это устраивает, но большинство людей - нет.
Кто-то, обладающий большими знаниями о внутренних компонентах cpython, должен будет ответить вам на дополнительные теоретические вопросы.
Я думаю, возможно, что поток прерывается во время производства / потребления, особенно если элементы являются большими объектами.Редактировать:это всего лишь дикое предположение.Я не эксперт.
Также потоки могут создавать / потреблять любое количество элементов до того, как другой начнет выполняться.
Вы можете использовать список в качестве очереди до тех пор, пока вы придерживаетесь append / pop, поскольку оба они являются атомарными.
QUEUE = []
# this is executed in one threading.Thread object
def producer():
global QUEUE
while True:
i = produce_item()
QUEUE.append(i)
# this is executed in another threading.Thread object
def consumer():
global QUEUE
while True:
try:
i = QUEUE.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
В области видимости класса, подобной приведенной ниже, вы даже можете очистить очередь.
class Atomic(object):
def __init__(self):
self.queue = []
# this is executed in one threading.Thread object
def producer(self):
while True:
i = produce_item()
self.queue.append(i)
# this is executed in another threading.Thread object
def consumer(self):
while True:
try:
i = self.queue.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
# There's the possibility producer is still working on it's current item.
def clear_queue(self):
self.queue = []
Вам нужно будет выяснить, какие операции со списком являются атомарными, просмотрев сгенерированный байт-код.
В __del__
может возникнуть проблема, как Вы и сказали.Этого можно было бы избежать, если бы только существовал способ предотвратить вызов сборщиком мусора __del__
метод на старом объекте, прежде чем мы закончим присваивать новый объекту QUEUE_ITEM
.Нам нужно было бы что-то вроде:
increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object
Боюсь, однако, я не знаю, возможно ли это.