パイプラインとして RX IObservable
-
19-09-2019 - |
質問
現在、RX フレームワークを使用して、ワークフローのようなメッセージ処理パイプラインを実装しています。基本的に、メッセージ プロデューサー (ネットワーク メッセージを逆シリアル化し、Subject で OnNext() を呼び出します) と、いくつかのコンシューマーがあります。
注記:If と transform は、単純に IObservable を返す、私がコード化した拡張メソッドです。
消費者は次のようなことを行います。
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
次に、別の同様のパイプラインによって消費され、これが先頭まで続き、誰かが呼び出して終了します。 Subscribe()
最終パイプラインで。私が抱えている問題は、メッセージに対して購読がどこかで直接呼び出されない限り、ベースからのメッセージが上に伝播しないことです。
メッセージをスタックの一番上にプッシュするにはどうすればよいですか?これが型破りなアプローチであることは承知していますが、メッセージに何が起こっているかをコードが非常に簡単に理解できるようになると思います。これはまったくひどいアイデアだと思われる場合、同じことを行う別の方法を誰かが提案できますか?
解決
購読者がいないのに、なぜパイプラインを経由する必要があるのでしょうか?中間ステップの 1 つが副作用に対して役立つ場合 (他にサブスクライバが存在しない場合でも実行させたい場合)、副作用操作をサブスクライバになるように書き直す必要があります。
チェーンを継続したい場合は、パススルー操作 (または T 字) として副作用を伴うステップを作成することもできます。
所属していません StackOverflow