RX的IObservable作为管道
-
19-09-2019 - |
题
目前,我使用的RX框架来实现工作流般的消息处理流水线。基本上我有一个消息生产者(反序列化网络消息并且在主题调用OnNext())和我有几个消费者。
注意:如果和变换是扩展方法我已编码的简单地返回的IObservable
一个消费者做类似如下:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
然后通过另一个类似管道消耗并且这种情况持续下去,直到在那里它与在最后管道有人叫Subscribe()
结束顶部。我遇到的问题是,从基本消息不传播了,除非订阅被称为上的消息直接的地方。
如何推动消息到堆栈的顶部?我知道这是一个非正统的做法,但我觉得它使代码非常简单了解正在发生的事情的消息。任何人都可以建议做相同的另一种方式,如果你觉得这是一个完全可怕的想法?
解决方案
为什么他们应该去通过管道,如果没有订阅?如果你的中间步骤之一是他们的副作用(你希望他们即使没有其他用户运行)是有用的,你应该重写副作用操作是用户。
您也可以使用一个副作用作为直通操作的步骤(或三通,如果你愿意),如果你想继续链
不隶属于 StackOverflow