Question

In short, my question is how PLINQ works internally.

I understand that AsParallel returns a ParallelQuery<TSource> wrapper, which exposes the same LINQ operators, but bound to the extension methods in System.Linq.ParallelEnumerable instead of System.Linq.Enumerable.
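A minimal sketch of that binding, using an arbitrary array: after AsParallel() the static type is ParallelQuery<int>, so overload resolution prefers ParallelEnumerable.Sum (the more specific extension) over Enumerable.Sum, and spelling out the static call gives the same result.

```csharp
using System;
using System.Linq;

int[] numbers = Enumerable.Range(1, 100).ToArray();

// After AsParallel() the static type is ParallelQuery<int>, so the compiler
// binds Sum() to System.Linq.ParallelEnumerable.Sum.
ParallelQuery<int> query = numbers.AsParallel();
int viaExtension = query.Sum();

// The same method written out as an ordinary static call.
int viaStatic = ParallelEnumerable.Sum(query);

// On a plain int[], Sum() binds to System.Linq.Enumerable.Sum instead.
int sequential = numbers.Sum();

Console.WriteLine($"{viaExtension} {viaStatic} {sequential}"); // 5050 5050 5050
```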

That much is clear, but when I look at the decompiled sources, I don't understand how it works.

Let's begin with the simplest extension, the Sum() method:

[__DynamicallyInvokable]
public static int Sum(this ParallelQuery<int> source)
{
  if (source == null)
    throw new ArgumentNullException("source");
  else
    return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate();
}

That part is clear, so let's move on to the Aggregate() method. It is a wrapper around the InternalAggregate method that traps some exceptions, so let's look at InternalAggregate:

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}
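One observable effect of that exception trapping, as far as I can tell: an overflow inside the checked additions reaches the caller as an AggregateException wrapping an OverflowException, whereas sequential Enumerable.Sum throws the OverflowException directly. A small sketch (the four int.MaxValue values are just a convenient way to force an overflow):

```csharp
using System;
using System.Linq;

int[] big = { int.MaxValue, int.MaxValue, int.MaxValue, int.MaxValue };

// Sequential LINQ: the checked addition throws OverflowException directly.
bool directOverflow = false;
try { big.Sum(); }
catch (OverflowException) { directOverflow = true; }

// PLINQ: the same overflow reaches the caller wrapped in an AggregateException.
bool wrappedOverflow = false;
try { big.AsParallel().Sum(); }
catch (AggregateException ae)
{
    // Flatten() in case of nesting; more than one partition might overflow.
    wrappedOverflow = ae.Flatten().InnerExceptions.Any(e => e is OverflowException);
}

Console.WriteLine($"direct: {directOverflow}, wrapped in AggregateException: {wrappedOverflow}");
```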

And here is my question: how does this work? I see no concurrency safety for a variable that is modified by many threads; there is only an iterator and a running sum. Is it a magic enumerator, or how does it work? GetEnumerator() returns a QueryOpeningEnumerator<TOutput>, but its code is too complicated for me to follow.

Solution

Finally, on my second attempt at PLINQ, I found the answer, and it's pretty clear. The catch is that the enumerator is not a simple one; it is a special multithreaded enumerator. So how does it work? The enumerator doesn't return the next value of the source; it returns the complete sum of the next partition. So this loop body runs only 2, 4, 6, 8... times (depending on Environment.ProcessorCount), while the actual summation is performed inside enumerator.MoveNext, in the enumerator's OpenQuery method.
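To make that concrete, here is a sketch with a hypothetical PartitionSums iterator standing in for QueryOpeningEnumerator (it is not PLINQ's real code). The first MoveNext does the parallel work, each partition writes only to its own slot, and the loop copied from InternalAggregate then runs once per partition rather than once per element. Splitting the array into Environment.ProcessorCount equal chunks is an assumption made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

int[] source = Enumerable.Range(1, 1000).ToArray();

// Hypothetical stand-in for PLINQ's enumerator (not its real code): each
// MoveNext/Current pair yields the finished sum of one partition.
static IEnumerable<int> PartitionSums(int[] data, int partitionCount)
{
    var partials = new int[partitionCount];
    int chunk = (data.Length + partitionCount - 1) / partitionCount;

    // Runs during the first MoveNext. Each partition writes only to its
    // own slot in partials, so no locking is needed.
    Parallel.For(0, partitionCount, p =>
    {
        int start = p * chunk;
        int end = Math.Min(start + chunk, data.Length);
        int sum = 0;
        for (int i = start; i < end; i++)
            sum = checked(sum + data[i]);
        partials[p] = sum;
    });

    foreach (int partial in partials)
        yield return partial;
}

// Same shape as InternalAggregate: a plain sequential loop over the enumerator.
int iterations = 0;
int num = 0;
using (IEnumerator<int> enumerator = PartitionSums(source, Environment.ProcessorCount).GetEnumerator())
{
    while (enumerator.MoveNext())
    {
        iterations++;
        checked { num += enumerator.Current; }
    }
}

Console.WriteLine($"sum = {num}, loop iterations = {iterations}");
```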

So the TPL evidently partitions the source enumerable, sums each partition independently, and then performs this final summation; see IntSumAggregationOperatorEnumerator<TKey>. No magic here, I just had to dig deeper.
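PLINQ also exposes this partition-then-combine shape publicly, through the ParallelEnumerable.Aggregate overload that takes a seed factory, a per-partition update function, a combine function and a result selector. Whether Sum goes through exactly this path internally I can't say; the sketch only shows the same pattern via public API:

```csharp
using System;
using System.Linq;

int[] numbers = Enumerable.Range(1, 1000).ToArray();

int total = numbers.AsParallel().Aggregate(
    () => 0,                                 // seedFactory: a fresh accumulator per partition
    (acc, x) => checked(acc + x),            // updateAccumulatorFunc: runs within one partition, nothing shared
    (left, right) => checked(left + right),  // combineAccumulatorsFunc: merges the partition results
    acc => acc);                             // resultSelector: identity here

Console.WriteLine(total); // 500500
```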

OTHER TIPS

The Sum operator aggregates all values on a single thread. There is no multithreading here; the trick is that the multithreading happens somewhere else.

The PLINQ Sum method operates on PLINQ queries (ParallelQuery<T>). Those queries can be built up from other operators (such as Where) that allow a collection to be processed on multiple threads.

The Sum operator is always the last operator in a chain. Although it would be possible to compute this sum over multiple threads, the TPL team probably found that doing so hurt performance, which is reasonable, since the only work this method has to do is a simple integer addition.

So this method takes all results as they become available from other threads, processes them on a single thread, and returns the total. The real trick is in the other PLINQ extension methods.
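A sketch of that point, with an arbitrary Where/Select pipeline: the per-element lambdas run on PLINQ worker threads, and recording Environment.CurrentManagedThreadId shows how many threads took part. The count varies by machine and run, so treat it as illustrative only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Thread-safe set of the managed thread IDs that executed the Select lambda.
var threadsUsed = new ConcurrentDictionary<int, bool>();

int total = Enumerable.Range(1, 1000)
    .AsParallel()
    .Where(x => x % 2 == 0)          // filtering runs on PLINQ worker threads
    .Select(x =>
    {
        threadsUsed.TryAdd(Environment.CurrentManagedThreadId, true);
        return x * 2;                // stand-in for real per-element work
    })
    .Sum();

Console.WriteLine($"total = {total}, distinct threads = {threadsUsed.Count}");
```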

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

This code won't run in parallel: the while loop executes its body sequentially.

Try this instead:

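        using System.Collections.Generic;
        using System.Threading.Tasks;

        List<int> list = new List<int>();

        int num = 0;

        // NOT thread-safe: several threads read and write num concurrently,
        // so some additions can be lost (a race condition).
        Parallel.ForEach(list, (item) =>
            {
                checked { num += item; }
            });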

The inner action is spread across ThreadPool threads, and Parallel.ForEach returns only once all items have been handled.

Here you do need thread safety, because num is shared by all those threads:

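        using System.Collections.Generic;
        using System.Threading;
        using System.Threading.Tasks;

        List<int> list = new List<int>();

        int num = 0;

        // Interlocked.Add performs the read-modify-write atomically, so no update is lost.
        // Unlike checked arithmetic, it wraps around on overflow instead of throwing.
        Parallel.ForEach(list, (item) =>
            {
                Interlocked.Add(ref num, item);
            });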
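Calling Interlocked.Add on every item is correct, but all threads contend for the same variable. A sketch of the partition-local alternative, which mirrors what the accepted answer describes PLINQ doing: the Parallel.ForEach overload with localInit and localFinally gives each worker its own subtotal and touches the shared num only once per worker. The list contents here are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

List<int> list = Enumerable.Range(1, 1000).ToList();
int num = 0;

Parallel.ForEach(
    list,
    () => 0,                                                  // localInit: one subtotal per worker
    (item, loopState, subtotal) => checked(subtotal + item),  // body: only the worker's own subtotal
    subtotal => Interlocked.Add(ref num, subtotal));          // localFinally: one atomic add per worker

Console.WriteLine(num); // 500500
```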

Licensed under: CC-BY-SA with attribution