Question

Suppose I have a BlockingCollection OutputQueue, which holds many items. Currently my code is:

    public void Consumer()
    {
        foreach (var workItem in OutputQueue.GetConsumingEnumerable())
        {
            PlayMessage(workItem);
            Console.WriteLine("Works on {0}", workItem.TaskID);
            OutLog.Write("Works on {0}", workItem.TaskID);
            Thread.Sleep(500);
        }
    }

Now I want PlayMessage(workItem) to run across multiple tasks, because some work items take much more time than others; the difference is huge.

As for the method PlayMessage(workItem), it makes a few service calls, plays text to speech, and does some logging:

    bool successRouting = serviceCollection.SvcCall_GetRoutingData(string[] params, out ex);
    bool successDialingService = serviceCollection.SvcCall_GetDialingServiceData(string[] params, out excep);
    PlayTTS(workItem.TaskType); // playing text to speech

So how should I change my code?

What I thought was:

public async Task Consumer()
{
    foreach (var workItem in OutputQueue.GetConsumingEnumerable())
    {
        await PlayMessage(workItem);
        Console.WriteLine("Works on {0}", workItem.TaskID);
        OutLog.Write("Works on {0}", workItem.TaskID);
        Thread.Sleep(500);
    }
}

Solution

Since you want parallelism for your PlayMessage calls, I would suggest looking into TPL Dataflow, as it combines parallel work with async, so you can await your work properly.

TPL Dataflow is built out of blocks, and each block has its own characteristics. Some popular ones are listed below, with a brief sketch of how they compose after the list:

ActionBlock<TInput>
TransformBlock<T, TResult>
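
(TPL Dataflow lives in the System.Threading.Tasks.Dataflow NuGet package.) Only ActionBlock is needed below, but as a rough illustration of how blocks compose, here is a minimal sketch that links a TransformBlock to an ActionBlock; the preparation step is made up purely for the example:

// requires the System.Threading.Tasks.Dataflow NuGet package:
// using System.Threading.Tasks.Dataflow;

// Hypothetical pre-processing step, for illustration only.
var prepareBlock = new TransformBlock<WorkItem, WorkItem>(workItem =>
{
    // e.g. enrich or validate the work item here
    return workItem;
});

var playBlock = new ActionBlock<WorkItem>(workItem => PlayMessage(workItem));

// Link the blocks so items (and completion) flow from one to the next.
prepareBlock.LinkTo(playBlock, new DataflowLinkOptions { PropagateCompletion = true });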

I would construct something like the following:

var workItemBlock = new ActionBlock<WorkItem>(
    workItem =>
    {
        PlayMessage(workItem);
        Console.WriteLine("Works on {0}", workItem.TaskID);
        OutLog.Write("Works on {0}", workItem.TaskID);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount // set max parallelism as you wish
    });

foreach (var workItem in OutputQueue.GetConsumingEnumerable())
{
    workItemBlock.Post(workItem);
}

workItemBlock.Complete();
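
A follow-up note: Complete() only signals that no more items will be posted; to actually wait until everything posted has been processed, await the block's Completion task. And if PlayMessage can be awaited (a hypothetical PlayMessageAsync is assumed below, see the wrapper sketch at the end), the delegate itself can be async so the parallelism limit applies to the awaited work:

var workItemBlock = new ActionBlock<WorkItem>(
    async workItem =>
    {
        // assumes a hypothetical awaitable PlayMessageAsync
        await PlayMessageAsync(workItem);
        Console.WriteLine("Works on {0}", workItem.TaskID);
        OutLog.Write("Works on {0}", workItem.TaskID);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); // for example

// ... post items as above, then:
workItemBlock.Complete();
await workItemBlock.Completion; // completes once every posted item has been processed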

OTHER TIPS

Here's another solution, not based on TPL Dataflow. It uses SemaphoreSlim to throttle the number of parallel playbacks (warning, untested):

public async Task Consumer()
{
    var semaphore = new SemaphoreSlim(NUMBER_OF_PORTS);
    var pendingTasks = new HashSet<Task>();
    var syncLock = new Object();

    Action<Task> queueTaskAsync = async (task) =>
    {
        // be careful with exceptions inside "async void" methods

        // keep failed/cancelled tasks in the list;
        // they will be observed outside
        lock (syncLock)
            pendingTasks.Add(task);

        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw;
            // the error will be observed later,
            // keep the task in the list
            return;
        }
        finally
        {
            semaphore.Release();
        }

        // remove successfully completed task from the list
        lock (syncLock)
            pendingTasks.Remove(task);
    };

    foreach (var workItem in OutputQueue.GetConsumingEnumerable())
    {
        // wait for a free slot first, so at most NUMBER_OF_PORTS
        // playbacks are in flight at any time
        await semaphore.WaitAsync();

        var item = workItem;
        Func<Task> workAsync = async () =>
        {
            await PlayMessage(item);
            Console.WriteLine("Works on {0}", item.TaskID);
            OutLog.Write("Works on {0}", item.TaskID);
            await Task.Delay(500);
        };

        var task = workAsync();
        queueTaskAsync(task);
    }

    // snapshot under the lock: completed tasks may still be
    // removing themselves from the set on other threads
    Task[] remaining;
    lock (syncLock)
        remaining = pendingTasks.ToArray();

    await Task.WhenAll(remaining);
}
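
The second solution (and the async ActionBlock variant above) assumes PlayMessage(workItem) returns a Task. If it is currently a synchronous method (blocking service calls plus TTS), one option is to wrap it; a minimal sketch, assuming the existing method stays as-is (the PlayMessageAsync name is made up here):

// Hypothetical wrapper: offloads the existing synchronous PlayMessage
// to the thread pool so callers can await it.
public Task PlayMessageAsync(WorkItem workItem)
{
    return Task.Run(() => PlayMessage(workItem));
}

If the underlying service calls ever get true async versions, PlayMessageAsync can await those directly instead of using Task.Run.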
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow