It would be nice if the TransformBlock had a ProcessingCompleted event that fired when the block had finished processing all the messages in its queue, but there is no such event. Below is an attempt to rectify this omission. The CreateTransformBlockEx method accepts an Action<Exception> handler that is invoked when this "event" occurs.
The intention was to always invoke the handler before the final completion of the block. Unfortunately, when the supplied CancellationToken is canceled, the completion (cancellation) happens first, and the handler is invoked some milliseconds later. This is because the internal BufferBlock is created with the same options, so it observes the token directly instead of waiting for the handler. Fixing this inconsistency would require some tricky workarounds and might have other unwanted side effects, so I am leaving it as is.
// Place inside a static class. Requires:
// using System; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (onProcessingCompleted == null)
        throw new ArgumentNullException(nameof(onProcessingCompleted));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var transformBlock = new TransformBlock<TInput, TOutput>(transform,
        dataflowBlockOptions);
    var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);

    // Link without PropagateCompletion; completion is propagated manually below
    transformBlock.LinkTo(bufferBlock);
    PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
    return DataflowBlock.Encapsulate(transformBlock, bufferBlock);

    async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
        Action<Exception> completionHandler)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch { } // The outcome is inspected through block1.Completion below
        var exception =
            block1.Completion.IsFaulted ? block1.Completion.Exception : null;
        try
        {
            // Invoke the handler before completing the second block
            completionHandler(exception);
        }
        finally
        {
            if (exception != null) block2.Fault(exception); else block2.Complete();
        }
    }
}

// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateTransformBlockEx<TInput, TOutput>(
        x => Task.FromResult(transform(x)), onProcessingCompleted,
        dataflowBlockOptions);
}
The code of the local function PropagateCompletion mimics the source code of the built-in LinkTo method when it is invoked with the PropagateCompletion = true option.
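For comparison, here is a minimal sketch of the built-in behavior that the local function imitates. The class name PropagationDemo and the method name RunAsync are made up for illustration; only the Dataflow calls come from the library.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagationDemo // hypothetical name, for illustration only
{
    public static async Task<List<int>> RunAsync()
    {
        var source = new TransformBlock<int, int>(x => x * 2);
        var target = new BufferBlock<int>();

        // Built-in propagation: when the source completes or faults,
        // the target is completed or faulted as well.
        source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

        source.Post(1);
        source.Post(2);
        source.Complete(); // signal that no more input is coming

        // Drain the target; its Completion task cannot finish
        // while items remain in its buffer.
        var results = new List<int>();
        while (await target.OutputAvailableAsync())
        {
            results.Add(target.Receive());
        }

        await target.Completion; // completed through the link, not explicitly
        return results;
    }
}
```

Calling await PropagationDemo.RunAsync() from an async Main should return the doubled values. With the built-in option there is no hook between "source completed" and "target completed", which is the gap the custom PropagateCompletion fills.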
Usage example:
var httpClient = new HttpClient();
var downloader = CreateTransformBlockEx<string, string>(async url =>
{
    return await httpClient.GetStringAsync(url);
}, onProcessingCompleted: ex =>
{
    Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10
});
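To see the ordering in the non-canceled case, here is a rough, network-free sketch that feeds such a block, marks it complete, drains it, and records when the handler runs relative to the block's Completion. It embeds a condensed copy of CreateTransformBlockEx so that it compiles on its own. The class name ProcessingCompletedDemo, the method RunAsync, and the log strings are made up for illustration, and an upper-casing lambda stands in for the HTTP download.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ProcessingCompletedDemo // hypothetical name
{
    // Condensed copy of the CreateTransformBlockEx method shown earlier.
    static IPropagatorBlock<TInput, TOutput> CreateTransformBlockEx<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform,
        Action<Exception> onProcessingCompleted,
        ExecutionDataflowBlockOptions options = null)
    {
        options = options ?? new ExecutionDataflowBlockOptions();
        var transformBlock = new TransformBlock<TInput, TOutput>(transform, options);
        var bufferBlock = new BufferBlock<TOutput>(options);
        transformBlock.LinkTo(bufferBlock);
        PropagateCompletion(transformBlock, bufferBlock);
        return DataflowBlock.Encapsulate(transformBlock, bufferBlock);

        async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2)
        {
            try { await block1.Completion.ConfigureAwait(false); } catch { }
            var exception = block1.Completion.IsFaulted ? block1.Completion.Exception : null;
            try { onProcessingCompleted(exception); }
            finally
            {
                if (exception != null) block2.Fault(exception); else block2.Complete();
            }
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        var block = CreateTransformBlockEx<string, string>(
            async s => { await Task.Delay(10); return s.ToUpperInvariant(); },
            onProcessingCompleted: ex =>
            {
                // The handler runs on a thread-pool thread, so guard the shared list.
                lock (log) log.Add(ex == null ? "handler: OK" : "handler: Error");
            });

        block.Post("a");
        block.Post("b");
        block.Complete(); // no more input

        // Consume the output; the block cannot complete while items are buffered.
        while (await block.OutputAvailableAsync())
        {
            var item = block.Receive();
            lock (log) log.Add("received: " + item);
        }

        await block.Completion;
        lock (log) log.Add("block completed");
        return log;
    }
}
```

In this sketch the "handler: OK" entry should always precede "block completed", because the internal BufferBlock is completed only after the handler returns. The position of the "received" entries relative to the handler entry is not deterministic.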