TPL Dataflow to the rescue. I have created a "producer/consumer" object with two queues that:
- accepts inputs from a "producer" and stores them in the "in" queue.
- has an internal asynchronous task that reads from the "in" queue and processes the inputs in parallel, up to a given maximum degree of parallelism.
- puts each processed item in the "out" queue afterwards, as either a result or an exception.
- accepts a consumer that can `await` an item and then check whether the processing was successful or not.
I have done some testing and it seems to work fine, although I want to do more testing:
/// <summary>
/// Outcome of processing a single item: carries either the produced value or the
/// exception that was captured while producing it.
/// </summary>
/// <typeparam name="TOut">Reference type of the processed value.</typeparam>
public sealed class ProcessingResult<TOut>
    where TOut : class
{
    private TOut _result;
    private Exception _error;

    /// <summary>
    /// The processed value; <c>null</c> until assigned from within the declaring assembly.
    /// </summary>
    public TOut Result
    {
        get { return _result; }
        internal set { _result = value; }
    }

    /// <summary>
    /// The captured exception; <c>null</c> until assigned from within the declaring assembly.
    /// </summary>
    public Exception Error
    {
        get { return _error; }
        internal set { _error = value; }
    }
}
/// <summary>
/// Base class for a bounded producer/consumer pipeline. Items handed in by a producer are
/// buffered in an input queue, processed asynchronously by <see cref="ProcessAsync"/> with a
/// capped number of concurrent operations, and the outcomes are buffered in an output queue
/// from which a consumer can await them.
/// </summary>
/// <typeparam name="TIn">Reference type of the items to process.</typeparam>
/// <typeparam name="TOut">Reference type of the processed results.</typeparam>
public abstract class ProcessingBufferBlock<TIn, TOut>
    where TIn : class
    where TOut : class
{
    readonly BufferBlock<TIn> _in;
    readonly BufferBlock<ProcessingResult<TOut>> _out;
    readonly CancellationToken _cancellation;
    // Each running ProcessAsync call holds one slot; the initial count is the maximum parallelism.
    readonly SemaphoreSlim _semaphore;

    /// <summary>
    /// Creates the input and output queues and starts the background reader loop.
    /// </summary>
    /// <param name="boundedCapacity">Bounded capacity applied to both the input and the output queue.</param>
    /// <param name="degreeOfParalellism">Maximum number of items processed concurrently.</param>
    /// <param name="cancellation">Token that stops the reader loop and is handed to both queues.</param>
    public ProcessingBufferBlock(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
    {
        _cancellation = cancellation;
        _semaphore = new SemaphoreSlim(degreeOfParalellism);
        var options = new DataflowBlockOptions() { BoundedCapacity = boundedCapacity, CancellationToken = cancellation };
        _in = new BufferBlock<TIn>(options);
        _out = new BufferBlock<ProcessingResult<TOut>>(options);
        // Fire-and-forget: the reader loop runs for the lifetime of the object (until cancellation).
        StartReadingAsync();
    }

    /// <summary>
    /// Reader loop: takes items from the input queue and dispatches each one for processing
    /// once a processing slot is free. Ends when the cancellation token is signalled.
    /// </summary>
    private async Task StartReadingAsync()
    {
        // Hand control back to the constructor before the loop starts.
        await Task.Yield();
        while (!_cancellation.IsCancellationRequested)
        {
            var incoming = await _in.ReceiveAsync(_cancellation);
            // Wait for a free slot without blocking a thread. Acquiring the slot here (rather than
            // inside the processing task) keeps the loop from draining the input queue faster
            // than items can actually be processed.
            await _semaphore.WaitAsync(_cancellation);
            // Deliberately not awaited so that several items are processed concurrently;
            // the dispatched task releases the slot when it finishes.
            ProcessThroughGateAsync(incoming);
        }
    }

    /// <summary>
    /// Processes one item and queues its outcome. The caller must already hold one semaphore
    /// slot for this item; the slot is released here on every exit path.
    /// </summary>
    /// <param name="input">The item to process.</param>
    private async Task ProcessThroughGateAsync(TIn input)
    {
        try
        {
            Exception error = null;
            TOut result = null;
            try
            {
                result = await ProcessAsync(input);
            }
            catch (Exception ex)
            {
                // Captured on purpose: the failure is delivered to the consumer as a result.
                error = ex;
            }

            // A null result without an error means the item was filtered out: nothing is queued.
            if (result != null || error != null)
            {
                // SendAsync waits for room in the bounded output queue; Post would return false
                // and silently drop the outcome whenever the queue is full.
                await _out.SendAsync(new ProcessingResult<TOut>() { Error = error, Result = result }, _cancellation);
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Processes a single item. Returning <c>null</c> produces no output for that item;
    /// a thrown exception is delivered to the consumer through <see cref="ProcessingResult{TOut}.Error"/>.
    /// </summary>
    /// <param name="input">The item to process.</param>
    /// <returns>The processed value, or <c>null</c> to skip the item.</returns>
    protected abstract Task<TOut> ProcessAsync(TIn input);

    /// <summary>
    /// Offers an item to the input queue without waiting. If the bounded input queue is full
    /// the item is not accepted and is dropped; use <see cref="SendAsync"/> to wait for room instead.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    public void Post(TIn item)
    {
        _in.Post(item);
    }

    /// <summary>
    /// Offers an item to the input queue, asynchronously waiting for room when the queue is full.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>A task whose result is <c>true</c> if the item was accepted and <c>false</c> if it was declined.</returns>
    public Task<bool> SendAsync(TIn item)
    {
        return _in.SendAsync(item, _cancellation);
    }

    /// <summary>
    /// Asynchronously receives the next processing outcome from the output queue.
    /// </summary>
    /// <returns>A task that completes with the next available <see cref="ProcessingResult{TOut}"/>.</returns>
    public Task<ProcessingResult<TOut>> ReceiveAsync()
    {
        return _out.ReceiveAsync();
    }
}
So the example I used on the OP would be something like this:
/// <summary>
/// Example queue that turns <c>MyClient</c> items into <c>MyWrapper</c> results,
/// skipping every client whose <c>Id</c> is a multiple of three.
/// </summary>
public class WrapperProcessingQueue : ProcessingBufferBlock<MyClient, MyWrapper>
{
    // Length of the artificial delay applied to every item, in milliseconds.
    private const int SimulatedWorkMilliseconds = 5000;

    /// <summary>
    /// Forwards all arguments unchanged to the base class constructor.
    /// </summary>
    public WrapperProcessingQueue(Int32 boundedCapacity, Int32 degreeOfParalellism, CancellationToken cancellation)
        : base(boundedCapacity, degreeOfParalellism, cancellation)
    {
    }

    /// <summary>
    /// Waits for the simulated delay, then wraps the client unless it is filtered out.
    /// </summary>
    /// <param name="input">The client to wrap.</param>
    /// <returns>A new <c>MyWrapper</c> for the client, or <c>null</c> when its <c>Id</c> is divisible by three.</returns>
    protected override async Task<MyWrapper> ProcessAsync(MyClient input)
    {
        await Task.Delay(SimulatedWorkMilliseconds);

        bool filteredOut = (input.Id % 3) == 0;
        return filteredOut ? null : new MyWrapper(input);
    }
}
And then I could add MyClient objects to that queue as fast as I get them; they would be processed in parallel, and the consumer would await the ones that pass the filter.
As I said, I want to do more testing, but any feedback would be very welcome.
Cheers.