Frage

ich eine beobachtbare Sammlung habe, dass ich parallel zu verarbeiten will, dann die verarbeiteten Werte beobachten, während Filterung und schließlich einen Handler abonnieren, den die gefilterten Werte empfängt.

ist mein Beispiel syntaktisch korrekt und stellt ganz gut, und wenn ich den Code ausführen, die Where Anweisung, um die Filterung zu tun ausgewertet wird. Aber keine Daten kommen durch die Zeichnung. Wenn ich AsParallel entfernen, so dass die Verarbeitung über einen regelmäßigen IEnumerable erfolgt, Daten kommen durch und alles funktioniert wie erwartet.

Hier ist meine Probe, einige Verarbeitung auf Strings zu tun:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

Die nächste seltsame Sache ist, dass, wenn ich den TakeWhile Operator, der in meinem Kopf vom Konzept her ähnlich ist Wo arbeitet die ParallelQuery beobachtet, wie erwartet:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Hinzufügen einiger Logging-Code auf die Abonnement zeigt, dass die Daten bis der ToObservable Umwandlung empfangen, aber nicht nach:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Ein Haltepunkt in dem Lambda in Zeile 4 getroffen wird, während ein Haltepunkt in dem Lambda bei der Linie 6 wird nie getroffen.

Warum machen Daten kommen durch an den Teilnehmer TakeWhile während Where nicht?

Wenn es wichtig ist, entwickle ich meinen Code in Visual Studio 2010 RC mit einem Projekt Targeting .NET 4.0 Framework Client Profile.

Aktualisieren : basierend auf @ Sergeys Antwort ich die Platzierung des Where Filter überarbeitet. Der folgende Code funktioniert wie erwartet:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Es fühlt sich immer noch ein wenig umständlich zu ersten Konvertiten zu haben, um die anfänglichen beobachtbaren processedStrings in eine zählbaren, um es zu parallelisieren, und es dann zu einer beobachtbaren, um das Endergebnis zu abonnieren konvertieren zurück.

War es hilfreich?

Lösung

Von der C # 4.0 in a Nutshell :


Es gibt noch einige praktische Einschränkungen auf, was PLINQ parallelisieren können. Diese Beschränkungen können mit nachfolgendem Service Pack und Framework-Versionen lösen. Die folgende Abfrage Betreiber verhindern eine Abfrage von parallelisiert werden, es sei denn, die Quellenelemente sind in ihrer ursprünglichen Teilungsposition:

  • Nehmen, Takewhile, Überspringen, und Skipwhile
  • Die indizierten Versionen von Select, Select und ElementAt

Die meisten Abfrageoperatoren ändern, um die Schaltposition der Elemente (einschließlich derjenigen, die Entfernen Elemente wie Wo). Dies bedeutet, dass, wenn Sie die vorhergehenden verwenden möchten Betreiber, sie werden in der Regel zu Beginn der Abfrage

sein müssen

So in der Tat verhindert mit Takewhile der .AsParallel () von Parallelisierung. Es ist schwer, zu sagen, warum Wo die subscriptiion tötet, aber es vor AsParallel könnte das Problem beheben setzen.

Andere Tipps

ist TakeWhile nicht konzeptionell äquivalent zu Where, weil es bei der Bestellung abhängt. Ich vermute, dass die Abfrage wirklich Ausführen der Reihe nach (siehe dieses Blog-Post ). Versuchen Sie fordern .WithExecutionMode(ParallelExecutionMode.ForceParallelism) in Ihrem TakeWhile Beispiel, und ich vermute, Sie werden das gleiche Ergebnis sehen.

Ich weiß nicht, warum es nicht im parallelen Fall arbeitet aber ... kann ich vorschlagen, dass Sie in einiger Protokollierung setzen, um zu sehen, wie weit die Daten erreicht? Sie können nützliche Protokollierung mit einem Select durchführen, die nach der Anmeldung des ursprüngliche Element zurückgibt, zum Beispiel.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top