The consumer is active only after all the items are posted from the producer. Does asynchronous mean that both the produce and consume tasks can run in parallel?
This happens because you post all your items very quickly, before the consumer has a chance to start. If you added Thread.Sleep(100), you would see that they actually do work in parallel.
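A minimal sketch of that experiment, assuming a BufferBlock<long> sits between the two sides. The names ProduceAsync, ConsumeAsync and RunAsync are hypothetical stand-ins for your own methods, and await Task.Delay(100) is used in place of Thread.Sleep(100) only because the producer here is async. With the pause in place, the "consumed" lines should show up between the "posted" lines rather than all at the end:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class InterleavingDemo
{
    // Hypothetical producer: posts five items with a pause after each one.
    static async Task ProduceAsync(ITargetBlock<long> target)
    {
        for (long i = 0; i < 5; i++)
        {
            Console.WriteLine($"posted {i}");
            target.Post(i);
            // The pause gives the consumer time to pick the item up.
            await Task.Delay(100);
        }
        // Signal that no more items will arrive.
        target.Complete();
    }

    // Hypothetical consumer: receives items until the buffer is completed and empty.
    static async Task<long> ConsumeAsync(ISourceBlock<long> source)
    {
        long count = 0;
        while (await source.OutputAvailableAsync())
        {
            long item = await source.ReceiveAsync();
            Console.WriteLine($"consumed {item}");
            count++;
        }
        return count;
    }

    public static async Task<long> RunAsync()
    {
        var buffer = new BufferBlock<long>();
        // Start the consumer first, then produce while it is waiting.
        Task<long> consumer = ConsumeAsync(buffer);
        await ProduceAsync(buffer);
        return await consumer;
    }
}
```

Removing the delay should bring back the behavior you observed: all items are posted before the consumer gets its first turn.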
I added a sleep in the consumer to verify whether it is blocking other data items. It seems to be executing sequentially and not getting any parallelism.
TPL Dataflow is not magic: it won't modify your code to execute in parallel. It's you who calls AscTransConsumerAsync() once, so don't be surprised that it actually executes only once.
TDF does support processing in parallel, but you would need to actually let it execute the processing code. To do this, use one of the execution blocks. In your case, ActionBlock seems appropriate.
If you use that, you can then configure the block to execute in parallel by setting MaxDegreeOfParallelism. Of course, doing that means you need to ensure that the processing delegate is thread-safe.
With that, AscTransConsumerAsync() might now look something like:
    public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
    {
        // counter to track the number of items that are processed
        Int64 count = 0;

        var actionBlock = new ActionBlock<Int64>(
            data =>
            {
                ProcessDataBuffer(data);
                // count has to be accessed in a thread-safe manner;
                // be careful about using Interlocked: for more complicated
                // computations, locking might be more appropriate
                Interlocked.Increment(ref count);
            },
            // some small constant might be better than Unbounded, depending on circumstances
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // this assumes source will be completed when done;
        // you need to call ascbuffer.Complete() after AscBufferProducer() for this
        await actionBlock.Completion;

        return count;
    }
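For completeness, here is a rough, self-contained sketch of how the calling side might be wired so that the completion actually propagates. It is only an illustration: ProcessDataBuffer is replaced by a hypothetical stand-in that sleeps, the loop that posts twenty items stands in for AscBufferProducer(), ConsumeAsync repeats the shape of AscTransConsumerAsync() so the sample compiles on its own, and MaxDegreeOfParallelism = 4 is an arbitrary small constant.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical stand-in for the real ProcessDataBuffer(): simulates some work.
    static void ProcessDataBuffer(long data) => Thread.Sleep(50);

    // Same shape as AscTransConsumerAsync(), repeated so the sketch is self-contained.
    static async Task<long> ConsumeAsync(ISourceBlock<long> source)
    {
        long count = 0;
        var actionBlock = new ActionBlock<long>(
            data =>
            {
                ProcessDataBuffer(data);
                Interlocked.Increment(ref count);
            },
            // Arbitrary small constant chosen for the illustration.
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        await actionBlock.Completion;
        return count;
    }

    public static async Task<long> RunAsync()
    {
        var ascbuffer = new BufferBlock<long>();

        // Link the consumer before producing; it finishes only after Complete().
        Task<long> consumer = ConsumeAsync(ascbuffer);

        // Stand-in for AscBufferProducer(): post a batch of items.
        for (long i = 0; i < 20; i++)
        {
            ascbuffer.Post(i);
        }

        // Without this call, actionBlock.Completion would never finish.
        ascbuffer.Complete();

        return await consumer;
    }
}
```

Calling ascbuffer.Complete() is what lets PropagateCompletion carry the "no more items" signal down to the ActionBlock, so awaiting the consumer returns once every posted item has been processed.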