無限数セットで Rx IObservable を圧縮
-
22-09-2019 - |
質問
Reactive Extensions フレームワークの IObservable [以下のサンプルの名前付き行] があり、それが監視する各オブジェクトにインデックス番号を追加したいと考えています。
Zip関数を使用してこれを実装しようとしました:
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
..しかし残念ながら、これはスローです
引数範囲外例外:指定された引数は有効な値の範囲外でした。パラメータ名:使い捨て用品
Zip 関数の理解が間違っているのでしょうか、それともコードに問題があるのでしょうか?
コードの Range 部分には問題がないようで、IObservable はまだイベントを受信していません。
解決 2
どうやら、ジップ拡張メソッドは、それへの匿名観測可能とサブスクライブのオリジナルカスタムIObservableがIDisposableを実装していませんSystem.Collections.Generic.AnonymousObserverを、作成に変換します。 したがって、あなたは
は、(私はそれが使用さ見てきた道少なくとも)メソッドを通常の方法を購読実装することはできませんpublic IDisposable Subscribe(IObserver<T> observer) {
// ..add to observer list..
return observer as IDisposable
}
もっと可能性の高い正解は以下のようになります:
return Disposable.Create(() => Observers.Remove(observer));
あなたはcollctionは、おそらく完了-方法ドゥリン変更されることに注意しても、ので、それらを処理する前に、リストのコピーを作成する必要があります:
public void Completed()
{
foreach (var observer in Observers.ToList())
{
observer.OnCompleted();
}
}
他のヒント
.Selectインデックスを含むように過負荷があります:
rows.Select((row, index) => new { row, index });
I午前ないでください、あなたの問題が何であるか、ないこのあなたのための作品(と何をやっていることをここに欠けている?):
static void Main(string[] args)
{
var rows = new List<int> { 4,5,1,2,5 }.ToObservable();
rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) =>
new { Row = row, Index = index })
.Subscribe(a => ProcessRow(a.Row, a.Index), () => Completed());
Console.ReadLine();
}
static void ProcessRow(int row, int index) {
Console.WriteLine("Row {0}, Index {1}", row, index);
}
static void Completed() {
}
所属していません StackOverflow