Question

I have a source IEnumerable<T> that I would like to process in parallel with a fixed number of tasks/threads (close to the number of processors), each of which grabs the next item from the source and processes it until all the elements have been iterated.

  • Parallel.For is not a candidate because the number of elements is unknown.
  • Parallel.ForEach is not a candidate because it creates many Tasks even when MaxDegreeOfParallelism is specified: that parameter only caps the number of tasks running concurrently, not the number of tasks created.
  • Each Task must be notified when the source has been traversed to its end so that it can run some wrap-up logic.
  • The elements of the source cannot be held in memory; they must be processed and discarded continuously.
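
One way to meet these requirements without Parallel.ForEach is to start exactly N tasks yourself and let them share a single enumerator, guarding MoveNext/Current with a lock because IEnumerator<T> is not thread-safe. This is a minimal sketch, not a tested library: the class SharedEnumeratorWorkers and the delegates process and wrapUp are hypothetical placeholders for your own per-item and wrap-up logic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: exactly workerCount tasks pull items one at a time
// from one shared enumerator, so nothing is buffered beyond the current items.
public static class SharedEnumeratorWorkers
{
    public static void Run<T>(IEnumerable<T> source, int workerCount,
                              Action<int, T> process, Action<int> wrapUp)
    {
        using (IEnumerator<T> enumerator = source.GetEnumerator())
        {
            object gate = new object();

            // MoveNext and Current must be read together under the lock;
            // returns false once the source is exhausted.
            bool TryTake(out T item)
            {
                lock (gate)
                {
                    if (enumerator.MoveNext())
                    {
                        item = enumerator.Current;
                        return true;
                    }
                    item = default(T);
                    return false;
                }
            }

            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(id => Task.Factory.StartNew(() =>
                {
                    while (TryTake(out T item))
                        process(id, item);
                    // Reached only after this worker has seen the end of the source.
                    wrapUp(id);
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // Keeps the enumerator alive until every worker has finished.
            Task.WaitAll(workers);
        }
    }
}
```

The lock makes item retrieval serial, which is fine when per-item processing dominates; exceptions thrown by process surface as an AggregateException from Task.WaitAll.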

This sounds like a producer/consumer problem, with the simplification that the producer can be single-threaded and that, once the IEnumerable is exhausted, no more elements will be added.
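
The producer/consumer framing maps onto BlockingCollection<T> from System.Collections.Concurrent: a single producer loop adds items, a fixed number of consumer tasks drain GetConsumingEnumerable(), and CompleteAdding() signals the end so each consumer can run its wrap-up logic. The sketch below assumes a bounded capacity to keep memory use flat; BoundedProducerConsumer, process, wrapUp and boundedCapacity are hypothetical names, and error handling is kept minimal.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: one producer, consumerCount consumers, bounded buffer.
public static class BoundedProducerConsumer
{
    public static void Run<T>(IEnumerable<T> source, int consumerCount, int boundedCapacity,
                              Action<int, T> process, Action<int> wrapUp)
    {
        using (var queue = new BlockingCollection<T>(boundedCapacity))
        {
            // GetConsumingEnumerable blocks while the queue is empty and ends
            // once CompleteAdding has been called and the queue is drained.
            Task[] consumers = Enumerable.Range(0, consumerCount)
                .Select(id => Task.Factory.StartNew(() =>
                {
                    foreach (T item in queue.GetConsumingEnumerable())
                        process(id, item);
                    wrapUp(id);
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            try
            {
                // Add blocks while the queue is full, so at most
                // boundedCapacity items are buffered at any time.
                foreach (T item in source)
                    queue.Add(item);
            }
            finally
            {
                // Lets the consumers finish even if the source throws, and
                // waits for them before the queue is disposed.
                queue.CompleteAdding();
                Task.WaitAll(consumers);
            }
        }
    }
}
```

One caveat of this sketch: if every consumer faults while the queue is full, the producer's Add would block indefinitely; a CancellationToken passed to Add is one way to guard against that.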

What would a solution to this problem look like using the TPL? Do I have to implement my own shareable, thread-safe IEnumerable, or does the framework provide something?
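
On the "does the framework provide something" part: since .NET 4.5, Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering) returns a partitioner whose GetPartitions(n) hands out exactly n enumerators over the same source, each taking one element at a time. Running one task per partition gives a fixed task count and a natural end-of-source point for wrap-up. This is a sketch under those assumptions; PartitionedWorkers, process and wrapUp are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: one long-running task per partition of a shared source.
public static class PartitionedWorkers
{
    public static void Run<T>(IEnumerable<T> source, int partitionCount,
                              Action<int, T> process, Action<int> wrapUp)
    {
        // NoBuffering: each partition takes a single element per MoveNext
        // instead of grabbing growing chunks of the source.
        var partitioner = Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering);
        IList<IEnumerator<T>> partitions = partitioner.GetPartitions(partitionCount);

        Task[] workers = partitions
            .Select((partition, id) => Task.Factory.StartNew(() =>
            {
                using (partition)
                {
                    while (partition.MoveNext())
                        process(id, partition.Current);
                }
                // This partition has reached the end of the shared source.
                wrapUp(id);
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        Task.WaitAll(workers);
    }
}
```

Compared with the hand-rolled lock, the partitioner handles the synchronization internally, and it can also be passed directly to Parallel.ForEach if the task-count constraint is relaxed.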

EDIT: this is my attempt with Parallel.ForEach, specifying MaxDegreeOfParallelism, which does not prevent the TPL from creating many tasks:

int nbTasks = 0;
Parallel.ForEach(positions, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    // localInit: runs once per task and creates that task's private list
    () => new List<IPositionData>(),
    (position, loop, list) =>
    {
        Thread.Sleep(1);
        list.Add(position);
        return list;
    },
    // localFinally: runs once per task, so nbTasks counts the tasks created
    list => Interlocked.Add(ref nbTasks, 1));
Trace.WriteLine(string.Format("Tasks: {0}", nbTasks));

Comment: positions is my source IEnumerable<IPositionData>. I've just run this and, for example, nbTasks is 64 (not the expected 4 on my 4 cores).


Solution

You can limit the number of tasks in Parallel.ForEach by using an overload that takes a ParallelOptions object and setting its MaxDegreeOfParallelism property.

OTHER TIPS

You can limit the number of tasks in Parallel.ForEach:

   int maxNumberOfTasks = 4;
   Parallel.ForEach(collection, new ParallelOptions { MaxDegreeOfParallelism = maxNumberOfTasks },
       i =>
       {
           // Your action here
       });
Licensed under: CC-BY-SA with attribution