Come posso impostare sedano per chiamare una funzione di inizializzazione personalizzato prima di eseguire i miei compiti?
Domanda
Ho un progetto Django e sto cercando di utilizzare sedano a presentare le attività per l'elaborazione in background ( http://ask.github.com/celery/introduction.html ). Sedano si integra bene con Django e sono stato in grado di presentare le mie attività personalizzate e tornare risultati.
L'unico problema è che non riesco a trovare un modo sano di eseguire l'inizializzazione personalizzato nel processo demone. Ho bisogno di chiamare una funzione costosa che carica un sacco di memoria prima di iniziare a elaborare i compiti, e non posso permettermi di chiamare tale funzione ogni volta.
Qualcuno ha avuto questo problema prima? Delle idee come lavorare intorno ad esso senza modificare il codice sorgente di sedano?
Grazie
Soluzione
È possibile scrivere un caricatore personalizzato, o utilizzare i segnali.
Pale hanno il metodo on_task_init
, che è chiamato quando un'attività sta per essere eseguita,
e on_worker_init
che è chiamato dal celerybeat processo principale sedano +.
Usando segnali è probabilmente il più facile, i segnali disponibili sono:
0.8.x:
-
task_prerun(task_id, task, args, kwargs)
Inviato quando un compito è in procinto di essere giustiziato dal lavoratore (o localmente se si utilizza
apply
/ o seCELERY_ALWAYS_EAGER
è stata impostata). -
task_postrun(task_id, task, args, kwargs, retval)
Inviato dopo un'operazione è stata eseguita nelle stesse condizioni di cui sopra. -
task_sent(task_id, task, args, kwargs, eta, taskset)
Chiamato quando viene applicato un compito (non va bene per operazioni di lunga durata)
Ulteriori segnali disponibili in 0.9.x (ramo master corrente su GitHub):
-
worker_init()
Chiamato quando celeryd ha iniziato (prima che l'operazione viene inizializzata, quindi, se su un sistema di supporto
fork
, eventuali modifiche di memoria viene copiato al bambino processi di lavoro). -
worker_ready()
Chiamato quando celeryd è in grado di ricevere i compiti.
-
worker_shutdown()
Chiamato quando celeryd si sta spegnendo.
Ecco un esempio il ricalcolo qualcosa la prima volta un'attività viene eseguita nel processo:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Se si desidera che la funzione da eseguire per tutte le attività, basta saltare l'argomento sender
.