Question

J'ai écrit récemment un programme qui a utilisé d'un simple producteur / modèle de consommation. Il avait d'abord un bug lié à une mauvaise utilisation de threading.Lock que je finalement fixé. Mais il m'a fait penser que ce soit possible de mettre en œuvre modèle producteur / consommateur d'une manière lockless.

Exigences dans mon cas étaient simples:

  • Un fil de producteur.
  • Un fil à la consommation.
  • File d'attente a place pour un seul élément.
  • Le producteur peut produire l'article suivant avant l'actuel est consommé. L'élément en cours est donc perdu, mais c'est OK pour moi.
  • consommateur peut consommer élément en cours avant le prochain est produit. L'élément en cours est donc consommé deux fois (ou plus), mais c'est OK pour moi.

J'ai donc écrit ceci:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

Ma question est la suivante: Est-ce thread-safe code

?

commentaire dans l'immédiat. Ce code n'est pas vraiment Lockless - J'utilise CPython et il a GIL

Je l'ai testé un peu le code et il semble fonctionner. Il se traduit par une certaine charge et STORE opérations qui sont atomiques en raison de GIL. Mais je sais aussi que l'opération n'est pas del x atomique lorsque x met en œuvre la méthode __del__. Donc, si mon article a une méthode et une planification <=> méchant arrive, les choses peuvent se casser. Ou pas?

Une autre question est: Quel genre de restrictions (par exemple sur le type d'articles produits) dois-je imposer pour rendre le code ci-dessus beau travail

Mes questions sont seulement théoriquement possible d'exploiter les bizarreries de son CPython et GIL afin de trouver une solution lockless (à savoir pas de serrures comme threading.Lock explicitement dans le code).

Était-ce utile?

La solution

Oui, cela fonctionnera de la manière que vous avez décrite:

  1. Que le producteur peut produire un élément skippable.
  2. Que le consommateur peut consommer le même élément.
  

Mais je sais aussi que le fonctionnement del x est pas atomique lorsque x outils del méthode . Donc, si mon article a une del méthode et une planification méchant arrive, les choses peuvent se casser.

Je ne vois pas un « del » ici. Si un del arrive à consume_item alors peut se produire dans le thread producteur del . Je ne pense pas que ce serait un « problème ».

Ne vous embêtez pas utiliser ce bien. Vous retrouvez à l'aide des CPU sur les cycles de vote inutiles, et il est pas plus rapide que d'utiliser une file d'attente avec les verrous depuis Python a déjà un verrou global.

Autres conseils

Supercherie vous mordre. Il suffit d'utiliser la file d'attente pour communiquer entre les threads.

Ce n'est pas vraiment thread-safe parce que le producteur pourrait remplacer avant la consommation a QUEUE_ITEM consommé et le consommateur pourrait consommer deux fois <=>. Comme vous l'avez mentionné, vous êtes OK avec cela, mais la plupart des gens ne sont pas.

Quelqu'un avec plus de connaissances internes de CPython devra vous répondre à des questions plus théoriques.

Je pense qu'il est possible qu'un fil est interrompu tout en produisant /, en particulier si les objets sont de grands objets. Edit: cela est juste une supposition sauvage. Je ne suis pas expert.

De plus, les fils peuvent produire / consommer un certain nombre d'articles avant que l'autre commence à fonctionner.

Vous pouvez utiliser une liste que la file d'attente aussi longtemps que vous vous en tenez à ajouter / pop puisque les deux sont atomiques.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

Dans un contexte de classe comme ci-dessous, vous pouvez même effacer la file d'attente.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Vous devez savoir quelles opérations de liste sont atomiques en regardant le bytecode généré.

Le __del__ pourrait être un problème comme vous le dites. Il pourrait être évité, si seulement il y avait un moyen d'empêcher le collecteur d'ordures d'invoquer la méthode sur l'ancien QUEUE_ITEM objet avant de terminer l'attribution de la nouvelle à la <=>. Nous aurions besoin de quelque chose comme:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

Je crains, je ne sais pas s'il est possible, cependant.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top