Comment puis-je mettre en place Céleri pour appeler une fonction d'initialisation personnalisée avant d'exécuter mes tâches?

StackOverflow https://stackoverflow.com/questions/2129820

Question

J'ai un projet Django et je suis en train d'utiliser Céleri pour soumettre des tâches pour le traitement de fond ( http://ask.github.com/celery/introduction.html ). Céleri intègre bien avec Django et je l'ai été en mesure de présenter mes tâches personnalisées et retourner les résultats.

Le seul problème est que je ne peux pas trouver une façon saine d'effectuer une initialisation personnalisée dans le processus démon. Je dois appeler une fonction coûteuse qui charge beaucoup de mémoire avant de commencer le traitement des tâches, et je ne peux pas se permettre d'appeler cette fonction à chaque fois.

Quelqu'un at-il eu ce problème avant? Toute idée comment travailler autour d'elle sans modifier le code source de Céleri?

Merci

Était-ce utile?

La solution

Vous pouvez écrire un chargeur sur mesure, ou utiliser les signaux.

Chargeurs ont la méthode on_task_init, qui est appelée lorsqu'une tâche est sur le point d'être exécuté, et on_worker_init qui est appelé par le céleri + processus principal celerybeat.

En utilisant des signaux est probablement le plus facile, les signaux disponibles sont:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Distribué lorsqu'une tâche est sur le point d'être exécuté par le travailleur (ou localement si vous utilisez apply / ou si CELERY_ALWAYS_EAGER a été défini).

  • task_postrun(task_id, task, args, kwargs, retval) Distribué après une tâche a été exécutée dans les mêmes conditions que ci-dessus.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Appelée lorsqu'une tâche est appliquée (pas bon pour les opérations de longue durée)

Signaux supplémentaires disponibles dans 0.9.x (de la branche principale en cours sur github):

  • worker_init()

    Appelé lorsque celeryd a commencé (avant que la tâche est initialisé, donc si un système de soutien fork, tout changement de mémoire sera copié à l'enfant processus de travail).

  • worker_ready()

    Appelée lorsque celeryd est capable de recevoir des tâches.

  • worker_shutdown()

    Appelée lorsque celeryd est en cours d'arrêt.

Voici un exemple précalculer quelque chose la première fois qu'une tâche est exécutée dans le processus:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

Si vous voulez que la fonction à exécuter toutes les tâches, sautez l'argument sender.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top