Question

I'm experimenting with TPL Dataflow by porting some old socket code to TPL Dataflow and the new async features. Although the API feels rock solid, my code still ends up feeling messy. I'm wondering if I'm missing something here.

My requirements are as follows: a socket class exposes Open, Close, Send and Receive methods. All of them return a Task and are therefore async. Open and Close are atomic (nothing else may run while either is in progress). Send and Receive can run alongside each other, although each can handle only one command at a time.

Logically, this leads me to the following code for internal control:

// An exclusive scheduler for connection-related actions, and a concurrent scheduler on which send and receive can run side by side
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()  { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()    { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });

So far, so good. I can safely post actions to the send and receive blocks without worrying about a connection-related action running in the meantime. ActionBlock also ensures that multiple Send calls are processed one at a time (likewise for Receive, and for Open and Close).

The problem is that there is no easy way for an action to communicate a result back to the caller. Right now I'm using a TaskCompletionSource for that, like this:

public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();

    // The async lambda becomes an async void Action, so any exception has to be
    // forwarded to the TaskCompletionSource by hand, or the returned task never completes.
    sendBlock.Post(async () =>
    {
        try
        {
            if (!tcpClient.Connected)
                throw new InvalidOperationException("Can't send when not open");

            await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
            resultCompletionSource.SetResult(null);
        }
        catch (Exception ex)
        {
            resultCompletionSource.SetException(ex);
        }
    });

    return resultCompletionSource.Task;
}

This just feels ugly and clumsy. My question is: is there a way to synchronize this workflow using the TPL without having to use a TaskCompletionSource to pass the result between the action and the caller?

Thanks!


Solution

First, you don't need TPL Dataflow for this at all, because you don't actually have any dataflow.

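Second, using TaskSchedulers like this is not the correct solution either. A TaskScheduler only controls where and when code runs, but while you await something, there is no code running. When the action reaches the await on WriteAsync(), its synchronous part is finished and the exclusive scheduler sees no concurrent work in progress. So, while WriteAsync() is doing its async work, code for Open() could run. The same applies to the ActionBlock itself: the async lambda is converted to an async void Action, so the block considers the action done at its first await and may start the next Send while the previous write is still in progress.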
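To see the scheduler problem in isolation, here is a small sketch that uses only ConcurrentExclusiveSchedulerPair. Task.Delay stands in for sendStream.WriteAsync, and the "send" and "open" labels are illustrative, not part of the question's class. The task on the exclusive scheduler gets logged before "send: end", i.e. it runs while the simulated send is still awaiting.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// ConcurrentExclusiveSchedulerPair only sees the synchronous pieces of an async
// method: while a "send" awaits, an exclusive "open" is free to run.
var pair = new ConcurrentExclusiveSchedulerPair();
var log = new ConcurrentQueue<string>();

// Simulated Send on the concurrent scheduler. StartNew with an async lambda
// returns Task<Task>; Unwrap() gives a task representing the whole method.
Task send = Task.Factory.StartNew(async () =>
{
    log.Enqueue("send: start");
    await Task.Delay(500);          // stand-in for sendStream.WriteAsync(...)
    log.Enqueue("send: end");
}, CancellationToken.None, TaskCreationOptions.None, pair.ConcurrentScheduler).Unwrap();

await Task.Delay(100);              // give the send time to reach its await

// Simulated Open on the exclusive scheduler. It should only run when no
// concurrent work is active, yet it runs while the send is still "in progress".
Task open = Task.Factory.StartNew(() => log.Enqueue("open"),
    CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);

await Task.WhenAll(send, open);
Console.WriteLine(string.Join(" | ", log));
```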

What you actually need is something like ReaderWriterLock, but one that works well with async code: Send and Receive take the lock as readers, while Open and Close take it as a writer. There isn't anything like that in the framework, but you could use the code from Stephen Toub's article "Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock", which does exactly what you need. The article also explains in more detail why using a TaskScheduler is wrong.
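If pulling in the article's code isn't an option, the reader/writer idea can be roughly sketched with two SemaphoreSlim instances (the classic "lightswitch" pattern). This is a simplified stand-in, not Toub's AsyncReaderWriterLock: the names EnterReadAsync, ExitReadAsync, EnterWriteAsync and ExitWrite are hypothetical, there is no disposable releaser for using blocks, and writers get no priority, so a steady stream of readers could starve Open and Close.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified async reader-writer lock ("lightswitch" pattern).
// Not Stephen Toub's AsyncReaderWriterLock: no writer preference.
var roomEmpty = new SemaphoreSlim(1, 1);   // held by the readers as a group, or by one writer
var readerGate = new SemaphoreSlim(1, 1);  // protects readerCount
int readerCount = 0;

async Task EnterReadAsync()
{
    await readerGate.WaitAsync();
    try
    {
        // The first reader locks writers out on behalf of all readers.
        if (++readerCount == 1) await roomEmpty.WaitAsync();
    }
    finally { readerGate.Release(); }
}

async Task ExitReadAsync()
{
    await readerGate.WaitAsync();
    try
    {
        // The last reader out lets writers back in.
        if (--readerCount == 0) roomEmpty.Release();
    }
    finally { readerGate.Release(); }
}

Task EnterWriteAsync() => roomEmpty.WaitAsync();
void ExitWrite() => roomEmpty.Release();

// Usage: two readers (e.g. Send and Receive) hold the lock together,
// while a writer (e.g. Open) has to wait until both have left.
await EnterReadAsync();                     // "Send" enters
await EnterReadAsync();                     // "Receive" enters without waiting
Task write = EnterWriteAsync();             // "Open" queues up behind the readers
bool writerBlockedWhileReading = !write.IsCompleted;
await ExitReadAsync();
bool writerBlockedWithOneReader = !write.IsCompleted;
await ExitReadAsync();                      // last reader out releases roomEmpty
await write;                                // "Open" now has exclusive access
ExitWrite();
Console.WriteLine($"{writerBlockedWhileReading} {writerBlockedWithOneReader}");
```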

Using AsyncReaderWriterLock, your code might look like this:

public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    using (await readerWriterLock.ReaderLockAsync())
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Can't send when not open");

        await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
    }
}
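One caveat: a reader lock admits any number of readers, so it lets Send and Receive overlap, but it would also let two Send calls write to the stream at the same time, which the question rules out. A possible addition, assuming callers may simply queue up, is a SemaphoreSlim(1, 1) per direction. The sketch below isolates that part; SendAsync and the Task.Delay standing in for WriteAsync are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: a SemaphoreSlim(1, 1) that lets only one Send run at a time.
// In the socket class this would sit alongside the reader lock shown above.
var sendGate = new SemaphoreSlim(1, 1);
var sync = new object();
int inFlight = 0, maxInFlight = 0;

async Task SendAsync()
{
    await sendGate.WaitAsync();
    try
    {
        lock (sync) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
        await Task.Delay(50);       // stand-in for sendStream.WriteAsync(...)
        lock (sync) { inFlight--; }
    }
    finally
    {
        sendGate.Release();
    }
}

// Ten concurrent callers; the gate queues them so at most one "write" is in flight.
await Task.WhenAll(Enumerable.Range(0, 10).Select(_ => SendAsync()));
Console.WriteLine($"max concurrent sends: {maxInFlight}");
```

One reasonable ordering in the real class is to take the send gate before the reader lock: queued Send calls then don't hold the reader lock while they wait, so Open and Close only have to wait for the write that is actually in flight.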
Licensed under: CC-BY-SA with attribution