Domanda

Recentemente ho scritto un programma che utilizza un semplice modello produttore / consumatore. Inizialmente aveva un bug relativo ad un uso improprio di threading.Lock che alla fine ho risolto. Ma mi ha fatto pensare se è possibile implementare modello produttore / consumatore in modo lockless.

Requisiti nel mio caso erano semplici:

  • Un thread produttore.
  • Un thread consumatore.
  • Coda ha posto per un solo elemento.
  • produttore in grado di produrre il prossimo elemento prima di quello corrente viene consumata. La voce corrente è quindi persa, ma va bene per me.
  • Consumer può consumare elemento corrente prima di quello successivo viene prodotto. La voce corrente è quindi consumato due volte (o più), ma va bene per me.

Così ho scritto questo:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

La mia domanda è: è questo codice thread-safe

?

Commento immediata:. Questo codice non è davvero lockless - io uso CPython ed ha GIL

Ho provato il codice un po 'e sembra funzionare. Essa si traduce in alcune ops load e store che sono atomico a causa della GIL. Ma so anche che del x operazione non è atomica quando x attrezzi metodo __del__. Quindi, se il mio oggetto ha un metodo <=> e qualche brutta programmazione accade, le cose possono rompersi. O no?

Un'altra domanda è: che tipo di limitazioni (ad esempio sul tipo di prodotti articoli) devo imporre a fare quanto sopra bel lavoro codice

Le mie domande sono solo circa possibilità teorica di sfruttare stranezze di CPython e Gil di al fine di venire con lockless (ovvero le serrature come threading.Lock esplicitamente nel codice) soluzione.

È stato utile?

Soluzione

Sì, questo funzionerà nel modo in cui hai descritto:

  1. che il produttore può produrre un elemento skippable.
  2. Che il consumatore può consumare lo stesso elemento.
  

Ma so anche che del funzionamento x non è atomica quando x implementa del metodo. Quindi, se il mio oggetto ha una del metodo e qualche brutta programmazione accade, le cose possono rompersi.

Non vedo un "del" qui. Se un del accade nel consume_item poi il del si possono verificare nel thread produttore. Non credo che questo sarebbe un "problema".

Non preoccupatevi di utilizzare questo però. Si finisce per utilizzare la CPU sui cicli elettorali inutili, e non è più veloce rispetto all'utilizzo di una coda con le serrature dal Python ha già un blocco globale.

Altri suggerimenti

Inganno ti morderà. Basta usare Coda per comunicare tra i thread.

Non è davvero al sicuro perché produttore potrebbe sovrascrivere QUEUE_ITEM prima consumatore ha consumato e consumatore potrebbe consumare <=> due volte thread. Come lei ha ricordato, sei d'accordo con questo, ma la maggior parte delle persone non sono.

Qualcuno con più conoscenza dei meccanismi interni di CPython dovrà rispondere voi le domande più teorico.

Credo che sia possibile che un thread viene interrotto, mentre la produzione / consumo, soprattutto se gli elementi sono oggetti di grandi dimensioni. Edit: questa è solo una supposizione. Non sono un esperto.

Anche i fili possono produrre / consumare qualsiasi numero di elementi prima dell'altro inizia a funzionare.

È possibile utilizzare una lista come la coda finché vi limitate a accodare / pop poiché entrambi sono atomica.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

In un ambito di classe come qui di seguito, si può anche cancellare la coda.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Dovrai scoprire quali operazioni di lista sono atomiche guardando il bytecode generato.

Il __del__ potrebbe essere un problema in quanto hai detto. Si potrebbe essere evitato, se solo ci fosse un modo per evitare che il garbage collector di invocare il metodo QUEUE_ITEM sul vecchio oggetto Prima di finire di assegnare il nuovo per il <=>. Avremmo bisogno di qualcosa come:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

Ho paura, non so se è possibile, però.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top