Normally, what you would do in a situation like this is to also set BoundedCapacity of the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when filling the output queue from a single IEnumerable.
What you can do instead is to create a function that iterates the collection and uses SendAsync() to send the next item only when the target can accept it:
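    /// <remarks>
    /// If iterating data throws an exception, the target block is faulted
    /// and the returned Task completes successfully.
    ///
    /// If the target declines an item (for example, because it has already
    /// completed or faulted), iteration stops early.
    ///
    /// Depending on the usage, this might or might not be what you want.
    /// </remarks>
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
            {
                // SendAsync() completes only once the target has accepted
                // or permanently declined the item; false means declined.
                if (!await target.SendAsync(item))
                    return;
            }
        }
        catch (Exception e)
        {
            target.Fault(e);
        }
    }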
Usage:
    var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s, i) => "Hello, World " + i);
    await ParseFile.SendAllAsync(data);
    ParseFile.Complete();
If you still want a CreateData block that behaves similarly to your original code, you can create two bounded BufferBlocks, run SendAllAsync() between them, and then use Encapsulate() to make them look like one block:
    /// <remarks>
    /// boundedCapacity represents the capacity of the input queue
    /// and the output queue separately, not their total.
    /// </remarks>
    public static IPropagatorBlock<TInput, TOutput>
        CreateBoundedTransformManyBlock<TInput, TOutput>(
        Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
    {
        var input = new BufferBlock<TInput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
        var output = new BufferBlock<TOutput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Pump items from input to output on a background task.
        Task.Run(
            async () =>
            {
                try
                {
                    // Returns false once input is completed and drained.
                    while (await input.OutputAvailableAsync())
                    {
                        var data = transform(await input.ReceiveAsync());

                        // Waits whenever the output queue is full.
                        await output.SendAllAsync(data);
                    }

                    output.Complete();
                }
                catch (Exception e)
                {
                    // BufferBlock implements Fault() explicitly, hence the casts.
                    ((IDataflowBlock)input).Fault(e);
                    ((IDataflowBlock)output).Fault(e);
                }
            });

        return DataflowBlock.Encapsulate(input, output);
    }
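To show how the encapsulated block could be wired into a pipeline, here is a minimal self-contained sketch. The class name BoundedDataflowDemo, the method RunAsync, and the counting ActionBlock standing in for ParseFile are all made up for illustration; the two helpers are repeated from above (in condensed form) only so the example compiles on its own. It assumes the System.Threading.Tasks.Dataflow library is available to the project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedDataflowDemo
{
    // Repeated from the answer above so that this sketch is self-contained.
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
                await target.SendAsync(item);
        }
        catch (Exception e)
        {
            target.Fault(e);
        }
    }

    // Repeated from the answer above so that this sketch is self-contained.
    public static IPropagatorBlock<TInput, TOutput>
        CreateBoundedTransformManyBlock<TInput, TOutput>(
        Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
    {
        var options = new DataflowBlockOptions { BoundedCapacity = boundedCapacity };
        var input = new BufferBlock<TInput>(options);
        var output = new BufferBlock<TOutput>(options);

        Task.Run(async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                    await output.SendAllAsync(transform(await input.ReceiveAsync()));

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

        return DataflowBlock.Encapsulate(input, output);
    }

    // Hypothetical driver: expands one number into that many strings and
    // counts how many of them reach the consumer.
    public static async Task<int> RunAsync(int itemCount)
    {
        var createData = CreateBoundedTransformManyBlock<int, string>(
            count => Enumerable.Range(0, count).Select(i => "Hello, World " + i),
            boundedCapacity: 10);

        // Stand-in for ParseFile. The default MaxDegreeOfParallelism is 1,
        // so the unsynchronized counter is safe here.
        int processed = 0;
        var parseFile = new ActionBlock<string>(
            _ => processed++,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        createData.LinkTo(
            parseFile, new DataflowLinkOptions { PropagateCompletion = true });

        await createData.SendAsync(itemCount);
        createData.Complete();

        // Completion flows: input -> pump task -> output -> parseFile.
        await parseFile.Completion;
        return processed;
    }
}
```

Calling `await BoundedDataflowDemo.RunAsync(1000)` should hand all 1000 generated strings to the consumer, while neither of the two internal queues nor the consumer's input queue holds more than 10 items at a time.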