Question

I need to implement the following scenario:

  • Execute task A
    • Execute multiple B tasks in parallel with different arguments
    • Wait for all of them to finish
    • Execute multiple B tasks in parallel with different arguments
    • Wait for all of them to finish
    • Execute task C

I have achieved this by building a chain of chords. Here is the simplified code:

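from celery import chain, chord

# inside run() method of ATask
chord_chain = []
for taskB_group in taskB.groups.all():
  # one immutable BTask signature per child of this group
  tasks = [BTask().si(id=taskB_model.id) for taskB_model in taskB_group.children.all()]
  if tasks:
    # the chord callback runs only after every BTask in the group has finished
    chord_chain.append(chord(tasks, _dummy_callback.s()))
# CTask runs last, after all chords have completed
chord_chain.append(CTask().si(execution_id))
chain(chord_chain)()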

The problem is that I need to be able to call revoke(terminate=True) on all BTasks at any point in time. The underlying problem is that I can't get hold of the BTasks' Celery task ids.

  1. Tried to get the BTask ids from the chain result, result = chain(chord_chain)(), but I didn't find that information in the returned AsyncResult object. Is it possible to get the ids of the chain's children from this object? (result.children is None)
  2. Tried to get the BTask ids via the ATask AsyncResult, but it seems that the children property only contains the results of the first chord and not the rest of the tasks:
>>> r=AsyncResult(#ATask.id#)
>>> r.children
[<GroupResult: 5599ae69-4de0-45c0-afbe-b0e573631abc [#BTask.id#, #BTask.id#]>, 
<AsyncResult: #chord_unlock.id#>] 

Solution

Solved by setting an "aborted" status flag on the model related to ATask and adding a check for that flag at the start of BTask.
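A minimal sketch of that idea, not the original code: the in-memory set ABORTED_EXECUTIONS and the functions abort_execution, is_aborted and run_b_task are hypothetical stand-ins. In the real setup the flag would presumably live on the database model related to ATask, and the check would sit at the top of BTask's run() method.

```python
# Hypothetical in-memory stand-in for the model related to ATask;
# in the original setup this would be a flag on a database row.
ABORTED_EXECUTIONS = set()


def abort_execution(execution_id):
    """Flag an execution as aborted (what a 'cancel' action would do)."""
    ABORTED_EXECUTIONS.add(execution_id)


def is_aborted(execution_id):
    """Return True if the execution has been flagged as aborted."""
    return execution_id in ABORTED_EXECUTIONS


def run_b_task(execution_id, item_id):
    """Body of a BTask: check the flag first, then do the real work."""
    if is_aborted(execution_id):
        # Skip the work instead of relying on revoke(terminate=True).
        return "skipped"
    # ... the actual BTask work for item_id would go here ...
    return "done"


abort_execution(2)
print(run_b_task(1, 10))  # execution 1 is not flagged
print(run_b_task(2, 11))  # execution 2 was flagged as aborted
```

Note that a check like this only stops BTasks that have not started yet. A BTask that is already running keeps going unless it re-checks the flag during its work.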

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow