Question

I'm trying to update some aggregate data at the completion of a chunk of tasks. It's basically the concept of a chord being implemented at the end of a chunk. How do I implement this?

Was it helpful?

Solution

Ok, finally I solved it like this.

I first break the incoming arg_collection into chunk sized sub collections. For each of them, we then apply a chord.



    # required util
    def n_sized_parts(inp_iterable, n):
         return (inp_iterable[i:i+n] for i in xrange(0,len(inp_iterable),n))

    # defintion of tasks
    @task
    def gen_task(arg1, arg2):
        return arg1 + arg2

    @task
    def any_callback_task_for_end_of_chunk(some_arg):
        return some_arg * 10
        # this is a trivial example. 
        # usually, you will want to do something better.

    @task
    def apply_chord_to_chunk(arg_collection, *args, **kwargs):
        task_list = []
        callback_arg = kwargs.pop('callback_arg')

        for part in n_sized_parts(arg_collection, size_of_each_chunk):
             task_list.append(chord(gen_task.starmap(part))(any_callback_task_for_end_of_chunk.si(callback_arg))))

         return task_list

    # invocation
    apply_chord_to_chunk.s(zip(xrange(100),xrange(100)), callback_arg='asdgag')

If you find some problem anywhere in this solution, please let me know. I'll quickly update with any required edits.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top