Question

I want to set up a TransformBlock that processes its items in parallel, so I'm setting ExecutionDataflowBlockOptions.MaxDegreeOfParallelism to a value greater than 1. I don't care about the order of the messages, but the documentation says:

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.

Does "correctly ordered" mean that if one message in the queue needs a long processing time, further messages are not output until that message has been processed?

And if so, how can I set up an execution block (for example, a TransformBlock) that does not care about ordering? Or do I have to specify at the consuming end that I don't care about ordering?
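The behavior being asked about can be observed with a minimal C# sketch (the class and method names are hypothetical and the delays are arbitrary). Item 0 is made artificially slow while the other items finish almost immediately, yet with the default options the block still emits 0, 1, 2, 3: the faster results are held back until item 0 is done, which is what "correctly ordered" means in practice.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question or the library.
public static class OrderedOutputDemo
{
    // Runs four items through a parallel TransformBlock with default options
    // and returns them in the order the block emits them.
    public static async Task<List<int>> RunAsync()
    {
        var block = new TransformBlock<int, int>(
            async item =>
            {
                // Item 0 is slow; items 1-3 complete almost at once.
                await Task.Delay(item == 0 ? 500 : 10);
                return item;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < 4; i++)
        {
            block.Post(i);
        }
        block.Complete();

        var output = new List<int>();
        // Single consumer: read until the block has completed and is drained.
        while (await block.OutputAvailableAsync())
        {
            output.Add(await block.ReceiveAsync());
        }
        await block.Completion;
        return output;
    }
}
```

Awaiting `OrderedOutputDemo.RunAsync()` yields 0, 1, 2, 3 even though items 1 to 3 finished roughly half a second before item 0.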


Solution

There was no such block in the library when this answer was written (the EnsureOrdered option described below came later), but you can easily create one yourself by combining an ActionBlock and a BufferBlock. Something like:

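using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowBlockFactory // containing class name is arbitrary
{
    public static IPropagatorBlock<TInput, TOutput>
        CreateUnorderedTransformBlock<TInput, TOutput>(
        Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
    {
        // Results land here in the order they finish, not the order they arrived.
        var buffer = new BufferBlock<TOutput>(options);

        // The ActionBlock does the (possibly parallel) work and forwards
        // each result to the buffer as soon as it is ready.
        var action = new ActionBlock<TInput>(
            async input =>
            {
                var output = func(input);
                await buffer.SendAsync(output);
            }, options);

        // Propagate completion or failure from the ActionBlock to the BufferBlock.
        // TaskScheduler.Default keeps the continuation off any ambient scheduler.
        action.Completion.ContinueWith(
            t =>
            {
                IDataflowBlock castedBuffer = buffer;

                if (t.IsFaulted)
                {
                    castedBuffer.Fault(t.Exception);
                }
                else if (t.IsCanceled)
                {
                    // do nothing: both blocks share options,
                    // which means they also share CancellationToken
                }
                else
                {
                    castedBuffer.Complete();
                }
            }, TaskScheduler.Default);

        // Expose the pair as one block: input goes to the ActionBlock,
        // output comes from the BufferBlock.
        return DataflowBlock.Encapsulate(action, buffer);
    }
}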

This way, each item is moved to the BufferBlock as soon as the ActionBlock finishes processing it, so the output follows completion order rather than input order.

One issue with this code is that it doesn't respect the BoundedCapacity set in options: in effect, the capacity of the combined block is twice that value, because each of the two inner blocks has its own separate capacity.

OTHER TIPS

(Upgrading a comment by NPNelson to an answer)

The DataflowBlockOptions class has a configurable EnsureOrdered property (introduced in 2016) that determines whether the order of the received messages is preserved at the block's output. It is true by default. Setting it to false makes the block propagate messages as soon as they are processed, which increases the throughput of the pipeline thanks to faster propagation and reduced overhead.
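A minimal C# sketch of the EnsureOrdered option in use (the class and method names are hypothetical and the delays are arbitrary). Item 0 is made slow; with EnsureOrdered set to false, the faster items are emitted as soon as they finish instead of waiting for it.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the library.
public static class UnorderedOutputDemo
{
    // Returns the items in the order the block emits them.
    public static async Task<List<int>> RunAsync()
    {
        var block = new TransformBlock<int, int>(
            async item =>
            {
                // Item 0 is slow; items 1-3 complete almost at once.
                await Task.Delay(item == 0 ? 1000 : 10);
                return item;
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false // emit each result as soon as it is ready
            });

        for (int i = 0; i < 4; i++)
        {
            block.Post(i);
        }
        block.Complete();

        var output = new List<int>();
        // Single consumer: read until the block has completed and is drained.
        while (await block.OutputAvailableAsync())
        {
            output.Add(await block.ReceiveAsync());
        }
        await block.Completion;
        return output;
    }
}
```

Here item 0 is expected to arrive last; the relative order of items 1, 2 and 3 is not guaranteed, since they complete at nearly the same time.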

Licensed under: CC-BY-SA with attribution