Question

I am trying to unit test a method that calls Wait() on an IObservable, but my test never completes: the Wait() call never returns. My test contains the following:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<List<string>>(
    new Recorded<Notification<List<string>>>(100, Notification.CreateOnNext(new List<string> { "John", "Harry" })),
    new Recorded<Notification<List<string>>>(200, Notification.CreateOnCompleted<List<string>>()));

I am using Moq to set up the response of my method so that it returns input1. For example:

myObj.Setup(f => f.GetStrings()).Returns(input1);

The details of myObj don't actually matter. I start the scheduler and call my method, which contains a Wait(). For example, somewhere in my method I call:

var results = myObj.GetStrings().Wait();

But this never returns. I suspect I am using the scheduler incorrectly, but I am not sure.

Regards Alan

OTHER TIPS

Summary

The problem is that you are creating a cold observable and advancing the scheduler before you have subscribed to it.

Detail

If you call the blocking Wait() operation in a single-threaded test, you are dead in the water at that point. This is because the TestScheduler's internal clock only advances when you call Start(), AdvanceBy() or AdvanceTo() and, since you have a cold observable, the event times you specify are relative to the point of subscription. There are also some nuances to calling Start(), which I will explain below.

So, as Wait will block, you might try to call it on another thread, but it's still tricky. Consider the following code, which is similar to yours:

void Main()
{
    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable(
        new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
        new Recorded<Notification<int>>(200, Notification.CreateOnCompleted<int>()));

    // (A)

    int result = 0;

    var resultTask = Task.Run(() => { result = source.Wait(); });

    // (B)

    resultTask.Wait();

    Console.WriteLine(result);
}

This code tries to wait on a background thread. If we insert a call to scheduler.Start() at point (A), then source.Wait() will block forever.

This is because Start() will ONLY advance the internal clock of the TestScheduler until all currently scheduled events are executed. With a cold observable, events are scheduled relative to the virtual time of subscription. Since there are no subscribers at point (A), you will find that TestScheduler.Now.Ticks will report 0 even after the call to Start().
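
To make that concrete, here is a minimal sketch (not part of the original test) that reads the scheduler's Clock property before and after a subscription exists. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced; the class name StartWithoutSubscribers is purely illustrative.

```csharp
using System;
using System.Reactive;
using Microsoft.Reactive.Testing;

public static class StartWithoutSubscribers // hypothetical name, for illustration only
{
    public static void Main()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
            new Recorded<Notification<int>>(200, Notification.CreateOnCompleted<int>()));

        // Nobody has subscribed yet, so nothing is queued and Start() returns
        // immediately without moving virtual time.
        scheduler.Start();
        Console.WriteLine(scheduler.Clock); // prints 0

        // Subscribing is what queues the cold observable's events,
        // relative to the current virtual time.
        var observer = scheduler.CreateObserver<int>();
        source.Subscribe(observer);

        // Now there is work to run, so the clock advances to the last event.
        scheduler.Start();
        Console.WriteLine(scheduler.Clock); // prints 200
    }
}
```

The first Start() call is effectively a no-op, which is exactly the situation at point (A).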

Things get even worse if we move the call to scheduler.Start() to point (B). Now we have a race condition, and one that will almost always leave the test hanging at the call to resultTask.Wait(). The chances are that resultTask will not have had time to execute its action and subscribe to source before the scheduler.Start() call executes, so once again time will not advance.

A deterministic execution is therefore very hard to achieve: there is no nice way to announce that the Wait() call has been issued before advancing time, since the Wait() call itself will block. Inserting a long enough delay before advancing the scheduler will work, but it rather defeats the purpose of using the TestScheduler:

// (B)
Task.Delay(2000).Wait();        
scheduler.AdvanceBy(200);

What this question really demonstrates to me (IMHO) is that calling Wait() and blocking a thread is almost always a bad idea. Consider using methods like LastAsync() instead, and/or using continuations to get hold of the results of asynchronous methods.
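
As a rough illustration of the LastAsync() suggestion, the sketch below subscribes without blocking and only then starts the scheduler, so everything runs on one thread. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced; the class name LastAsyncSketch is hypothetical, and in the original scenario the method under test would need to expose the observable (or a continuation) rather than call Wait() internally.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class LastAsyncSketch // hypothetical name, for illustration only
{
    public static void Main()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
            new Recorded<Notification<int>>(200, Notification.CreateOnCompleted<int>()));

        int result = 0;

        // LastAsync() returns an IObservable<int>, so subscribing does not block.
        // The callback runs once the source completes.
        source.LastAsync().Subscribe(x => result = x);

        // The subscription already exists, so Start() has events to run.
        scheduler.Start();

        Console.WriteLine(result); // prints 1
    }
}
```

Because nothing blocks, there is no second thread and no race between subscribing and advancing virtual time.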

I can't recommend this approach because of its complexity, but here is a deterministic solution that uses an extension method to signal when a subscription has been made.

void Main()
{
    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable(
        new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
        new Recorded<Notification<int>>(200, Notification.CreateOnCompleted<int>()));

    // (A)      
    var waitHandle = new AutoResetEvent(false);

    int result = 0;                         
    var resultTask = Task.Run(() =>
    {
        result = source.AnnounceSubscription(waitHandle).Wait();
    });

    // (B)
    waitHandle.WaitOne();   
    scheduler.Start();

    resultTask.Wait();

    Console.WriteLine(result);
}

public static class ObservableExtensions
{
    public static IObservable<T> AnnounceSubscription<T>(
        this IObservable<T> source, AutoResetEvent are)
    {
        return Observable.Create<T>(o =>
        {
            var sub = source.Subscribe(o);
            are.Set();
            return sub;
        });
    }
}

Recommended approach for testing Rx

A more idiomatic use of the TestScheduler is to create an observer to collect results, and then assert they meet expectations. Something like:

void Main()
{
    var scheduler = new TestScheduler();
    var source = scheduler.CreateColdObservable(
        new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
        new Recorded<Notification<int>>(200, Notification.CreateOnCompleted<int>()));

    var results = scheduler.CreateObserver<int>();

    // here you would append to source the Rx calls that do something interesting
    source.Subscribe(results);

    scheduler.Start();

    results.Messages.AssertEqual(
        new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
        new Recorded<Notification<int>>(200, Notification.CreateOnCompleted<int>()));   
}

Finally, if you derive a unit test class from ReactiveTest you can take advantage of OnNext, OnCompleted and OnError helper methods to create Recorded<Notification<T>> instances in a more readable fashion.
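
For comparison, here is a sketch of the same test written with those helpers. The class and method names are hypothetical, and the test attribute is omitted because it depends on which test framework you use; the System.Reactive and Microsoft.Reactive.Testing packages are assumed to be referenced.

```csharp
using System;
using Microsoft.Reactive.Testing;

// Hypothetical test class; deriving from ReactiveTest brings the static
// OnNext, OnCompleted and OnError helpers into scope.
public class ColdObservableTests : ReactiveTest
{
    // Add your test framework's attribute here (e.g. [Test] or [Fact]).
    public void EmitsOneValueThenCompletes()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnCompleted<int>(200));

        var results = scheduler.CreateObserver<int>();
        source.Subscribe(results);

        scheduler.Start();

        // Same assertion as before, written with the helper methods.
        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnCompleted<int>(200));
    }
}
```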

Licensed under: CC-BY-SA with attribution