Pergunta

Eu escrevi recentemente um programa que usado um padrão de produtor / consumidor simples. Ele inicialmente tinha um bug relacionado ao uso indevido de threading.Lock que eu finalmente fixas. Mas isso me fez pensar se é possível implementar produtor padrão / consumidor de forma lockless.

Requisitos no meu caso eram simples:

  • Um segmento produtor.
  • Um segmento do consumidor.
  • Queue tem lugar para apenas um item.
  • Produtor pode produzir próximo item antes de o atual é consumido. O item atual é, portanto, perdido, mas isso é OK para mim.
  • Consumidor pode consumir item atual antes que o próximo é produzido. O item atual é, portanto, consumido duas vezes (ou mais), mas isso é OK para mim.

Então eu escrevi o seguinte:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

A minha pergunta é: é thread-safe este código

comentário imediata: este código não é realmente lockless - Eu uso CPython e tem GIL

.

Eu testei o código um pouco e parece trabalho. Isso se traduz em alguns de carga e armazenamento ops que são atômica por causa do GIL. Mas também sei que a operação del x não é atômica quando x implementos __del__ método. Então, se meu item tem um método __del__ e alguns agendamento desagradável acontece, as coisas podem quebrar. Ou não?

Outra pergunta é: Que tipo de restrições (por exemplo no tipo itens produzidos) Eu tenho de impor a fazer o excelente trabalho código acima

As minhas perguntas são apenas cerca possibilidade teórica para explorar peculiaridades do do CPython e Gil, a fim de chegar a lockless (isto é, sem bloqueios como threading.Lock explicitamente no código) solução.

Foi útil?

Solução

Sim isso vai funcionar da maneira que você descreveu:

  1. Que o produtor pode produzir um elemento skippable.
  2. Que o consumidor pode consumir o mesmo elemento.

Mas também sei que a operação del x não é atômica quando x implementos del método. Então, se o meu item tem um del método e alguns agendamento desagradável acontece, as coisas podem quebrar.

Eu não ver um "del" aqui. Se um del acontece em consume_item então o del pode ocorrer no segmento produtor. Eu não acho que isso seria um "problema".

Não se preocupe com isso embora. Você vai acabar usando-se CPU em ciclos eleitorais inúteis, e não é mais rápido do que usar uma fila com fechaduras desde Python já tem um bloqueio global.

Outras dicas

Trickery vai morder você. Basta usar fila para se comunicar entre threads.

Este não é realmente segmento seguro porque o produtor pode substituir QUEUE_ITEM antes consumidor tem consumido-lo e consumidor poderia consumir QUEUE_ITEM duas vezes. Como você mencionou, você está OK com isso, mas a maioria das pessoas não são.

Alguém com mais conhecimento de internos CPython terá que responder-lhe perguntas mais teóricos.

Eu acho que é possível que um thread é interrompida enquanto produzindo / consumindo, especialmente se os itens são grandes objetos. Edit: este é apenas um palpite. Não sou especialista.

Além disso, os tópicos podem produzir / consumir qualquer número de itens antes do outro começa a funcionar.

Você pode usar uma lista como a fila enquanto se mantiver a acrescentar / pop uma vez que ambos são atômicas.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

Em um escopo de classe como abaixo, você pode até mesmo limpar a fila.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Você vai ter que descobrir qual operações de lista são atômicas por olhar para o bytecode gerado.

O __del__ poderia ser um problema, como você disse. Pode ser evitado, se ao menos houvesse uma maneira de impedir que o coletor de lixo de invocar o método __del__ no objeto de idade antes de terminar atribuindo o novo para o QUEUE_ITEM. Nós precisaríamos de algo como:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

Eu tenho medo, eu não sei se é possível, no entanto.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top