目前,我使用的RX框架来实现工作流般的消息处理流水线。基本上我有一个消息生产者(反序列化网络消息并且在主题调用OnNext())和我有几个消费者。

注意:如果和变换是扩展方法我已编码的简单地返回的IObservable

一个消费者做类似如下:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest然后通过另一个类似管道消耗并且这种情况持续下去,直到在那里它与在最后管道有人叫Subscribe()结束顶部。我遇到的问题是,从基本消息不传播了,除非订阅被称为上的消息直接的地方。

如何推动消息到堆栈的顶部?我知道这是一个非正统的做法,但我觉得它使代码非常简单了解正在发生的事情的消息。任何人都可以建议做相同的另一种方式,如果你觉得这是一个完全可怕的想法?

有帮助吗?

解决方案

为什么他们应该去通过管道,如果没有订阅?如果你的中间步骤之一是他们的副作用(你希望他们即使没有其他用户运行)是有用的,你应该重写副作用操作是用户。

您也可以使用一个副作用作为直通操作的步骤(或三通,如果你愿意),如果你想继续链

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top