Pregunta

Actualmente, estoy usando el Marco de RX para implementar una gestión de mensajes de tuberías de flujo de trabajo similar. Esencialmente Tengo un productor de mensajes (deserializa mensajes de red y pide OnNext () en un sujeto) y tengo varios consumidores.

NOTA:. Si son y transformar los métodos de extensión que he codificado que simplemente devuelve un IObservable

Un consumidor hace algo como lo siguiente:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest entonces es consumido por otro ducto similar y esto continúa hasta la parte superior donde termina con alguien llamando Subscribe() en la tubería final. El problema que estoy teniendo es que los mensajes de la base no se propagan a menos que se llama suscribirse a los mensajes directamente en alguna parte.

¿Cómo puedo empujar los mensajes hasta la parte superior de la pila? Sé que este es un enfoque poco ortodoxo pero siento que hace que el código muy sencillo de entender lo que está ocurriendo a un mensaje. ¿Puede alguien sugerir otra forma de hacer lo mismo si cree que esto es totalmente una idea terrible?

¿Fue útil?

Solución

¿Por qué deberían ir a través de la tubería si no hay suscriptores? Si uno de sus pasos intermedios es útil para sus efectos secundarios (Usted desea que se ejecuten incluso si no hay otros abonados), debe volver a escribir la operación de efectos secundarios que ser suscriptor.

También se puede dar el paso con un efecto secundario como una operación de traspaso (o té, si se quiere) si desea continuar o no la cadena.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top