Question

I have identified a bottleneck in my TCP application that I have simplified for the sake of this question.

I have a MyClient class that represents a connected client, and a MyWrapper class that represents a client that fulfills certain conditions. If a MyClient fulfills those conditions, it qualifies for a wrapper.

I want to expose a method that allows the caller to await a MyWrapper, and that method should handle the negotiation and the rejection of invalid MyClients:

public static async Task StartAccepting(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        var wrapper = await AcceptWrapperAsync(token);

        HandleWrapperAsync(wrapper); // deliberately not awaited
    }
}

Therefore AcceptWrapperAsync awaits a valid wrapper, and HandleWrapperAsync handles the wrapper asynchronously without blocking the thread, so AcceptWrapperAsync can get back to work as fast as it can.

How that method works internally is something like this:

public static async Task<MyWrapper> AcceptWrapperAsync(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        var client = await AcceptClientAsync();
        if (IsClientWrappable(client))
            return new MyWrapper(client);
    }
    return null;
}

public static async Task<MyClient> AcceptClientAsync()
{
    await Task.Delay(1000);
    return new MyClient();
}

private static Boolean IsClientWrappable(MyClient client)
{
    Thread.Sleep(500);
    return true;
}

This code simulates a client connection arriving every second, and a half-second check of whether the connection is suitable for a wrapper. AcceptWrapperAsync loops until a valid wrapper is generated, and then returns.

This approach, which works well, has a flaw. While IsClientWrappable is executing, no further clients can be accepted, creating a bottleneck when many clients try to connect at the same time. I am afraid that in real life, if the server goes down while it has many connected clients, coming back up will not be pleasant, because all of them will try to reconnect at once. I know it is very unlikely that they all connect at exactly the same time, but I would like to speed up the connection process.

Making IsClientWrappable async would only ensure that the executing thread is not blocked until the negotiation finishes; the execution flow would still be blocked.

How could I improve this approach to continuously accept new clients, while still being able to await a wrapper via AcceptWrapperAsync?


Solution 2

TPL Dataflow to the rescue. I have created a "producer/consumer" object with two queues that:

  1. accepts inputs from the producer and stores them in the "in" queue.
  2. an internal asynchronous task reads from the "in" queue and processes the inputs in parallel, with a given maximum degree of parallelism.
  3. puts each processed item in the "out" queue afterwards, as either a result or an exception.
  4. lets a consumer await an item and then check whether the processing succeeded.

I have done some testing and it seems to work fine, though I want to do more testing:

public sealed class ProcessingResult<TOut> 
    where TOut : class
{
    public TOut Result { get; internal set; }
    public Exception Error { get; internal set; }
}

public abstract class ProcessingBufferBlock<TIn,TOut> 
    where TIn:class 
    where TOut:class
{
    readonly BufferBlock<TIn> _in;
    readonly BufferBlock<ProcessingResult<TOut>> _out;
    readonly CancellationToken _cancellation;
    readonly SemaphoreSlim _semaphore;

    public ProcessingBufferBlock(Int32 boundedCapacity, Int32 degreeOfParallelism, CancellationToken cancellation)
    {
        _cancellation = cancellation;
        _semaphore = new SemaphoreSlim(degreeOfParallelism);
        var options = new DataflowBlockOptions() { BoundedCapacity = boundedCapacity, CancellationToken = cancellation };
        _in = new BufferBlock<TIn>(options);
        _out = new BufferBlock<ProcessingResult<TOut>>(options);
        _ = StartReadingAsync(); // fire-and-forget read loop
    }

    private async Task StartReadingAsync()
    {
        await Task.Yield();
        while (!_cancellation.IsCancellationRequested)
        {
            var incoming = await _in.ReceiveAsync(_cancellation);
            _ = ProcessThroughGateAsync(incoming); // deliberately not awaited, so items are processed in parallel
        }
    }

    private async Task ProcessThroughGateAsync(TIn input)
    {
        await _semaphore.WaitAsync(_cancellation); // WaitAsync, not Wait, so the read loop is never blocked
        Exception error=null;
        TOut result=null;
        try
        {
            result = await ProcessAsync(input);                   
        }
        catch (Exception ex)
        {
            error = ex;
        }
        finally
        {
            if(result!=null || error!=null)
                _out.Post(new ProcessingResult<TOut>() { Error = error, Result = result });
            _semaphore.Release(1);
        }
    }

    protected abstract Task<TOut> ProcessAsync(TIn input);

    public void Post(TIn item)
    {
        _in.Post(item);
    }

    public Task<ProcessingResult<TOut>> ReceiveAsync()
    {
        return _out.ReceiveAsync();
    }
}

So the example I used on the OP would be something like this:

public class WrapperProcessingQueue : ProcessingBufferBlock<MyClient, MyWrapper>
{
    public WrapperProcessingQueue(Int32 boundedCapacity, Int32 degreeOfParallelism, CancellationToken cancellation)
        : base(boundedCapacity, degreeOfParallelism, cancellation)
    { }

    protected override async Task<MyWrapper> ProcessAsync(MyClient input)
    {
        await Task.Delay(5000);
        if (input.Id % 3 == 0)
            return null;
        return new MyWrapper(input);
    }
}

And then I could add MyClient objects to that queue as fast as I get them, they would be processed in parallel, and the consumer could await the ones that pass the filter.
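As an illustrative sketch (not part of the original answer), the accept loop from the question could be wired to this queue roughly like so. The capacity and parallelism values are arbitrary, and note that ProcessAsync returning null (a filtered client) posts nothing to the "out" queue, so the consumer only ever sees successes and errors:

```csharp
public static async Task StartAcceptingAsync(CancellationToken token)
{
    // 100 = bounded capacity, 10 = max degree of parallelism (both arbitrary here).
    var queue = new WrapperProcessingQueue(100, 10, token);

    // Producer: accept clients as fast as they arrive; never blocked by negotiation.
    _ = Task.Run(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            var client = await AcceptClientAsync();
            queue.Post(client);
        }
    });

    // Consumer: await processed items and keep only the successful wrappers.
    while (!token.IsCancellationRequested)
    {
        var processed = await queue.ReceiveAsync();
        if (processed.Error != null)
            Console.WriteLine($"Negotiation failed: {processed.Error.Message}");
        else
            HandleWrapperAsync(processed.Result); // fire and forget, as before
    }
}
```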

As I said, I want to do more testing but any feedback will be very welcomed.

Cheers.

OTHER TIPS

//this loop must never be blocked
while (!token.IsCancellationRequested)
{
    var client = await AcceptClientAsync();
    HandleClientAsync(client); //must not block
}

async Task HandleClientAsync(MyClient client) {
    if (await IsClientWrappableAsync(client)) // make async as well, don't block
        await HandleWrapperAsync(new MyWrapper(client));
}

This way you move the IsClientWrappable logic out of the accept loop and into the background async workflow.

If you do not wish to make IsClientWrappable non-blocking, just wrap it with Task.Run. It is essential that HandleClientAsync does not block, so that its caller doesn't either.
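As a sketch of that fallback, assuming the blocking IsClientWrappable from the question, the Task.Run wrapper could look like this (IsClientWrappableAsync is the name the snippet above assumes):

```csharp
// Task.Run moves the blocking 500 ms check onto a thread-pool thread,
// so the accept loop keeps accepting while the negotiation runs.
static Task<Boolean> IsClientWrappableAsync(MyClient client)
{
    return Task.Run(() => IsClientWrappable(client));
}
```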

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow