Question

I am new to writing on SO, so please bear with me.

I have a WCF service with a duplex service contract. The contract has an operation contract that is supposed to do long-running data processing. I have to limit the number of concurrent data-processing operations to, say, at most 3. My problem is that after the processing finishes I need to get back to the same service instance context, so that I can call back the initiator endpoint and pass it the result. I should mention that, for various reasons, I am constrained to TPL Dataflow and WCF duplex.

Here is a demo of what I have written so far.

In a console application I simulate the WCF calls:

class Program
{
    static void Main(string[] args)
    {
        // simulate concurrent service calls, each on its own service instance
        var calls = Enumerable.Range(0, 5)
            .Select(x => Task.Run(() => new Service().Inc(x)))
            .ToArray();

        // keep the process alive until every simulated call has completed
        Task.WaitAll(calls);
    }
}

Here is what is supposed to be the WCF service:

// service contract
public class Service
{
    static TransformBlock<Message<int>, Message<int>> transformBlock;

    static Service()
    {
        transformBlock = new TransformBlock<Message<int>, Message<int>>(x => Inc(x), new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3
        });
    }

    static Message<int> Inc(Message<int> input)
    {
        System.Threading.Thread.Sleep(100);

        return new Message<int> { Token = input.Token, Data = input.Data + 1 };
    }

    // operation contract
    public async Task Inc(int id)
    {
        var token = Guid.NewGuid().ToString();

        transformBlock.Post(new Message<int> { Token = token, Data = id });

        while (await transformBlock.OutputAvailableAsync())
        {
            Message<int> message;
            if (transformBlock.TryReceive(m => m.Token == token, out message))
            {
                // do further processing using initiator service instance members
                // something like Callback.IncResult(m.Data);
                break;
            }
        }
    }
}

public class Message<T>
{
    public string Token { get; set; }

    public T Data { get; set; }
}

The operation contract does not really need to be async, but I needed the OutputAvailableAsync notification.

Is this a good approach or is there a better solution for my scenario?

Thanks in advance.

Solution

First, I think you shouldn't use the token the way you do. Unique identifiers are useful when communicating between processes. But when you're inside a single process, just use reference equality.

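To actually answer your question, I think the (kind of) busy loop is not a good idea. OutputAvailableAsync only tells you that some output is available, not that it is yours, so every waiting call keeps spinning through the loop until its own message can be received.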

A simpler solution for asynchronous throttling would be to use SemaphoreSlim. Something like:

static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

// operation contract
public async Task Inc(int id)
{
    await Semaphore.WaitAsync();

    try
    {
        Thread.Sleep(100);
        var result = id + 1;
        // do further processing using initiator service instance members
        // something like Callback.IncResult(result);
    }
    finally
    {
        Semaphore.Release();
    }
}
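
To make the throttling visible, here is a minimal self-contained sketch of the same SemaphoreSlim idea. The names ThrottledWorker, IncAsync and MaxObserved are hypothetical and exist only for this illustration; the peak counter is there purely so you can check that no more than 3 computations overlap. It also assumes the work is CPU-bound, so it moves the Thread.Sleep stand-in onto Task.Run instead of blocking the calling thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original service.
public static class ThrottledWorker
{
    // At most 3 callers may be inside the protected section at once.
    private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

    private static readonly object Gate = new object();
    private static int _running;      // callers currently inside the section
    private static int _maxObserved;  // highest value _running ever reached

    public static int MaxObserved
    {
        get { lock (Gate) { return _maxObserved; } }
    }

    public static async Task<int> IncAsync(int id)
    {
        // Waits asynchronously, so no thread is blocked while queued.
        await Semaphore.WaitAsync();
        try
        {
            lock (Gate)
            {
                _running++;
                if (_running > _maxObserved) _maxObserved = _running;
            }

            // Stand-in for the long computation, run on the thread pool.
            return await Task.Run(() =>
            {
                Thread.Sleep(100);
                return id + 1;
            });
        }
        finally
        {
            lock (Gate) { _running--; }
            Semaphore.Release();
        }
    }
}
```

Starting, say, eight IncAsync calls at once and awaiting them with Task.WhenAll should leave MaxObserved at 3 or below. In the real service, the callback to the initiator would go where the result is returned, still inside the same service instance.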

If you really want to (or have to?) use dataflow, you can use TaskCompletionSource for synchronization between the operation and the block. The operation method awaits the Task of the TaskCompletionSource, and the block completes it when it has finished the computation for that message. This assumes Message<T> carries a TCS property of type TaskCompletionSource<T> instead of the Token:

private static readonly ActionBlock<Message<int>> Block =
    new ActionBlock<Message<int>>(
        x => Inc(x),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3
        });

static void Inc(Message<int> input)
{
    Thread.Sleep(100);

    input.TCS.SetResult(input.Data + 1);
}

// operation contract
public async Task Inc(int id)
{
    var tcs = new TaskCompletionSource<int>();

    Block.Post(new Message<int> { TCS = tcs, Data = id });

    int result = await tcs.Task;
    // do further processing using initiator service instance members
    // something like Callback.IncResult(result);
}
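
One thing the snippet above leaves open is what happens when the computation throws: the TaskCompletionSource would never complete and the caller would wait forever. Below is a hedged, self-contained sketch of one way to handle that. WorkItem, Completion, DataflowWorker and IncAsync are hypothetical names chosen for the illustration, and the checked addition is only there to give the computation a way to fail.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical work item: the input plus the completion source
// that the waiting operation awaits.
public sealed class WorkItem
{
    public int Data { get; set; }

    // RunContinuationsAsynchronously keeps the caller's continuation
    // from running inline on the block's worker thread.
    public TaskCompletionSource<int> Completion { get; } =
        new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public static class DataflowWorker
{
    private static readonly ActionBlock<WorkItem> Block =
        new ActionBlock<WorkItem>(
            item => Process(item),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

    private static void Process(WorkItem item)
    {
        try
        {
            Thread.Sleep(100); // stand-in for the long computation
            item.Completion.TrySetResult(checked(item.Data + 1));
        }
        catch (Exception ex)
        {
            // Hand the failure to the waiting caller. Letting it escape
            // the delegate would fault the block for all later items.
            item.Completion.TrySetException(ex);
        }
    }

    public static async Task<int> IncAsync(int id)
    {
        var item = new WorkItem { Data = id };

        // SendAsync yields false if the block declined the item,
        // for example after it has completed or faulted.
        if (!await Block.SendAsync(item))
            throw new InvalidOperationException("The block declined the work item.");

        return await item.Completion.Task;
    }
}
```

With this shape, awaiting IncAsync(int.MaxValue) surfaces an OverflowException to that caller only, while the block keeps serving other requests.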
Licensed under: CC-BY-SA with attribution