Question

Using TPL, how do I collect results from multiple IO sources ("thread-less" tasks) and merge them into one sequence as they arrive from their respective sources, without spawning a thread-based task per source to monitor them? Would it be safe to poll the sources from one thread?

while (true)
{
    try
    {
        IEnumerable<UdpClient> readyChannels = 
            from channel in channels
            where channel.Available > 0
            select channel;

        foreach( UdpClient channel in readyChannels)
        {
           var result = await channel.ReceiveAsync();
           //do something with result like post to dataflow block.
        }
    }
    catch (Exception e)
    {
        throw (e);
    }
    ...

How about something like that?


Solution

I see several options here:

If you want to fire off the calls to ReceiveAsync(), set them up to do something with the result (e.g. send it to a dataflow block, as you said), and then forget about them, you could use ContinueWith():

foreach (var channel in readyChannels)
{
   channel.ReceiveAsync().ContinueWith(task => 
   {
       var result = task.Result;
       //do something with result like post to dataflow block.
   });
}

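One disadvantage of this is that you would need to handle exceptions in each continuation: task.Result rethrows the exception of a failed receive (wrapped in an AggregateException), and because nothing awaits the continuation, that exception would otherwise go unobserved.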
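
As an illustration of handling exceptions inside the continuation, here is a minimal sketch. The ContinuationSketch class, the Observe helper and the onResult/onError callbacks are hypothetical names introduced for this example; they are not part of the TPL or of the original answer.

```csharp
using System;
using System.Threading.Tasks;

static class ContinuationSketch
{
    // Hypothetical helper: attaches one continuation that routes the outcome
    // of 'task' to either 'onResult' or 'onError', so a fault is always observed.
    public static Task Observe<T>(Task<T> task, Action<T> onResult, Action<Exception> onError)
    {
        return task.ContinueWith(t =>
        {
            if (t.IsFaulted)
                onError(t.Exception.GetBaseException()); // unwrap the AggregateException
            else if (!t.IsCanceled)
                onResult(t.Result);                      // safe: the task ran to completion
        });
    }
}
```

With this in place, the loop body could become something like ContinuationSketch.Observe(channel.ReceiveAsync(), result => { /* post to dataflow block */ }, error => { /* log */ });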

Probably a better approach is to use OrderByCompletion() from Stephen Cleary's AsyncEx. This way, you can start all reads at once and process them as they complete:

var tasks = readyChannels.Select(c => c.ReceiveAsync()).OrderByCompletion();

foreach (var task in tasks)
{
   var result = await task;
   //do something with result like post to dataflow block.
}
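
If taking a dependency on AsyncEx is not an option, similar "process as they complete" behavior can be approximated with Task.WhenAny() alone. The following is only a sketch: CompletionOrderSketch and ForEachAsCompletedAsync are hypothetical names, and repeatedly calling WhenAny() on the remaining tasks gets expensive for a large number of tasks, so this suits a handful of channels rather than thousands.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class CompletionOrderSketch
{
    // Hypothetical helper: calls 'process' once per task, in the order the tasks finish.
    public static async Task ForEachAsCompletedAsync<T>(
        IEnumerable<Task<T>> source, Action<T> process)
    {
        // Materialize the sequence so each operation is started exactly once.
        List<Task<T>> pending = source.ToList();

        while (pending.Count > 0)
        {
            // WhenAny completes as soon as any remaining task completes.
            Task<T> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Awaiting the finished task yields its result, or rethrows if it faulted.
            process(await finished);
        }
    }
}
```

For the UDP case this might be called as await CompletionOrderSketch.ForEachAsCompletedAsync(readyChannels.Select(c => c.ReceiveAsync()), result => { /* post to dataflow block */ });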

Yet another option, useful for example if you want to limit parallelism, is to use TransformBlock:

var receiveBlock = new TransformBlock<UdpClient, UdpReceiveResult>(
    c => c.ReceiveAsync(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });
foreach (var channel in readyChannels)
    receiveBlock.Post(channel);
receiveBlock.Complete();

// set up processing here

await receiveBlock.Completion;

If you want to send the results to another block, then the processing step marked by the comment above consists of simply linking the two blocks together:

receiveBlock.LinkTo(anotherBlock);
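
One detail worth knowing when linking: LinkTo() does not forward completion to the target block unless you ask for it with DataflowLinkOptions.PropagateCompletion. The sketch below shows the whole pipeline shape end to end. To keep it runnable without sockets, it assumes a stand-in transform (a short delay that doubles an integer) in place of ReceiveAsync(); LinkedPipelineSketch and RunAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class LinkedPipelineSketch
{
    // Hypothetical demo: doubles each input asynchronously and sums the results.
    public static async Task<int> RunAsync(int[] inputs)
    {
        int sum = 0;

        // Stand-in for the receive step; at most two operations are in flight at once.
        var transform = new TransformBlock<int, int>(
            async x => { await Task.Delay(10); return x * 2; },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        // ActionBlock processes one item at a time by default, so 'sum' is not raced.
        var sink = new ActionBlock<int>(x => sum += x);

        // Forward both data and completion from the transform to the sink.
        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int input in inputs)
            transform.Post(input);
        transform.Complete();

        // Wait for the last block, which finishes only after everything upstream has.
        await sink.Completion;
        return sum;
    }
}
```

Awaiting the final block's Completion, rather than the first block's, ensures every result has actually been processed before the method returns.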

In none of the cases above does a thread ever block just to monitor the channels. The code that calls ReceiveAsync() and then processes the result still has to execute somewhere, though, usually on a thread-pool thread or on the captured synchronization context.

Licensed under: CC-BY-SA with attribution