Pregunta

¿Hay alguna forma de determinar si se pierde y volver a intentarlo?


Creo que la razón de Lost puede ser un bugs de despachador o un bloqueo de hilo de trabajadores.

Estaba planeando volver a intentarlos, pero no estoy seguro de cómo determinar qué tareas deben retirarse.

¿Y cómo hacer este proceso automáticamente? ¿Puedo usar mi propio planificador personalizado que creará nuevas tareas?

EDITAR: En la documentación, encontré que RabbitMQ nunca pierde las tareas, pero ¿qué sucede cuando el subproceso de trabajadores se bloquea en el medio de la ejecución de la tarea?

¿Fue útil?

Solución

Lo que necesitas es establecer

CELERY_ACKS_LATE = True

ACK tardío significa que los mensajes de la tarea serán reconocidos después de que se haya ejecutado la tarea, no solo antes, que es el comportamiento predeterminado. De esta manera, si el trabajador se estrella, Rabbit MQ aún tendrá el mensaje.

Obviamente, de un bloqueo total (conejo + trabajadores) al mismo tiempo, no hay forma de recuperar la tarea, excepto si implementa un inicio de sesión en el inicio de la tarea y el final de la tarea. Personalmente, escribo en una línea MongoDB cada vez que comienza una tarea y otra cuando la tarea termina (forma el resultado independientemente), de esta manera puedo saber qué tarea se interrumpió analizando los registros de Mongo.

Puede hacerlo fácilmente anulando los métodos __call__ y after_return de la clase de tareas base de apio.

A continuación, verá una parte de mi código que usa una clase de TaskLogger como Administrador de contexto (con punto de entrada y salida). La clase TaskLogger simplemente escribe una línea que contiene la información de la tarea en una instancia de MongoDB.

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

Espero que esto pueda ayudar

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top