Question

I have a Parallel.ForEach() statement used in my program. It uses a list of some objects as input. I don't care about output order, but I need this loop to take input elements in the same order as in the input list. Is it possible to achieve this with Parallel.ForEach()?


Solution

If you need to preserve the order of your IEnumerable&lt;T&gt;, you'll likely want to implement a custom partitioner derived from OrderablePartitioner&lt;T&gt;. The example code in that class's documentation includes a simple partitioner that hands out elements one at a time from the enumeration, in increasing order.
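
An aside not in the original answer: if a full OrderablePartitioner&lt;T&gt; is more than you need, Partitioner.Create with EnumerablePartitionerOptions.NoBuffering (available from .NET 4.5) hands elements to the loop's workers one at a time in enumeration order, rather than in per-thread chunks. A minimal sketch; note it orders when items are handed out, not when they finish:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class OrderedHandOut
{
    // Runs a Parallel.ForEach whose items are handed out one at a time,
    // in enumeration order, and returns how many items were processed.
    public static int Run(int count)
    {
        var started = new ConcurrentQueue<int>();

        // NoBuffering disables chunk buffering, so workers pull elements
        // from the shared enumerator one at a time, in source order.
        var partitioner = Partitioner.Create(
            Enumerable.Range(0, count),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, item => started.Enqueue(item));

        return started.Count;
    }

    static void Main() => Console.WriteLine(Run(100)); // prints 100
}
```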

However, this is a lot of work for something which could be a simple producer-consumer model with something like ConcurrentQueue<T>:

var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
    X x;
    while (queue.TryDequeue(out x))
    {
        x.Frob();
    }
};

// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

This code guarantees first-in-first-out dequeue order, so your items start processing nearly in the order they appear in the list. Once several items are "in flight" in parallel, however, there is no guarantee they complete in that order.
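
For reference, here is that pattern made runnable, with int standing in for X and a squaring step standing in for Frob (both stand-ins are mine, not part of the original answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class FifoConsumers
{
    // Drains the queue with N parallel consumers; each item is taken
    // exactly once because TryDequeue is atomic. Returns the processed count.
    public static int Run(int count)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, count));
        var processed = new ConcurrentBag<int>();

        Action consumer = () =>
        {
            int item;
            while (queue.TryDequeue(out item))
            {
                processed.Add(item * item); // stand-in for x.Frob()
            }
        };

        int maxParallelism = Environment.ProcessorCount;
        Parallel.Invoke(Enumerable.Repeat(consumer, maxParallelism).ToArray());

        return processed.Count;
    }

    static void Main() => Console.WriteLine(Run(50)); // prints 50
}
```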

Alternatively, you can use the following, with the restriction that the number of items in the queue stays fixed:

// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
    queue,
    _ =>
    {
        X x;
        if (queue.TryDequeue(out x))
        {
            x.Frob();
        }
    });
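
A runnable version of the snapshot trick (again with int standing in for X): the enumerated element is discarded and only drives the iteration count, while TryDequeue supplies the actual items in FIFO order:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class SnapshotLoop
{
    // The snapshot enumerator yields exactly `count` iterations, and the
    // body dequeues one live item per iteration, so each element is
    // processed exactly once. Returns the number of items dequeued.
    public static int Run(int count)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, count));
        var seen = new ConcurrentBag<int>();

        Parallel.ForEach(
            queue,   // iterated as a moment-in-time snapshot
            _ =>     // element ignored; it only counts iterations
            {
                int item;
                if (queue.TryDequeue(out item))
                {
                    seen.Add(item);
                }
            });

        return seen.Count;
    }

    static void Main() => Console.WriteLine(Run(40)); // prints 40
}
```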

If you'd like to keep producing on one thread and consuming on others, use a BlockingCollection&lt;T&gt; with a concurrent queue as its backing collection:

var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());

// add to it
Task.Factory.StartNew(() =>
{
    foreach (var x in yourEnumerableOfX)
    {
        queue.Add(x);
        Thread.Sleep(200); // simulates a slow producer
    }

    // Signal to our consumers we're done:
    queue.CompleteAdding();
});

Now we need 'unbounded' consumers, since we don't know in advance how many items the queue will hold:

// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
    while (!queue.IsCompleted)
    {
        X x;
        try
        {
            // blocking form, switch to TryTake and maybe Thread.Sleep()
            x = queue.Take();
        }
        catch (InvalidOperationException)
        {
            // none left
            break;
        }

        x.Frob();
    }
};

int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
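
A variation not in the original answer: BlockingCollection&lt;T&gt;.GetConsumingEnumerable() yields items as they arrive and completes once CompleteAdding() is called, which removes the need for the try/catch around Take(). If you feed it to Parallel.ForEach, wrap it in a NoBuffering partitioner (.NET 4.5+) so the loop takes one item at a time instead of buffering chunks per worker. A sketch, with int standing in for X:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class ConsumingLoop
{
    // Produces on one task, consumes with Parallel.ForEach over
    // GetConsumingEnumerable(), which ends cleanly after CompleteAdding.
    public static int Run(int count)
    {
        var queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
        var results = new ConcurrentBag<int>();

        var producer = Task.Factory.StartNew(() =>
        {
            foreach (var i in Enumerable.Range(1, count))
            {
                queue.Add(i);
            }
            queue.CompleteAdding(); // consumers drain and stop; no exception needed
        });

        // NoBuffering makes the loop pull one item at a time in FIFO order
        // instead of buffering chunks per worker thread.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, i => results.Add(i * 2)); // stand-in Frob

        producer.Wait();
        return results.Count;
    }

    static void Main() => Console.WriteLine(Run(30)); // prints 30
}
```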

Other tips

Strict ordering cannot be guaranteed, because the OS scheduler can suspend the thread executing item 10 and put item 11 on the CPU core, so that 11 completes before 10. No library can counteract that: any work item can be suspended at any time, indefinitely.

You can get approximate ordering though. See the other answers for that.

No. According to the documentation, the parallel loop constructs (Parallel.For and Parallel.ForEach, basically) don't guarantee any ordering when processing items; the order is always unpredictable.

See the second paragraph of the MSDN documentation on parallel loops.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow