Frage

Ich schrieb vor kurzem ein Programm, das ein einfaches Producer / Consumer-Muster verwendet. Es hatte zunächst einen Fehler unsachgemäßen Gebrauch von threading.Lock Zusammenhang, dass ich schließlich fixiert. Aber es hat mich zum Nachdenken, ob es möglich Erzeuger / Verbraucher-Muster in einer lockless Weise zu implementieren.

Anforderungen in meinem Fall waren einfach:

  • Ein Hersteller Thread.
  • Ein Verbraucher-Thread.
  • Warteschlange hat Platz für nur ein Element.
  • Der Produzent kann nächsten Punkt erzeugen, bevor die aktuellen verbraucht wird. Das aktuelle Element wird deshalb verloren, aber das ist in Ordnung für mich.
  • Verbraucher können aktuelles Element verbrauchen, bevor die nächsten erzeugt wird. Das aktuelle Element wird deshalb zweimal verbraucht (oder mehr), aber das ist in Ordnung für mich.

Also schrieb ich diese:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

Meine Frage ist: Ist dieser Code Thread-sicher

?

Sofortiger Kommentar:. Dieser Code ist nicht wirklich Lockless - Ich benutze CPython und es hat GIL

Ich testete den Code ein wenig, und es scheint zu funktionieren. Er übersetzt zu einigen LOAD und STORE ops, die wegen GIL Atom sind. Aber ich weiß auch, dass del x Betrieb nicht atomar ist, wenn x __del__ Methode implementiert. Also, wenn mein Titel eine __del__ Methode hat und einige böse Scheduling geschieht, kann die Dinge brechen. Oder nicht?

Eine andere Frage ist: Welche Art von Einschränkungen (zum Beispiel auf Anfertigungen Typ) muss ich verhängen, um den obigen Code funktionieren zu machen

Meine Fragen sind nur über theoretische Möglichkeit CPython ist und GIL Marotten, um zu kommen mit lockless (das heißt keine Schlösser wie threading.Lock explizit im Code) Lösung.

auszubeuten
War es hilfreich?

Lösung

Ja, das wird in der Art und Weise arbeiten, die Sie beschrieben:

  1. Dass der Hersteller kann ein überspringbarer Element erzeugen.
  2. , dass der Verbraucher das gleiche Element verbrauchen kann.
  

Aber ich weiß auch, dass del x Betrieb nicht atomar ist, wenn x Arbeitsgeräte del Methode. Also, wenn meine Artikel ein hat del Methode und einige böse Scheduling geschieht, kann die Dinge brechen.

Ich sehe nicht ein „del“ hier. Wenn ein del in consume_item geschieht dann der del kann in dem Producer-Thread auftreten. Ich glaube nicht, dies wäre ein „Problem“.

Kümmern Sie sich nicht um dieses obwohl verwenden. Sie werden mit bis CPU für sinnlose Abfragezyklen am Ende, und es ist nicht schneller als eine Warteschlange mit Sperren verwenden, da Python bereits eine globale Sperre hat.

Andere Tipps

Trickery werden Sie beißen. verwenden Queue einfach zu kommunizieren zwischen Threads.

Dies ist nicht wirklich Thread sicher, weil Produzent QUEUE_ITEM überschreiben könnte, bevor der Verbraucher sie verbraucht hat und Verbraucher QUEUE_ITEM zweimal verbrauchen könnten. Wie Sie erwähnt haben, sind Sie OK mit, dass aber die meisten Menschen nicht.

Jemand mit mehr Wissen über CPython Interna haben Sie mehr theoretische Fragen zu beantworten.

Ich denke, es ist möglich, dass ein Thread unterbrochen wird, während die Herstellung / raubend, vor allem, wenn die Einzelteile sind große Objekte. Edit: das ist nur eine wilde Vermutung. Ich bin kein Experte.

Auch können die Fäden produzieren / konsumieren beliebige Anzahl von Elementen, bevor der andere beginnt zu laufen.

Sie können eine Liste als die Warteschlange verwenden, solange Sie anhängen halten Sie sich an / pop da beide Atom sind.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

In einem Klassenbereich wie unten, können Sie auch die Warteschlange löschen.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Sie müssen finden Sie heraus, welche Listenoperationen durch einen Blick auf den Bytecode erzeugt atomar sind.

Die __del__ könnte ein Problem sein, wie Sie gesagt haben. Es könnte vermieden werden, wenn nur ein Weg, es war der Garbage Collector von Aufrufen der __del__ Methode auf dem alten Objekt zu verhindern, bevor wir das neue zu dem QUEUE_ITEM fertig zuweisen. Wir bräuchten so etwas wie:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

Ich fürchte, ich weiß nicht, ob es möglich ist, though.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top