Вопрос

В настоящее время я использую RX Framework для реализации конвейера обработки сообщений, подобного рабочему процессу.По сути, у меня есть производитель сообщений (десериализует сетевые сообщения и вызывает OnNext() для субъекта), и у меня есть несколько потребителей.

ПРИМЕЧАНИЕ:If и Transform — это методы расширения, которые я написал, которые просто возвращают IObservable.

Потребитель делает что-то вроде следующего:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest затем потребляется другим аналогичным конвейером, и это продолжается до самого верха, где он заканчивается тем, что кто-то звонит Subscribe() на последнем трубопроводе.Проблема, с которой я столкнулся, заключается в том, что сообщения из базы не распространяются, если где-то непосредственно не вызывается подписка на сообщения.

Как я могу поместить сообщения на вершину стека?Я знаю, что это неортодоксальный подход, но мне кажется, что он позволяет коду очень легко понять, что происходит с сообщением.Может ли кто-нибудь предложить другой способ сделать то же самое, если вы считаете, что это совершенно ужасная идея?

Это было полезно?

Решение

Зачем им идти по конвейеру, если нет подписчиков?Если один из ваших промежуточных шагов полезен из-за своих побочных эффектов (вы хотите, чтобы они выполнялись, даже если нет других подписчиков), вам следует переписать операцию побочного эффекта, чтобы стать подписчиком.

Вы также можете сделать шаг с побочным эффектом в качестве сквозной операции (или тройника, если хотите), если хотите продолжить цепочку.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top