2つの観測可能性をマージして、観測可能性のいずれかが完了したときに結果が完了するようにする方法は?
-
28-10-2019 - |
質問
私はこのコードを持っています:
var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));
ss.Subscribe(_ => Console.WriteLine("Next"));
s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(
OnError(新しいOperationCanceleDexception())を使用して問題を解決しましたが、より良い解決策が必要です(コンビネーターが必要ですか?)。
解決
またはこれも非常にきれいです:
public static class Ext
{
public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
{
return Observable.CreateWithDisposable<T>(obs =>
{
var compositeDisposable = new CompositeDisposable();
var subject = new Subject<T>();
compositeDisposable.Add(subject.Subscribe(obs));
compositeDisposable.Add(source.Subscribe(subject));
compositeDisposable.Add(right.Subscribe(subject));
return compositeDisposable;
});
}
}
これは、createwithdisposable()で1つのoncompletedがオブザーバーにプッシュされることを確認する主題を使用します。
他のヒント
いずれかのストリームが完了したときにマージを書き直す代わりに、oncompletedイベントをオンネキストイベントに変換して使用することをお勧めします var ss = s1.Merge(s2).TakeUntil(s1ors2complete)
ここで、S1ORS2Completeは、S1またはS2のいずれかが終了すると値を生成します。また、チェーンすることもできます .TakeUntil(s1completes).TakeUntil(s2completes)
S1ORS2Completeを作成する代わりに。このアプローチは、MergeWithCompleteOoneの拡張機能よりも優れた構成を提供します。これは、両方の完全なオペレーターを「完了」オペレーターに変更するために使用できるため、拡張機能を使用します。
OnnextイベントをOncompletedイベントに変換する方法については、それを行う方法がいくつかあります。 Compositedisposableの方法は良いアプローチのように聞こえます。少し検索すると、この興味深いスレッドが見つかります OnNext、Onerror、およびOncompleted通知の間の変換. 。おそらく、returnTrueOncompletedを使用して拡張メソッドを作成するでしょう xs.SkipWhile(_ => true).concat(Observable.Return(True))
そして、あなたのマージは次のようになります:
var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));
Zipのようなオペレーターの使用を見ることもできます。 自動的に完了します 入力ストリームの1つが完了すると。
どちらのストリームの出力も必要ないと仮定すると、使用できます Amb
からの魔法と組み合わされています Materialize
:
var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = Observable.Amb(
s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted),
s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
)
.Finally(() => Console.WriteLine("Finished!"));
ss.Subscribe(_ => Console.WriteLine("Next"));
s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from
値が必要な場合は、使用できます Do
2つの科目について。
これを試して:
public static class Ext
{
public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
{
var completed = Observable.Throw<T>(new StreamCompletedException());
return
source.Concat(completed)
.Merge(right.Concat(completed))
.Catch((StreamCompletedException ex) => Observable.Empty<T>());
}
private sealed class StreamCompletedException : Exception
{
}
}
これが行うことは、ソースまたは適切なソースが完了すると例外をスローするiobservableを連結することです。次に、CACTE拡張法を使用して、空の観測可能なものを返して、いずれかの完了時にストリームを自動的に完了できます。