Question

I have the repro code below, which demonstrates a problem from a more complex flow:

static void Main(string[] args)
    {
        var r = Observable.Range(1, 10).Finally(() => Console.WriteLine("Disposed"));
        var x = Observable.Create<int>(o =>
            {
                for (int i = 1; i < 11; i++)
                {
                    o.OnNext(i);
                }

                o.OnCompleted();

                return Disposable.Create(() => Console.WriteLine("Disposed"));
            });

        var src = x.Publish().RefCount();

        var a = src.Where(i => i % 2 == 0).Do(i => Console.WriteLine("Pair:" + i));
        var b = src.Where(i => i % 2 != 0).Do(i => Console.WriteLine("Even:" + i));

        var c = Observable.Merge(a, b);

        using (c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("Complete")))
        {
            Console.ReadKey();
        }
    }

Running this snippet with r as the source (var src = r.Publish().RefCount()) produces all the numbers from 1 to 10. Switching the source to x (as in the example) produces only the pairs, that is, only the output of the first observable to subscribe, unless I change Publish() to Replay().

Why? What is the difference between r and x?

Thanks.


Solution

Although I do not have the patience to sort through the Rx.NET source code to find exactly what implementation detail causes this exact behavior, I can provide the following insight:

The difference in behavior you are seeing is caused by a race condition. The racers in this case are the subscriptions to a and b, which happen as a result of your subscription to the observable returned by Observable.Merge. You subscribe to c, which in turn subscribes to a and then to b. Both a and b are defined in terms of a Publish and RefCount of either x or r, depending on which case you choose.
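One way to observe the subscription order directly is to log the moment Merge subscribes to each branch. The sketch below is a diagnostic, not part of the original code: the Traced helper and the class name are hypothetical, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class SubscriptionOrderDemo
{
    // Hypothetical helper (not from the question): logs when a branch is
    // subscribed to, then hands back the branch unchanged.
    static IObservable<int> Traced(string name, IObservable<int> branch)
    {
        return Observable.Defer(() =>
        {
            Console.WriteLine("subscribing to " + name);
            return branch;
        });
    }

    static void Main()
    {
        // Same synchronous source as x in the question.
        var x = Observable.Create<int>(o =>
        {
            for (int i = 1; i <= 10; i++)
            {
                o.OnNext(i);
            }

            o.OnCompleted();
            return Disposable.Empty;
        });

        var src = x.Publish().RefCount();

        var a = Traced("a", src.Where(i => i % 2 == 0));
        var b = Traced("b", src.Where(i => i % 2 != 0));

        Observable.Merge(a, b).Subscribe(
            i => Console.WriteLine("final " + i),
            () => Console.WriteLine("Complete"));
    }
}
```

Comparing where the "subscribing to b" line appears relative to the "final" lines should show whether the source had already run before b was attached.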

Here's what's happening.

src = x

In this case, you are using a custom Observable. When subscribed to, your custom observable immediately and synchronously begins to onNext the numbers 1 through 10, and then calls onCompleted. Interestingly enough, this subscription is caused by your Publish().RefCount() Observable when it is subscribed to for the first time. It is first subscribed to by a, because a is the first parameter to Merge. So, before Merge has even subscribed to b, your source has already completed. Merge then subscribes to b, which is built on the same RefCount observable. That observable has already completed, so Merge looks for the next Observable to merge. Since there are no more Observables to merge, and because all of the existing Observables have completed, the merged observable completes.

The values onNext'd through your custom observable have traveled through the "pairs" observable, but not the "evens" observable. Therefore, you end up with the following:

// "pairs" (has this been named incorrectly?)
[2, 4, 6, 8, 10]

src = r

In this case, you are using the built-in Range method to create an Observable. When subscribed to, this Range Observable does something that eventually ends up yielding the numbers 1 through 10. Interesting. We haven't a clue what's happening in that method, or when it's happening. We can, however, make some observations about it. If we look at what happens when src = x (above), we can see that only the first subscription takes effect because the observable is yielding immediately and synchronously. Therefore, we can determine that the Range Observable must not be yielding in the same manner, but instead allows the application's control flow to execute the subscription to b before any values are yielded. The difference between your custom Observable and this Range Observable is probably that the Range Observable schedules the yields to happen on the CurrentThread Scheduler.
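The scheduler hypothesis can be checked by passing a scheduler to Range explicitly. The following is a minimal sketch, assuming the System.Reactive package; the Run helper and the class name are made up for illustration. If the explanation above holds, the CurrentThread run should deliver values to both branches, while the Immediate run should behave like the custom synchronous source.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class RangeSchedulerDemo
{
    // Hypothetical helper: rebuilds the Publish/RefCount/Merge pipeline
    // from the question on top of the given source.
    static void Run(string label, IObservable<int> source)
    {
        Console.WriteLine("--- " + label + " ---");

        var src = source.Publish().RefCount();
        var a = src.Where(i => i % 2 == 0);
        var b = src.Where(i => i % 2 != 0);

        Observable.Merge(a, b).Subscribe(
            i => Console.WriteLine("final " + i),
            () => Console.WriteLine("Complete"));
    }

    static void Main()
    {
        // Values are queued on the current thread's trampoline, so they
        // are delivered only after the work already in progress finishes.
        Run("CurrentThread", Observable.Range(1, 10, Scheduler.CurrentThread));

        // Values are pushed synchronously, inside the first Subscribe call.
        Run("Immediate", Observable.Range(1, 10, Scheduler.Immediate));
    }
}
```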

How to avoid this kind of race condition:

var src = x.Publish(); // not ref count

var a = src.Where(...);
var b = src.Where(...);

var c = Observable.Merge(a, b);

var subscription = c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("Complete"));

// Don't dispose of the subscription. The observable creates an auto-disposing
// subscription which will call dispose once `OnCompleted` or `OnError` is called.

src.Connect(); // connect to the underlying observable, *after* Merge has subscribed to both a and b.

Notice that the solution to fixing the subscription to this composition of Observables was not to change how the source observable works, but instead to make sure your subscription logic isn't allowing any race conditions to exist. This is important, because trying to fix this problem in the Observable is simply changing behavior, not fixing the race. Had we changed the source and switched it out later, the subscription logic would still be buggy.
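For reference, here is the same idea as a complete program. It is a sketch under stated assumptions: it needs the System.Reactive package, the class name is arbitrary, and the filters are copied from the question's a and b. Because both branches are attached before Connect is called, each value pushed by the synchronous source reaches both of them.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class ConnectAfterSubscribeDemo
{
    static void Main()
    {
        // Same synchronous source as x in the question.
        var x = Observable.Create<int>(o =>
        {
            for (int i = 1; i <= 10; i++)
            {
                o.OnNext(i);
            }

            o.OnCompleted();
            return Disposable.Create(() => Console.WriteLine("Disposed"));
        });

        // Publish without RefCount: nothing flows until Connect is called.
        var src = x.Publish();

        var a = src.Where(i => i % 2 == 0);
        var b = src.Where(i => i % 2 != 0);

        // Merge subscribes to a and b here, but the source has not started.
        Observable.Merge(a, b).Subscribe(
            i => Console.WriteLine("final " + i),
            () => Console.WriteLine("Complete"));

        // Start the source only now, with both branches already listening.
        src.Connect();
    }
}
```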

OTHER TIPS

I suspect it's the schedulers. This change causes the two to behave identically:

var x = Observable.Create<int>(o =>
    {
        NewThreadScheduler.Default.Schedule(() =>
        {
            for (int i = 1; i < 11; i++)
            {
                o.OnNext(i);
            }

            o.OnCompleted();
        });

        return Disposable.Create(() => Console.WriteLine("Disposed"));
    });

Whereas using Scheduler.Immediate gives the same behavior as yours.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow