Solved by setting an "aborted" status flag on the model related to ATask and adding a check for that flag at the start of BTask.
Get celery task ids in advanced workflow
Question
I need to implement the following scenario:
- Execute task A
- Execute multiple B tasks in parallel with different arguments
- Wait for all tasks to finish
- Execute multiple B tasks in parallel with different arguments
- Wait for all tasks to finish
- Execute task C
I achieved this by building a chain of chords; here is the simplified code:
    from celery import chain, chord

    # inside run() method of ATask
    chord_chain = []
    for taskB_group in taskB.groups.all():
        # one immutable BTask signature per child model in this group
        tasks = [BTask().si(id=taskB_model.id) for taskB_model in taskB_group.children.all()]
        if tasks:
            chord_chain.append(chord(tasks, _dummy_callback.s()))
    chord_chain.append(CTask().si(execution_id))
    chain(chord_chain)()
The problem is that I need to be able to call revoke(terminate=True) on all BTasks at any point in time. The lower-level problem is that I can't get at the BTask celery ids.
- Tried to get the BTask ids via the chain: result = chain(chord_chain)(). But I didn't find that information in the returned AsyncResult object. Is it possible to get the chain's children ids from this object? (result.children is None)
- Tried to get the BTask ids via the ATask AsyncResult, but it seems that the children property only contains the results of the first chord and not the rest of the tasks:
    >>> r = AsyncResult(#ATask.id#)
    >>> r.children
    [<GroupResult: 5599ae69-4de0-45c0-afbe-b0e573631abc [#BTask.id#, #BTask.id#]>, <AsyncResult: #chord_unlock.id#>]
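For exploring what a result object actually exposes, a small duck-typed walker can gather every task id reachable through parent, children and (for group results) results. This is only a sketch: collect_task_ids is a hypothetical helper, it assumes objects that carry those attributes the way Celery's AsyncResult and GroupResult do, and it can only find tasks whose results are already linked, so it may still miss chords that have not been scheduled yet. The stand-in objects at the bottom mimic the r.children output shown above so the example runs without a broker.

```python
from types import SimpleNamespace


def collect_task_ids(result):
    """Return ids of all task results reachable from `result` (hypothetical helper)."""
    seen = set()    # keys of nodes already visited
    task_ids = []   # ids of non-group results, in visiting order
    stack = [result]
    while stack:
        node = stack.pop()
        if node is None:
            continue
        node_id = getattr(node, "id", None)
        key = node_id if node_id is not None else id(node)
        if key in seen:
            continue
        seen.add(key)
        # Group results expose `results`; their own id is a group id, not a task id.
        is_group = hasattr(node, "results")
        if node_id is not None and not is_group:
            task_ids.append(node_id)
        stack.append(getattr(node, "parent", None))
        stack.extend(getattr(node, "results", None) or [])
        stack.extend(getattr(node, "children", None) or [])
    return task_ids


# Stand-ins shaped like the output above (assumed attribute names only).
b1 = SimpleNamespace(id="b1", parent=None, children=None)
b2 = SimpleNamespace(id="b2", parent=None, children=None)
group = SimpleNamespace(id="g1", parent=None, children=[b1, b2], results=[b1, b2])
unlock = SimpleNamespace(id="unlock", parent=None, children=None)
a_result = SimpleNamespace(id="a", parent=None, children=[group, unlock])

found = collect_task_ids(a_result)
```

With real Celery results, the returned list could then be passed to revoke, provided the tasks of interest are actually reachable from the starting result.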
Solution
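The fix that worked was to mark the model related to ATask as aborted and check that flag at the start of BTask. A minimal sketch of that idea follows; Execution, ABORTED and run_b_task are hypothetical names, and the in-memory object stands in for whatever database model the real tasks would reload before checking.

```python
ABORTED = "aborted"  # hypothetical status value


class Execution:
    """Hypothetical stand-in for the model related to ATask."""

    def __init__(self, status="running"):
        self.status = status


def run_b_task(execution, work):
    """Body of a B task: do nothing if the execution was flagged as aborted.

    In a real task the model would be re-read from the database here,
    so that a flag set after the workflow started is seen.
    """
    if execution.status == ABORTED:
        return None  # skip the work instead of being revoked
    return work()


execution = Execution()
first = run_b_task(execution, lambda: 42)   # runs normally

execution.status = ABORTED                  # what the abort action would do
second = run_b_task(execution, lambda: 42)  # skipped
```

Unlike revoke(terminate=True), a check like this does not interrupt a BTask that is already running; it only stops tasks that have not started their work yet, which avoids needing their celery ids at all.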
Licensed under: CC-BY-SA with attribution