Question

I have an observable collection that I want to process in parallel, then observe the processed values while filtering and finally subscribe a handler that receives the filtered values.

My sample is syntactically correct and compiles just fine, and when I run the code, the Where statement doing the filtering is evaluated. But no data comes through to the subscription. If I remove AsParallel so that the processing is done over a regular IEnumerable, data comes through and everything works as expected.

Here is my sample, doing some processing on strings:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

The next weird thing is that if I use the TakeWhile operator, which in my mind is conceptually similar to Where, observing the ParallelQuery works as expected:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Adding some logging code to the subscription shows that data is received up until the ToObservable conversion, but not after:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

A breakpoint in the lambda at line 4 is hit while a breakpoint in the lambda at line 6 is never hit.

Why will TakeWhile make data come through to the subscriber while Where does not?

In case it matters, I am developing in Visual Studio 2010 RC with a project targeting the .NET Framework 4.0 Client Profile.

Update: based on @Sergey's answer I reworked the placement of the Where filter. The following code works as expected:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value);

It still feels a bit awkward to have to first convert the initial observable processedStrings into an enumerable in order to parallelize it, and then convert it back to an observable in order to subscribe to the final result.


Solution

From C# 4.0 in a Nutshell:


There are currently some practical limitations on what PLINQ can parallelize. These limitations may loosen with subsequent service packs and Framework versions. The following query operators prevent a query from being parallelized, unless the source elements are in their original indexing position:

  • Take, TakeWhile, Skip, and SkipWhile
  • The indexed versions of Select, SelectMany, and ElementAt

Most query operators change the indexing position of elements (including those that remove elements, such as Where). This means that if you want to use the preceding operators, they’ll usually need to be at the start of the query.


So, in fact, using TakeWhile prevents AsParallel() from parallelizing the query. It is hard to say why Where kills the subscription, but putting it before AsParallel might fix the problem.

OTHER TIPS

TakeWhile isn't conceptually equivalent to Where, because it depends on ordering. I suspect that the query is actually executing sequentially (see this blog post). Try calling .WithExecutionMode(ParallelExecutionMode.ForceParallelism) in your TakeWhile example, and I suspect you'll see the same result.
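As a rough sketch of that suggestion, the TakeWhile query with forced parallelism might look like the following. It uses a finite in-memory sequence as a stand-in for strings.ToEnumerable() so that it runs without Rx; the class name ForceParallelDemo and the sample data are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ForceParallelDemo
{
    // Runs the TakeWhile query with parallel execution requested explicitly.
    public static List<string> Run()
    {
        // Assumption: a finite in-memory sequence standing in for strings.ToEnumerable().
        IEnumerable<string> strings = Enumerable.Range(1, 20).Select(i => "item " + i);

        return strings
            .AsParallel()
            // Request parallel execution of the whole query, even for query
            // shapes that PLINQ would run sequentially by default.
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select(value => "Parallel " + value)
            .TakeWhile(value => !String.IsNullOrEmpty(value))
            .ToList();
    }

    static void Main()
    {
        // The order of the printed values is not guaranteed.
        foreach (var value in Run())
            Console.WriteLine(value);
    }
}
```

If the original TakeWhile sample stops delivering values once this mode is set, that would support the idea that it only worked because the query was running sequentially.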

I don't know why it's not working in the parallel case though... can I suggest that you put in some logging to see how far the data reaches? You can perform useful logging with a Select which returns the original item after logging it, for example.
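A pass-through logging step of that kind could be sketched as below. The Log extension method and the LoggingDemo class are hypothetical helpers, not part of PLINQ or Rx, and the source is again a plain in-memory sequence so the example runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class LoggingDemo
{
    // Hypothetical helper: writes each element with a label, then returns it unchanged.
    public static ParallelQuery<T> Log<T>(this ParallelQuery<T> source, string label)
    {
        return source.Select(value =>
        {
            Console.WriteLine("{0}: {1}", label, value);
            return value;
        });
    }

    // Builds a small parallel query with a logging step on each side of the filter.
    public static List<string> Run()
    {
        return Enumerable.Range(1, 5)
            .Select(i => "item " + i)
            .AsParallel()
            .Log("before Where")
            .Where(value => !String.IsNullOrEmpty(value))
            .Log("after Where")
            .ToList();
    }

    static void Main()
    {
        var results = Run();
        Console.WriteLine("Received {0} values", results.Count);
    }
}
```

Placing one such step before and one after each operator in the real pipeline shows the last stage that still sees data. Log lines from different elements may interleave, since the lambdas run on several threads.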

Licensed under: CC-BY-SA with attribution