Riprova persi o non è riuscito Tasks (sedano, Django e RabbitMQ)
-
26-10-2019 - |
Domanda
C'è un modo per determinare se tutta l'operazione è perso e riprovare?
Credo che la ragione per la perduta può essere dispatcher incidente bug o lavoratore thread.
Avevo intenzione di ripetere loro, ma non sono sicuro di come determinare quali attività devono essere dismesso?
E come rendere automatico questo processo? Posso utilizzare il mio proprio scheduler personalizzato che creerà nuove attività?
Modifica:? Ho trovato dalla documentazione che RabbitMQ compiti mai sciolto, ma cosa succede quando thread di lavoro incidente nel bel mezzo di esecuzione dell'attività
Soluzione
Quello che vi serve è quello di set
CELERY_ACKS_LATE = True
mezzi tardi ACK che i messaggi di attività saranno riconosciuti dopo che il compito è stato eseguito, non solo prima, che è il comportamento predefinito. In questo modo se il lavoratore si blocca coniglio MQ avrà ancora il messaggio.
Ovviamente di un arresto totale (Coniglio + lavoratori) allo stesso tempo, non c'è modo di recuperare il compito, a meno che non si implementa un logging su di avvio dell'attività e di fine operazione. Personalmente scrivo in un MongoDB una linea ogni volta che un avvio di un'attività e un altro quando la finitura compito (formare in modo indipendente il risultato), in questo modo posso sapere quale attività è stata interrotta, analizzando i registri di Mongo.
Si può fare facilmente l'override del metodo __call__
e after_return
della classe compito di base di sedano.
A seguito si vede un pezzo del mio codice che utilizza una classe taskLogger come contesto manager (con ingresso e di uscita). La classe taskLogger scrive semplicemente una riga contenente informazioni compito in un'istanza MongoDB.
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
#Inizialize context managers
self.taskLogger = TaskLogger(args, kwargs)
self.taskLogger.__enter__()
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
Spero che questo potrebbe aiutare