Pergunta

Atualmente, estou usando a estrutura RX para implementar um fluxo de trabalho como pipeline de processamento de mensagens. Essencialmente eu tenho um produtor de mensagens (desserializa mensagens de rede e chama OnNext () sobre um assunto) e eu tenho vários consumidores.

NOTA: Se e transformar são métodos de extensão tenho codificados que simplesmente retornar um IObservable

.

Um consumidor faz algo como o seguinte:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest é então consumida por outro gasoduto similar e isso continua até o topo onde termina com alguém chamando Subscribe() no gasoduto final. O problema que estou tendo é que as mensagens a partir da base não se propagam-se a menos que se inscrever é chamado em mensagens diretamente em algum lugar.

Como posso empurrar as mensagens até o topo da pilha? Eu sei que esta é uma abordagem pouco ortodoxa mas eu sinto que torna o código muito simples de entender o que está ocorrendo a uma mensagem. Alguém pode sugerir uma outra maneira de fazer o mesmo se você sentir esta é uma idéia totalmente terrível?

Foi útil?

Solução

Por que eles devem ir através do gasoduto se não houver assinantes? Se um de seus passos intermediários é útil para seus efeitos colaterais (Você quer que eles para executar mesmo se não há outros assinantes), você deve reescrever a operação efeito colateral de ser um assinante.

Você também pode fazer o passo com um efeito colateral como um pass-through operação (ou T, se você) se você quiser continuar a cadeia.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top