Impossible d'obtenir la propriété de Céleri au travail
Question
Je suis en train de créer des tâches périodiques Céleri, et certains d'entre eux ont besoin d'avoir la possibilité de changer le temps de run_every lors de l'exécution. La documentation Céleri dit que je devrais être en mesure de le faire en tournant l'attribut run_every dans une propriété ( http://packages.python.org/celery/faq.html#can-i-change-the-interval-of-a- -tâche à-exécution périodique ).
Voici ce que je fais:
class ParseSomeStuffTask(PeriodicTask):
def run(self, **kwargs):
# Do stuff
@property
def run_every(self):
if datetime.now().weekday() in [1, 2, 3]:
return timedelta(minutes=15)
else:
return timedelta(seconds=40)
Malheureusement, quand je tourne sur celerybeat, je reçois l'erreur suivante:
[Jeu 9 septembre 2010 15:44:40: CRITIQUE / 828]: celerybeat soulevé exception: objet « » n'a pas datetime.timedelta attribut 'is_due'
Il arrête ensuite. La documentation Céleri ne va pas vraiment dans quoi revenir lors de run_every une propriété, et je n'ai pas eu de chance la recherche Google. changelogs Céleri dire son été en mesure de changer l'intervalle de tâche périodique lors de l'exécution depuis la version 1.0.0.
Dev. Environnement:
- Python 2.6.5
- Django 1.2.1
- Céleri 2.0.2
La solution
Céleri 2.0 prend en charge différents comportements de planification. Il y a celery.task.schedules.schedule
et celery.task.schedules.crontab
.
Vous devez retourner l'un de ceux-ci, ou faire votre propre sous-classe de l'annexe.
from celery.task.schedules import schedule
@property
def run_every(self):
if datetime.now().weekday() in [1, 2, 3]:
return schedule(timedelta(minutes=15))
else:
return schedule(timedelta(seconds=40))
L'attribut run_every
sera automatiquement converti à l'instanciation,
mais pas plus tard.