Question

I run a pretty typical producer/consumer model on different tasks.

Task1: Reads batches of byte[] from binary files and kicks off a new task for each collection of byte arrays. (the operation is batched for memory management purposes).

Task 2-n: These are worker tasks. Each one operates on the collection of byte arrays passed in from Task1: it deserializes the byte arrays, sorts them by certain criteria, and then stores the collection of resulting objects (each byte array deserializes into one such object) in a concurrent dictionary.

Task (n+1): I chose a concurrent dictionary because the job of this task is to merge the collections stored in it in the same order in which they originated from Task1. I achieve that by passing a collectionID (of type int, incremented for each new collection within Task1) all the way down from Task1 to this task. This task checks whether the next expected collectionID is already stored in the concurrent dictionary. If it is, the task takes that collection out, adds it to a final queue, and checks for the next collection in the concurrent dictionary.

From what I have read and the videos I have watched, TPL Dataflow seems to be a perfect candidate for such a producer/consumer model. I just cannot come up with a design to get started, because I have never worked with TPL Dataflow. In terms of throughput and latency, is this library even up to the task? I currently process 2.5 million byte arrays, and thus objects, per second in the resulting collections. Can TPL Dataflow help to simplify this? I am especially interested in the following question: can TPL Dataflow preserve the order of the collection batches from Task1 when spawning worker tasks and re-merging the batches once the worker tasks have done their work? Does it optimize things? Having profiled the whole structure, I feel that quite some time is wasted due to spinning and too many concurrent collections being involved.

Any ideas, thoughts?


Solution

EDIT: Turns out I was very wrong. TransformBlock does return items in the same order they came in, even if it is configured for parallelism. Because of that, the code in my original answer is unnecessary and a normal TransformBlock can be used instead.
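
To illustrate the point of this edit, here is a minimal sketch, assuming a project that references System.Threading.Tasks.Dataflow and a compiler that supports top-level statements. The delays and the squaring are made-up stand-ins for real work: items posted first take longest to process, yet the output still arrives in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A parallel TransformBlock. Earlier items get longer (hypothetical) delays,
// so with 4 workers they tend to finish after later items.
var block = new TransformBlock<int, int>(
    async n =>
    {
        await Task.Delay((10 - n) * 10); // stand-in for real work
        return n * n;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        EnsureOrdered = true // the default, shown only for clarity
    });

for (int i = 0; i < 10; i++)
    block.Post(i); // unbounded block, so Post always succeeds here
block.Complete();

// Drain the output; the block releases results in input order.
var results = new List<int>();
while (await block.OutputAvailableAsync())
    results.Add(block.Receive());

Console.WriteLine(string.Join(",", results));
// prints 0,1,4,9,16,25,36,49,64,81
```

If ordering were not needed, setting EnsureOrdered to false would let results leave the block as soon as they are ready.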


Original answer:

As far as I know, only one parallelism construct in .NET supports returning processed items in the order they came in: PLINQ with AsOrdered(). But it seems to me that PLINQ doesn't fit what you want well.
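
For reference, ordered PLINQ looks roughly like this. It is only a sketch with a made-up workload (squaring integers), not something taken from the question:

```csharp
using System;
using System.Linq;

// AsOrdered() tells PLINQ to keep the source order in the results,
// even though the Select delegate runs on several threads.
int[] squares = Enumerable.Range(0, 1000)
    .AsParallel()
    .AsOrdered()
    .Select(n => n * n) // stand-in for real work
    .ToArray();

Console.WriteLine(squares[999]); // prints 998001
```

PLINQ works on a complete query over a sequence, which is why it is a less natural fit for a pipeline that is fed incrementally.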

TPL Dataflow, on the other hand, fits well, I think, but it doesn't have a block that would support parallelism and returning items in order at the same time (TransformBlock supports both of them, but not at the same time). Fortunately, Dataflow blocks were designed with composability in mind, so we can build our own block that does that.

But first, we have to figure out how to order the results. Using a concurrent dictionary, like you suggested, along with some synchronization mechanism, would certainly work. But I think there is a simpler solution: use a queue of Tasks. In the output task, you dequeue a Task, wait for it to complete (asynchronously) and when it does, you send its result along. We still need some synchronization for the case when the queue is empty, but we can get that for free if we choose which queue to use cleverly.

So, the general idea is like this: what we're writing will be an IPropagatorBlock, with some input and some output. The easiest way to create a custom IPropagatorBlock is to create one block that processes the input, another block that produces the results and treat them as one using DataflowBlock.Encapsulate().

The input block will have to process the incoming items in the correct order, so no parallelization there. It will create a new Task (actually, a TaskCompletionSource, so that we can set the result of the Task later), add it to the queue and then send the item for processing, along with some way to set the result of the correct Task. Because we don't need to link this block to anything, we can use an ActionBlock.

The output block will have to take Tasks from the queue, asynchronously wait for them, and then send them along. But since all blocks have a queue embedded in them, and blocks that take delegates have asynchronous waiting built-in, this will be very simple: new TransformBlock<Task<TOutput>, TOutput>(t => t). This block will work both as the queue and as the output block. Because of this, we don't have to deal with any synchronization.
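
The behavior of that queue block can be checked in isolation. The following is a small sketch (the string payloads and variable names are made up for illustration): two tasks are enqueued and then completed in reverse order, and the block still emits their results in the order the tasks were enqueued.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The block's input buffer serves as the queue. The delegate returns the task
// itself, so the block awaits each task in turn before emitting its result.
var queue = new TransformBlock<Task<string>, string>(t => t);

var first = new TaskCompletionSource<string>();
var second = new TaskCompletionSource<string>();
queue.Post(first.Task);
queue.Post(second.Task);
queue.Complete();

// Complete the tasks in reverse order.
second.SetResult("second");
first.SetResult("first");

var output = new List<string>();
while (await queue.OutputAvailableAsync())
    output.Add(queue.Receive());

Console.WriteLine(string.Join(",", output)); // prints first,second
```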

The last piece of the puzzle is actually processing the items in parallel. For this, we can use another ActionBlock, this time with MaxDegreeOfParallelism set. It will take the input, process it, and set the result of the correct Task in the queue.

Put together, it could look like this:

// Requires: using System; using System.Threading.Tasks;
//           using System.Threading.Tasks.Dataflow;
public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

After so much talk, that's quite a small amount of code, I think.

It seems you care about performance a lot, so you might need to fine-tune this code. For example, it might make sense to set MaxDegreeOfParallelism of the processor block to something like Environment.ProcessorCount, to avoid oversubscription. Also, if latency is more important to you than throughput, it might make sense to set MaxMessagesPerTask of the same block to 1 (or another small number), so that when processing of an item is finished, it's sent to the output immediately.

Also, if you want to throttle incoming items, you could set BoundedCapacity of enqueuer.
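
As a rough sketch of these knobs, here they are applied to a plain TransformBlock rather than to the composite block above. The capacity of 100, the item count, and the doubling delegate are arbitrary placeholders, not recommendations:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount, // avoid oversubscription
    MaxMessagesPerTask = 1, // favor latency over throughput
    BoundedCapacity = 100   // throttle the producer (arbitrary value)
};

var block = new TransformBlock<int, int>(n => n * 2, options);

// With a bounded block, something has to drain the output,
// otherwise the producer would eventually wait forever.
var consumer = Task.Run(async () =>
{
    var received = new List<int>();
    while (await block.OutputAvailableAsync())
        received.Add(block.Receive());
    return received;
});

for (int i = 0; i < 1000; i++)
    await block.SendAsync(i); // waits asynchronously while the block is full
block.Complete();

List<int> doubled = await consumer;
Console.WriteLine(doubled.Count); // prints 1000
```

Which values work best depends on the workload, so they are worth measuring rather than guessing.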

OTHER TIPS

Yes, the TPL Dataflow library is well suited for this job. It supports all the features that you need: MaxDegreeOfParallelism, BoundedCapacity and EnsureOrdered. But using the BoundedCapacity option requires some attention to details.

First, you must make sure that you feed the first block in the pipeline with the SendAsync method. If you use the Post method instead and ignore its return value, you may lose messages. SendAsync does not drop messages for lack of space, because it makes the caller wait asynchronously until the block's internal buffer has room for the incoming message.
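
The difference can be seen in a small sketch. The gate task here is only a device to keep the block busy so that its buffer stays full; it is not part of any real pipeline:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new TaskCompletionSource<bool>(); // holds the block busy until released
var processed = new List<int>();

var block = new ActionBlock<int>(
    async n =>
    {
        await gate.Task;
        processed.Add(n); // safe: MaxDegreeOfParallelism defaults to 1
    },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

bool firstPost = block.Post(1);          // true: there was room
bool secondPost = block.Post(2);         // false: block is full, message 2 is lost
Task<bool> pending = block.SendAsync(3); // not lost: waits for free space
bool completedEarly = pending.IsCompleted; // false while the block is still full

gate.SetResult(true);                    // let the block make progress
bool accepted = await pending;           // true once message 3 has been accepted

block.Complete();
await block.Completion;

Console.WriteLine(string.Join(",", processed)); // prints 1,3
```

A caller that insists on Post would have to check the returned bool and retry on its own.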

Second, you must ensure that an exception in a downstream block cannot leave the feeder waiting indefinitely for free space that will never come. There is no built-in option that makes this happen automatically. Instead, you must manually propagate the completion of the downstream blocks to the blocks upstream. That is the purpose of the PropagateFailure method in the example below:

// MyClass, ReadBinaryFile and Deserialize are placeholders for your own
// type and methods.
public static async Task ProcessAsync(string[] filePaths,
    ConcurrentQueue<MyClass> finalQueue)
{
    var reader = new TransformBlock<string, byte[]>(filePath =>
    {
        byte[] result = ReadBinaryFile(filePath);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // This is the default
        BoundedCapacity = 20, // keep memory usage under control
        EnsureOrdered = true // This is also the default
    });

    var deserializer = new TransformBlock<byte[], MyClass>(bytes =>
    {
        MyClass result = Deserialize(bytes);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 20
    });

    var writer = new ActionBlock<MyClass>(obj =>
    {
        finalQueue.Enqueue(obj);
    });

    reader.LinkTo(deserializer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(deserializer, reader); // Link backwards

    deserializer.LinkTo(writer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(writer, deserializer); // Link backwards

    foreach (var filePath in filePaths)
    {
        var accepted = await reader.SendAsync(filePath).ConfigureAwait(false);
        if (!accepted) break; // This will happen in case that the block has failed
    }
    reader.Complete(); // This will be ignored if the block has already failed

    await writer.Completion; // This will propagate the first exception that occurred
}

public static async void PropagateFailure(IDataflowBlock block1,
    IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); }
    catch (Exception ex) { block2.Fault(ex); }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow