Question

I'm trying to optimize code with parallel execution, but sometimes only one thread gets all the heavy work. The following example shows how 40 tasks should be performed on at most 4 threads, where the first ten are more time-consuming than the others.

Parallel.ForEach seems to split the array into 4 parts and lets one thread handle each part, so the entire execution takes about 10 seconds. It should be able to complete within at most 3.3 seconds!

Is there a way to keep all threads busy the whole time, since in my real problem it isn't known in advance which tasks are time-consuming?

var array = System.Linq.Enumerable.Range(0, 40).ToArray();

System.Threading.Tasks.Parallel.ForEach(
    array,
    new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 4 },
    i =>
    {
        Console.WriteLine("Running index {0,3} : {1}", i, DateTime.Now.ToString("HH:mm:ss.fff"));
        System.Threading.Thread.Sleep(i < 10 ? 1000 : 10);
    });

Solution

It would be possible with Parallel.ForEach, but you'd need a custom partitioner (or a third-party one) that can partition the elements more sensibly based on your particular items. (Or you could just use much smaller batches.)

This is also assuming that you don't strictly know in advance which items are going to be fast and which are slow; if you did, you could re-order the items yourself before calling ForEach so that the expensive items are more spread out. That may or may not be sufficient, depending on the circumstances.
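As a toy illustration of that re-ordering idea (hypothetical: it assumes the first ten items are the expensive ones and that ForEach creates four equal range partitions), a round-robin sort by index modulo 4 spreads the heavy items roughly evenly across the ranges:

```csharp
using System;
using System.Linq;

class ReorderSketch
{
    static void Main()
    {
        var array = Enumerable.Range(0, 40).ToArray();

        // Round-robin re-ordering: items 0, 4, 8, ... come first, then 1, 5, 9, ...
        // With four equal range partitions, each range now contains only two or
        // three of the ten expensive items (indices 0-9), instead of one
        // partition containing all ten.
        var reordered = array.OrderBy(i => i % 4).ToArray();

        Console.WriteLine(string.Join(", ", reordered.Take(10)));
    }
}
```

This only helps when you can estimate costs up front, which the question says isn't the case in the real problem.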

In general I prefer to solve these problems by having one producer and multiple consumers, each of which handles one item at a time rather than a batch. The BlockingCollection class makes these situations rather straightforward: add all of the items to the collection, create N tasks/threads/etc., each of which grabs an item and processes it until there are no more items. It doesn't give you the dynamic adding/removing of threads that Parallel.ForEach gives you, but that doesn't seem to be an issue in your case.
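A minimal sketch of that producer/consumer pattern (the sleep times are shortened for illustration; in your case the producer could also keep adding items while consumers run):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    static void Main()
    {
        // Fill the collection up front and mark it complete,
        // so consumers exit once the queue drains.
        var queue = new BlockingCollection<int>();
        foreach (var i in Enumerable.Range(0, 40))
            queue.Add(i);
        queue.CompleteAdding();

        var processed = new ConcurrentBag<int>();

        // Four consumers, each pulling one item at a time, so a slow item
        // only ever delays the single thread that happens to be running it.
        var consumers = Enumerable.Range(0, 4)
            .Select(_ => Task.Run(() =>
            {
                foreach (var i in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(i < 10 ? 100 : 1); // simulate uneven work
                    processed.Add(i);
                }
            }))
            .ToArray();

        Task.WaitAll(consumers);
        Console.WriteLine("Processed {0} items", processed.Count);
    }
}
```

Because each consumer takes exactly one item at a time, the ten expensive items naturally spread themselves across whichever threads are free.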

OTHER TIPS

Using a custom partitioner is the right way to modify the behavior of Parallel.ForEach(). If you're on .NET 4.5, there is an overload of Partitioner.Create() that you can use. With it, your code would look like this:

var partitioner = Partitioner.Create(
    array, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(
    partitioner, new ParallelOptions { MaxDegreeOfParallelism = 4, }, i => …);

This is not the default, because turning off buffering increases the overhead of Parallel.ForEach(). But if your iterations are really that long (seconds), that additional overhead shouldn't be noticeable.

This is due to how the default partitioner works. By default, your loop is divided equally among the available threads. The reasoning behind this behavior is that there is a certain amount of overhead in setting up a thread, so you want to do as much work as is reasonable on each one. The collection is therefore partitioned into blocks, and one block is sent to each thread. The system has no way to know that parts of the collection take longer than others (unless you explicitly tell it) and assumes that an equal division leads to roughly equal completion times. In your case you may want to split out the tasks that take longer and run them in a different way, or provide a custom partitioner that traverses the collection in a non-sequential manner.
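If the .NET 4.5 NoBuffering overload mentioned above isn't available, the load-balancing array overload of Partitioner.Create (available since .NET 4) gives a similar, though chunkier, effect: chunks are handed out to threads on demand rather than pre-assigned as fixed ranges. A minimal sketch (sleep times shortened for illustration):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class LoadBalanceSketch
{
    static void Main()
    {
        var array = Enumerable.Range(0, 40).ToArray();
        int processed = 0;

        // loadBalance: true hands out small chunks to threads on demand,
        // instead of pre-assigning one fixed range per thread.
        var partitioner = Partitioner.Create(array, loadBalance: true);

        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            i =>
            {
                Thread.Sleep(i < 10 ? 100 : 1); // simulate uneven work
                Interlocked.Increment(ref processed);
            });

        Console.WriteLine("Processed {0} items", processed);
    }
}
```

Because a thread that finishes its chunk simply requests another, one slow range can no longer pin all of the expensive items to a single thread.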

You might want to use the Microsoft TPL Dataflow library, which helps in designing highly concurrent systems.

Your code is roughly equivalent to the following one using this library:

var options = new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 4,
    SingleProducerConstrained = true
};

var actionBlock = new ActionBlock<int>(i => {
    Console.WriteLine("Running index {0,3} : {1}", i, DateTime.Now.ToString("HH:mm:ss.fff"));
    System.Threading.Thread.Sleep(i < 10 ? 1000 : 10);
}, options);

Task.WhenAll(Enumerable.Range(0, 40).Select(actionBlock.SendAsync)).Wait();
actionBlock.Complete();
actionBlock.Completion.Wait();

TPL Dataflow will use 4 consumers in this scenario, processing a new value as soon as one of the consumers becomes available, thus maximizing throughput.

Once you're used to the library, you might want to add more asynchrony to your system by using the various blocks provided by the library, and removing all those awful Wait calls.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow