Question

I am looking for an alternative to JoinBlock that n TransformBlocks can link to, and that joins/merges the messages of all the TransformBlock source blocks so that a collection of them can be passed on to another dataflow block.

JoinBlock does the job, but it can only be hooked up to 3 source blocks. It also suffers from a number of inefficiencies (it is very slow to join even value types (ints) from 2 source blocks). Is there a way to have Tasks returned from the TransformBlocks and wait until every TransformBlock has a completed task to pass on before accepting the Task<item>?

Any alternative ideas? I potentially have 1-20 such transform blocks whose items I need to join together before passing on the joined item collection. Each transform block is guaranteed to return exactly one output item for each input item "transformed".

Edit: Requested clarification:

Per one of my previous questions, I set up my JoinBlocks as follows:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    // Multiply before dividing so the integer division does not truncate the rate.
    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
    Console.ReadLine();
}

Solution

One way to do this is to use BatchBlock with Greedy set to false. In this configuration, the block doesn't do anything until there are n items from n different blocks waiting to be consumed by it (where n is the batch size you set when creating the BatchBlock). When that happens, it consumes all n items at once and produces an array containing all of them.

One caveat with this solution is that the resulting array is not sorted: you're not going to know which item came from which source. I also have no idea how its performance compares with JoinBlock; you'll have to test that yourself. (Though I would understand if using BatchBlock this way was slower, because of the overhead necessary for non-greedy consumption.)
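
A minimal sketch of the non-greedy BatchBlock setup described above, reusing the broadcast/transform layout from the question. The class name BatchJoinSketch, the method RunAsync, and the (Source, Value) tuple are illustrative assumptions, not part of the original answer. Tagging each output with its source index is one possible way around the "which item came from which source" caveat; it has not been benchmarked against JoinBlock.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchJoinSketch
{
    // Broadcasts itemCount integers to sourceCount transform blocks and joins
    // their outputs with a non-greedy BatchBlock. Returns the number of batches.
    public static async Task<int> RunAsync(int sourceCount, int itemCount)
    {
        var broadcast = new BroadcastBlock<int>(i => i);

        // Greedy = false: the block waits until sourceCount different sources
        // each offer an item, then consumes one item from each of them.
        var batch = new BatchBlock<(int Source, int Value)>(
            sourceCount, new GroupingDataflowBlockOptions { Greedy = false });

        int batchCount = 0;
        var processor = new ActionBlock<(int Source, int Value)[]>(items =>
        {
            // The array order is not tied to the sources, so sort by the tag.
            Array.Sort(items, (a, b) => a.Source.CompareTo(b.Source));
            batchCount++;
        });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Each transform tags its output with its own (hypothetical) source index.
        var transforms = Enumerable.Range(0, sourceCount)
            .Select(source => new TransformBlock<int, (int Source, int Value)>(
                i => (source, i)))
            .ToArray();

        foreach (var transform in transforms)
        {
            broadcast.LinkTo(transform, propagate);
            transform.LinkTo(batch); // completion is handled manually below
        }
        batch.LinkTo(processor, propagate);

        for (int i = 1; i <= itemCount; i++)
        {
            await broadcast.SendAsync(i).ConfigureAwait(false);
        }

        broadcast.Complete();
        // Complete the batch block only after every transform has drained.
        await Task.WhenAll(transforms.Select(t => t.Completion)).ConfigureAwait(false);
        batch.Complete();
        await processor.Completion.ConfigureAwait(false);
        return batchCount;
    }
}
```

Because each transform emits exactly one output per input, every batch should contain one item per source, so the batch size can simply be the number of transform blocks (1-20 in the question).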

OTHER TIPS

If you want to perform multiple parallel operations for each item, it makes more sense IMHO to perform these operations inside a single block, instead of splitting them across multiple blocks and then trying to join the independent results into a single object again. So my suggestion is to do something like this:

var block = new TransformBlock<MyClass, MyClass>(async item =>
{
    Task<SomeType1> task1 = Task.Run(() => CalculateProperty1(item.Id));
    Task<SomeType2> task2 = Task.Run(() => CalculateProperty2(item.Id));
    await Task.WhenAll(task1, task2).ConfigureAwait(false);
    item.Property1 = task1.Result;
    item.Property2 = task2.Result;
    return item;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

In the above example, items of type MyClass are passed through a TransformBlock. The properties Property1 and Property2 of each item are calculated in parallel, using a separate Task for each property. Both tasks are then awaited, and when both are complete the results are assigned to the properties of the item. Finally, the processed item is returned.
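
The same idea can be stretched to the 1-20 operations mentioned in the question. The following is only a sketch under assumptions: the class FanOutInsideBlockSketch, the Create method, and the use of plain Func<int, int> delegates stand in for whatever the real transforms do. Task.WhenAll returns the results in the order of the tasks passed to it, so the position in the output array identifies the operation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutInsideBlockSketch
{
    // Builds a single block that runs every operation on each input item in
    // parallel and emits one array holding all of the results.
    public static TransformBlock<int, int[]> Create(
        IReadOnlyList<Func<int, int>> operations)
    {
        return new TransformBlock<int, int[]>(async item =>
        {
            // One Task per operation; operations[k] produces result[k].
            Task<int>[] tasks = operations
                .Select(op => Task.Run(() => op(item)))
                .ToArray();
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        });
    }
}
```

A possible usage: create the block with a list of delegates, Post items to it, and link it straight to the consuming block. No JoinBlock or BatchBlock is needed because the results never leave the block separately.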

The only thing you want to be aware of with this approach is that the degree of parallelism will be the product of the number of internal parallel operations and the MaxDegreeOfParallelism option of the block. So in the above example the degree of parallelism will be 2 x 2 = 4. To be precise, this will be the maximum degree of parallelism, because it is possible that one of the two internal calculations will be slower than the other. So at any given moment, while the block is busy, the actual degree of parallelism could be anything between 2 and 4.
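
To see that upper bound in practice, here is a rough probe, assuming a hypothetical SimulatedWork method that stands in for CalculateProperty1 and CalculateProperty2 and simply sleeps. It records the highest number of operations observed running at the same time; with two inner tasks and MaxDegreeOfParallelism = 2, that number should never exceed 4.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismProbeSketch
{
    private static readonly object Gate = new object();
    private static int _running;
    private static int _peak;

    // Hypothetical stand-in for a real calculation: tracks how many calls
    // are in flight while it blocks for a short time.
    private static int SimulatedWork(int id)
    {
        lock (Gate)
        {
            _running++;
            if (_running > _peak) _peak = _running;
        }
        Thread.Sleep(20);
        lock (Gate) { _running--; }
        return id;
    }

    // Returns the highest number of concurrently running operations observed.
    public static async Task<int> MeasurePeakAsync(int itemCount)
    {
        lock (Gate) { _running = 0; _peak = 0; }

        var block = new ActionBlock<int>(async id =>
        {
            Task<int> task1 = Task.Run(() => SimulatedWork(id));
            Task<int> task2 = Task.Run(() => SimulatedWork(id));
            await Task.WhenAll(task1, task2).ConfigureAwait(false);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        for (int i = 0; i < itemCount; i++)
        {
            block.Post(i);
        }
        block.Complete();
        await block.Completion.ConfigureAwait(false);

        lock (Gate) { return _peak; }
    }
}
```

The observed peak depends on thread pool scheduling, so it may come out below 4 on a busy or single-core machine; only the upper bound is guaranteed.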

Licensed under: CC-BY-SA with attribution