Observable に変換すると ParallelQuery<T>.Where が機能しないのはなぜですか?

StackOverflow https://stackoverflow.com/questions/2287963

質問

並列処理したい監視可能なコレクションがあり、フィルタリング中に処理された値を監視し、最後にフィルタリングされた値を受け取るハンドラーをサブスクライブします。

私のサンプルは構文的に正しく、問題なくコンパイルされます。コードを実行すると、 Where フィルタリングを実行するステートメントが評価されます。しかし、サブスクリプションにはデータが送信されません。取り除いたら AsParallel 処理が定期的に行われるようにする IEnumerable, 、データが送信され、すべてが期待どおりに機能します。

これは文字列に対していくつかの処理を実行する私のサンプルです。

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

次に奇妙なのは、 TakeWhile 私の考えでは、この演算子は概念的には Where に似ており、ParallelQuery が期待どおりに動作することを確認できます。

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

サブスクリプションにログ コードを追加すると、データが受信されるまでにデータが受信されることがわかります。 ToObservable 変換後ではありません:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

ラムダの 4 行目のブレークポイントはヒットしますが、ラムダの 6 行目のブレークポイントはヒットしません。

なぜ TakeWhile データが加入者に届くようにしながら、 Where ではない?

重要な場合は、.Net 4.0 Framework Client Profile を対象としたプロジェクトを使用して、Visual Studio 2010 RC でコードを開発します。

アップデート:に基づく @Sergeyの答え の配置をやり直しました Where フィルター。次のコードは期待どおりに機能します。

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

最初に最初のオブザーバブルを変換する必要があるのは、まだ少し厄介に感じます processedStrings 並列化するために列挙可能に変換し、最終結果をサブスクライブするためにオブザーバブルに変換し直します。

役に立ちましたか?

解決

から C# 4.0 の概要:


現在、PLINQ が並列化できるものには実際的な制限がいくつかあります。これら 制限は、その後の Service Pack と Framework のバージョンで緩和される可能性があります。次のクエリ演算子は、 ソース要素は、元のインデックス位置にあります。

  • Take、Takewhile、Skip、Skipwhile
  • Select、SelectMany、および ElementAt のインデックス付きバージョン

ほとんどのクエリ演算子は、要素のインデックス位置を変更します ( 要素を削除します (Where など)。つまり、前の 演算子は、通常、クエリの先頭にある必要があります


したがって、実際には、Takewhile を使用すると、.AsParallel() が並列化されなくなります。言うのは難しいです なぜ Where はサブスクリプションを強制終了しますが、AsParallel の前に置きます。 かもしれない 問題を解決します。

他のヒント

TakeWhile 概念的には同等ではありません Where, それは注文次第だからです。クエリは次のとおりだと思います 実は 順次実行 (参照) このブログ投稿)。電話してみてください .WithExecutionMode(ParallelExecutionMode.ForceParallelism) あなたの中で TakeWhile たとえば、同じ結果が表示されると思います。

並列ケースでなぜ機能しないのかはわかりません...データがどこまで到達するかを確認するためにログを記録することを提案できますか?たとえば、ログを記録した後に元の項目を返す Select を使用すると、便利なログを実行できます。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top