Domanda

C'è un modo per determinare se tutta l'operazione è perso e riprovare?


Credo che la ragione per la perduta può essere dispatcher incidente bug o lavoratore thread.

Avevo intenzione di ripetere loro, ma non sono sicuro di come determinare quali attività devono essere dismesso?

E come rendere automatico questo processo? Posso utilizzare il mio proprio scheduler personalizzato che creerà nuove attività?

Modifica:? Ho trovato dalla documentazione che RabbitMQ compiti mai sciolto, ma cosa succede quando thread di lavoro incidente nel bel mezzo di esecuzione dell'attività

È stato utile?

Soluzione

Quello che vi serve è quello di set

CELERY_ACKS_LATE = True

mezzi tardi ACK che i messaggi di attività saranno riconosciuti dopo che il compito è stato eseguito, non solo prima, che è il comportamento predefinito. In questo modo se il lavoratore si blocca coniglio MQ avrà ancora il messaggio.

Ovviamente di un arresto totale (Coniglio + lavoratori) allo stesso tempo, non c'è modo di recuperare il compito, a meno che non si implementa un logging su di avvio dell'attività e di fine operazione. Personalmente scrivo in un MongoDB una linea ogni volta che un avvio di un'attività e un altro quando la finitura compito (formare in modo indipendente il risultato), in questo modo posso sapere quale attività è stata interrotta, analizzando i registri di Mongo.

Si può fare facilmente l'override del metodo __call__ e after_return della classe compito di base di sedano.

A seguito si vede un pezzo del mio codice che utilizza una classe taskLogger come contesto manager (con ingresso e di uscita). La classe taskLogger scrive semplicemente una riga contenente informazioni compito in un'istanza MongoDB.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

Spero che questo potrebbe aiutare

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top