RX IObservable como Pipeline
-
19-09-2019 - |
Pergunta
Atualmente, estou usando a estrutura RX para implementar um fluxo de trabalho como pipeline de processamento de mensagens. Essencialmente eu tenho um produtor de mensagens (desserializa mensagens de rede e chama OnNext () sobre um assunto) e eu tenho vários consumidores.
NOTA: Se e transformar são métodos de extensão tenho codificados que simplesmente retornar um IObservable
.Um consumidor faz algo como o seguinte:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
é então consumida por outro gasoduto similar e isso continua até o topo onde termina com alguém chamando Subscribe()
no gasoduto final. O problema que estou tendo é que as mensagens a partir da base não se propagam-se a menos que se inscrever é chamado em mensagens diretamente em algum lugar.
Como posso empurrar as mensagens até o topo da pilha? Eu sei que esta é uma abordagem pouco ortodoxa mas eu sinto que torna o código muito simples de entender o que está ocorrendo a uma mensagem. Alguém pode sugerir uma outra maneira de fazer o mesmo se você sentir esta é uma idéia totalmente terrível?
Solução
Por que eles devem ir através do gasoduto se não houver assinantes? Se um de seus passos intermediários é útil para seus efeitos colaterais (Você quer que eles para executar mesmo se não há outros assinantes), você deve reescrever a operação efeito colateral de ser um assinante.
Você também pode fazer o passo com um efeito colateral como um pass-through operação (ou T, se você) se você quiser continuar a cadeia.