Come posso impostare sedano per chiamare una funzione di inizializzazione personalizzato prima di eseguire i miei compiti?

StackOverflow https://stackoverflow.com/questions/2129820

Domanda

Ho un progetto Django e sto cercando di utilizzare sedano a presentare le attività per l'elaborazione in background ( http://ask.github.com/celery/introduction.html ). Sedano si integra bene con Django e sono stato in grado di presentare le mie attività personalizzate e tornare risultati.

L'unico problema è che non riesco a trovare un modo sano di eseguire l'inizializzazione personalizzato nel processo demone. Ho bisogno di chiamare una funzione costosa che carica un sacco di memoria prima di iniziare a elaborare i compiti, e non posso permettermi di chiamare tale funzione ogni volta.

Qualcuno ha avuto questo problema prima? Delle idee come lavorare intorno ad esso senza modificare il codice sorgente di sedano?

Grazie

È stato utile?

Soluzione

È possibile scrivere un caricatore personalizzato, o utilizzare i segnali.

Pale hanno il metodo on_task_init, che è chiamato quando un'attività sta per essere eseguita, e on_worker_init che è chiamato dal celerybeat processo principale sedano +.

Usando segnali è probabilmente il più facile, i segnali disponibili sono:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Inviato quando un compito è in procinto di essere giustiziato dal lavoratore (o localmente se si utilizza apply / o se CELERY_ALWAYS_EAGER è stata impostata).

  • task_postrun(task_id, task, args, kwargs, retval) Inviato dopo un'operazione è stata eseguita nelle stesse condizioni di cui sopra.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Chiamato quando viene applicato un compito (non va bene per operazioni di lunga durata)

Ulteriori segnali disponibili in 0.9.x (ramo master corrente su GitHub):

  • worker_init()

    Chiamato quando celeryd ha iniziato (prima che l'operazione viene inizializzata, quindi, se su un sistema di supporto fork, eventuali modifiche di memoria viene copiato al bambino processi di lavoro).

  • worker_ready()

    Chiamato quando celeryd è in grado di ricevere i compiti.

  • worker_shutdown()

    Chiamato quando celeryd si sta spegnendo.

Ecco un esempio il ricalcolo qualcosa la prima volta un'attività viene eseguita nel processo:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

Se si desidera che la funzione da eseguire per tutte le attività, basta saltare l'argomento sender.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top