Question

I am learning LINQ RX and having a hard time understanding how to create handlers.

Let's say I have an IObservable<Entity> source that provides a stream of entities. Now I want to define different strategies for processing those Entity objects depending on their Entity.Group property. I tried:

source.Where(e=> e.Group == "first").Do(e=> whatever).Subscribe();

And it works. The problem is when I add the second path:

source.Where(e=> e.Group == "first").Do(whateverWithFirst).Subscribe();
source.Where(e=> e.Group == "second").Do(whateverWithSecond).Subscribe();

Then things happen twice, and I get an exception related to how source is implemented.

What is the right way of doing this?


Solution

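Each Subscribe call creates its own subscription to source, so with a cold source the underlying work runs once per query. You could use Publish() followed by RefCount() to share a single subscription to source between both queries: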

var publishedSource = source.Publish().RefCount();
publishedSource.Where(e=> e.Group == "first").Do(whateverWithFirst).Subscribe();
publishedSource.Where(e=> e.Group == "second").Do(whateverWithSecond).Subscribe();
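
One caveat: RefCount() connects to source as soon as the first subscriber arrives. If source emits synchronously on subscription (an assumption, since the question does not show how source is implemented), the second query may miss items. In that case it can be safer to call Publish(), attach both queries, and only then call Connect(). Here is a minimal sketch of that variant; the Entity class, the Observable.Create source, and the PublishConnectDemo name are hypothetical stand-ins for illustration, and the code assumes the System.Reactive package is referenced:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-in for the Entity type from the question.
public sealed class Entity
{
    public Entity(string group, int id) { Group = group; Id = id; }
    public string Group { get; }
    public int Id { get; }
}

public static class PublishConnectDemo
{
    // Returns how often the source was subscribed, plus the ids each query saw.
    public static (int Subscriptions, List<int> First, List<int> Second) Run()
    {
        int subscriptions = 0;

        // Assumed cold source: this delegate runs once per subscription
        // and emits its items synchronously.
        IObservable<Entity> source = Observable.Create<Entity>(observer =>
        {
            subscriptions++;
            observer.OnNext(new Entity("first", 1));
            observer.OnNext(new Entity("second", 2));
            observer.OnNext(new Entity("first", 3));
            observer.OnCompleted();
            return Disposable.Empty;
        });

        var first = new List<int>();
        var second = new List<int>();

        // Publish() only wraps the source; nothing is subscribed upstream yet.
        var published = source.Publish();

        using (published.Where(e => e.Group == "first").Subscribe(e => first.Add(e.Id)))
        using (published.Where(e => e.Group == "second").Subscribe(e => second.Add(e.Id)))
        using (published.Connect()) // the single upstream subscription starts here
        {
        }

        return (subscriptions, first, second);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"source subscriptions: {result.Subscriptions}");
        Console.WriteLine($"first: {string.Join(",", result.First)}");
        Console.WriteLine($"second: {string.Join(",", result.Second)}");
    }
}
```

Because both queries are attached before Connect() is called, they observe the same single run of the source. Keep the IDisposable returned by Connect() around if you need to stop the shared subscription later.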
Licensed under: CC-BY-SA with attribution