Question

I'm trying to get my head around controlling dataflow in TPL Dataflow. I have a very fast producer, and a very slow consumer. (My real code is more complex, but none the less, this is a pretty good model and it reproduces the problem.)

When I run it, the code starts drinking memory like it's going out of style--and the output queue on the producer fills up as fast as it can. What I'd really prefer to see is the Producer stop running for a while, until the Consumer has a chance to ask for it. From my readings of the documentation, this is what is supposed to happen: that is, I thought that the producer waits until the consumer has space.

This isn't the case, clearly. How do I fix it so that the queue doesn't go crazy?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f =>
                {
                });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
            CreateData.LinkTo(ParseFile, linkOptions);
            ParseFile.LinkTo(EndOfTheLine, linkOptions);

            Task t = new Task(() =>
            {
                while (true)
                {
                    Console.WriteLine("CreateData: " + Report(CreateData));
                    Console.WriteLine("ParseData:  " + Report(ParseFile));
                    Console.WriteLine("NullTarget: " +  EndOfTheLine.InputCount );
                    Thread.Sleep(1000);
                }

            });
            t.Start();

            CreateData.SendAsync(0);
            CreateData.Complete();

            EndOfTheLine.Completion.Wait();
        }

        public static string Report<T, U>(TransformManyBlock<T, U> block)
        {
            return String.Format("INPUT: {0}   OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
        }


    }
}
Was it helpful?

Solution

Normally, what you would do in a situation like this is to also set BoundedCapacity of the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when filling the output queue from a single IEnumerable.

What you can do instead is to create a function that iterates the collection and uses SendAsync() to send more data only when the target can accept them:

/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
/// 
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            await target.SendAsync(item);
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}

Usage:

var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();

If you still wanted to have CreateData block that would behave similarly to your original code, you could have two bounded BufferBlocks, SendAllAsync() between them and then use Encapsulate() to make them look like one block:

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(
        async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());

                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top