Question

I need to implement the following scenario:

  • Execute task A
    • Execute multiple task B in parallel with different arguments
    • Wait for all tasks to finish
    • Execute multiple task B in parallel with different arguments
    • Wait for all tasks to finish
    • Execute task C

I have achieved this by implementing a chain of chords. Here is the simplified code:

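# inside run() method of ATask
# requires: from celery import chain, chord
chord_chain = []
for taskB_group in taskB.groups.all():
  # one immutable BTask signature per child model in this group
  tasks = [BTask().si(id=taskB_model.id) for taskB_model in taskB_group.children.all()]
  if tasks:  # skip empty groups
    chord_chain.append(chord(tasks, _dummy_callback.s()))
# CTask runs once, after the last chord has finished
chord_chain.append(CTask().si(execution_id))
chain(chord_chain)()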

The problem is that I need to be able to call revoke(terminate=True) on all BTasks at any point in time. The lower-level problem is that I can't get at the Celery task ids of the BTasks.

  1. I tried to get the BTask ids from the chain result, result = chain(chord_chain)(), but I could not find that information in the returned AsyncResult object. Is it possible to get the ids of a chain's children from this object? (result.children is None.)
  2. I tried to get the BTask ids via the ATask AsyncResult, but its children property seems to contain only the results of the first chord, not those of the remaining tasks:

>>> r = AsyncResult(#ATask.id#)
>>> r.children
[<GroupResult: 5599ae69-4de0-45c0-afbe-b0e573631abc [#BTask.id#, #BTask.id#]>,
<AsyncResult: #chord_unlock.id#>]

Solution

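Solved by setting an aborted status flag on the model related to ATask and adding a check for that flag at the start of BTask. This sidesteps the need for the BTask ids: instead of being revoked, each BTask checks the flag when it starts and exits early if the execution has been aborted.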
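
The flag check could look roughly like the sketch below. It is only an illustration, not the original code: the in-memory EXECUTIONS dict is a hypothetical stand-in for the ATask-related model (in practice a database row that every worker can read), and run_b_task stands for the body of BTask's run() method. All names here are assumptions made for the example.

```python
# Hypothetical stand-in for the ATask-related model. In a real project this
# would be shared storage (e.g. a database row) visible to all workers.
EXECUTIONS = {}  # execution_id -> {"aborted": bool}


def start_execution(execution_id):
    """Register an execution, as ATask might do before queuing the BTasks."""
    EXECUTIONS[execution_id] = {"aborted": False}


def abort_execution(execution_id):
    """Set the aborted flag; BTasks that start afterwards will skip their work."""
    EXECUTIONS[execution_id]["aborted"] = True


def is_aborted(execution_id):
    """Return True if the execution is flagged; unknown ids count as not aborted."""
    return EXECUTIONS.get(execution_id, {}).get("aborted", False)


def run_b_task(execution_id, item_id):
    """Stand-in for the body of BTask: check the flag first, then do the work."""
    if is_aborted(execution_id):
        return "skipped"
    # ... the real BTask work would go here ...
    return "processed %s" % item_id
```

Note that a check at the start only stops BTasks that have not begun yet. Unlike revoke(terminate=True), it does not interrupt a BTask that is already running, unless that task re-checks the flag during its work.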

Licensed under: CC-BY-SA with attribution