Yeah, so... you're kinda half using the power of TPL here. The fact that you're still manually receiving items from the BufferBlock
in your own while loop in a background Task
is not the "way" you want to do it if you're subscribing to the TPL DataFlow style.
What you would do is link an ActionBlock
to the BufferBlock
and do your command processing/sending from within that. This is also the block where you would set the MaxDegreeOfParallelism
to control just how many concurrent commands you want to process. So that setup might look something like this:
// Initialization logic to build up the TPL flow
_pendingCommands = new BufferBlock<Command>();
_commandProcessor = new ActionBlock<Command>(this.ProcessCommand);
_pendingCommands.LinkTo(_commandProcessor);
private Task ProcessCommand(Command command)
{
var response = await _commandProcessor.RunCommand(command.Content);
this.Send(command.Client, response);
}
Then, in your shutdown code, you would need to signal that you're done adding items into the pipeline by calling Complete
on the _pipelineCommands
BufferBlock
and then wait on the _commandProcessor
ActionBlock
to complete to ensure that all items have made their way through the pipeline. You do this by grabbing the Task
returned by the block's Completion
property and calling Wait
on it:
_pendingCommands.Complete();
_commandProcessor.Completion.Wait();
If you want to go for bonus points, you can even separate the command processing from the command sending. This would allow you to configure those steps separately from one another. For example, maybe you need to limit the number of threads processing commands, but want to have more sending out the responses. You would do this by simply introducing a TransformBlock
into the middle of the flow:
_pendingCommands = new BufferBlock<Command>();
_commandProcessor = new TransformBlock<Command, Tuple<Client, Response>>(this.ProcessCommand);
_commandSender = new ActionBlock<Tuple<Client, Response>(this.SendResponseToClient));
_pendingCommands.LinkTo(_commandProcessor);
_commandProcessor.LinkTo(_commandSender);
private Task ProcessCommand(Command command)
{
var response = await _commandProcessor.RunCommand(command.Content);
return Tuple.Create(command, response);
}
private Task SendResponseToClient(Tuple<Client, Response> clientAndResponse)
{
this.Send(clientAndResponse.Item1, clientAndResponse.Item2);
}
You probably want to use your own data structure instead of Tuple
, it was just for illustrative purposes, but the point is this is exactly the kind of structure you want to use to break up the pipeline so that you can control the various aspects of it exactly how you might need to.