이 파이썬 프로듀서 소비자 Lockless Approach Thread-Safe입니까?
-
21-08-2019 - |
문제
나는 최근에 간단한 생산자/소비자 패턴을 사용한 프로그램을 썼습니다. 처음에는 나사산의 부적절한 사용과 관련된 버그가있었습니다. 그러나 그것은 잠금 방식으로 생산자/소비자 패턴을 구현할 수 있는지 여부를 생각하게 만들었습니다.
내 경우의 요구 사항은 간단했습니다.
- 하나의 생산자 스레드.
- 하나의 소비자 스레드.
- 대기열에는 하나의 항목 만 있습니다.
- 생산자는 현재 항목을 소비하기 전에 다음 품목을 생산할 수 있습니다. 따라서 현재 항목은 손실되었지만 괜찮습니다.
- 소비자는 다음 품목을 생산하기 전에 현재 품목을 소비 할 수 있습니다. 따라서 현재 항목은 두 번 (또는 그 이상) 소비되지만 괜찮습니다.
그래서 나는 이것을 썼다 :
QUEUE_ITEM = None
# this is executed in one threading.Thread object
def producer():
global QUEUE_ITEM
while True:
i = produce_item()
QUEUE_ITEM = i
# this is executed in another threading.Thread object
def consumer():
global QUEUE_ITEM
while True:
i = QUEUE_ITEM
consume_item(i)
내 질문은 :이 코드 스레드-안전입니까?
즉각적인 의견 :이 코드는 실제로 잠금이 아닙니다. CPYTHON을 사용하고 GIL이 있습니다.
코드를 조금 테스트했는데 작동하는 것 같습니다. 그것은 길로 인해 원자 인 일부 부하 및 저장 작전으로 변환됩니다. 그러나 나는 또한 그것을 알고 있습니다 del x
X가 구현 될 때 작동은 원자가 아닙니다 __del__
방법. 따라서 내 항목에 a가있는 경우 __del__
방법과 불쾌한 일정이 발생하면 상황이 깨질 수 있습니다. 아니면 아니야?
또 다른 문제는 다음과 같습니다. 어떤 종류의 제한 (예 : 제작 된 항목 유형)이 위의 코드를 잘 작동 시키려면 부과해야합니까?
저의 질문은 Cpython과 Gil 's Quirks를 이용할 수있는 이론적 가능성에 관한 것입니다.
해결책
예, 이것은 당신이 설명하는 방식으로 작동합니다.
- 생산자가 건너 뛰어 요소를 생산할 수 있습니다.
- 소비자가 동일한 요소를 소비 할 수 있습니다.
그러나 나는 또한 X가 X가 구현 될 때 Del X 작동이 원자가 아니라는 것을 알고 있습니다. 델 방법. 따라서 내 항목에 a가있는 경우 델 방법과 불쾌한 일정이 발생하면 상황이 깨질 수 있습니다.
여기에 "델"이 보이지 않습니다. del이 coom_item에서 발생하면 the 델 생산자 스레드에서 발생할 수 있습니다. 나는 이것이 "문제"라고 생각하지 않습니다.
그래도 이것을 사용하지 마십시오. 무의미한 폴링 사이클에서 CPU를 사용하게됩니다. 파이썬에는 이미 전역 잠금 장치가 있기 때문에 잠금 장치가있는 큐를 사용하는 것보다 빠르지 않습니다.
다른 팁
속임수가 당신을 물게 될 것입니다. 큐를 사용하여 스레드간에 통신하십시오.
이것은 아니다 진짜 생산자가 덮어 쓸 수 있기 때문에 스레드 안전합니다 QUEUE_ITEM
소비자가 소비하기 전에 소비자가 소비 할 수 있습니다 QUEUE_ITEM
두 배. 당신이 언급했듯이, 당신은 그것에 대해 괜찮지 만 대부분의 사람들은 그렇지 않습니다.
Cpython 내부에 대한 지식을 가진 사람은 더 많은 이론적 질문에 답해야합니다.
생산/소비하는 동안 스레드가 중단 될 수 있다고 생각합니다. 특히 항목이 큰 물체 인 경우. 편집 : 이것은 단지 거친 추측입니다. 나는 전문가가 아닙니다.
또한 다른 하나가 실행되기 전에 스레드가 수많은 품목을 생산/소비 할 수 있습니다.
둘 다 원자이기 때문에 Append/Pop을 고수하는 한 목록을 대기열로 사용할 수 있습니다.
QUEUE = []
# this is executed in one threading.Thread object
def producer():
global QUEUE
while True:
i = produce_item()
QUEUE.append(i)
# this is executed in another threading.Thread object
def consumer():
global QUEUE
while True:
try:
i = QUEUE.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
아래와 같은 클래스 범위에서는 대기열을 지울 수도 있습니다.
class Atomic(object):
def __init__(self):
self.queue = []
# this is executed in one threading.Thread object
def producer(self):
while True:
i = produce_item()
self.queue.append(i)
# this is executed in another threading.Thread object
def consumer(self):
while True:
try:
i = self.queue.pop(0)
except IndexError:
# queue is empty
continue
consume_item(i)
# There's the possibility producer is still working on it's current item.
def clear_queue(self):
self.queue = []
생성 된 바이트 코드를 살펴보면 어떤 목록 작업이 원자가인지 알아 내야합니다.
그만큼 __del__
당신이 말했듯이 문제가 될 수 있습니다. 쓰레기 수집가가 호출하는 것을 막는 방법 만 있으면 피할 수 있습니다. __del__
새 객체에 대한 메소드 새 객체를 QUEUE_ITEM
. 우리는 다음과 같은 것이 필요합니다.
increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object
그래도 두렵습니다. 그래도 가능한지 모르겠습니다.