Question

I'm trying to create an async unit test for the project, but cannot understand how to wait for the async subject to complete:

    [Test]
    public async void MicroTest()
    {
        var value = 2;

        var first = new AsyncSubject<int>();
        var second = new AsyncSubject<int>();

        first.Subscribe(_ =>
        {
            value = _;
            second.OnCompleted();
        });

        first.OnNext(1);

        // how to wait for the second subject to complete?

        Assert.AreEqual(value, 1);
    }

Sync version of this test is works well:

    [Test]
    public void MicroTest()
    {
        var value = 2;

        var first = new Subject<int>();
        var second = new Subject<int>();

        first.Subscribe(_ =>
        {
            value = _;
            second.OnCompleted();
        });

        first.OnNext(1);

        Assert.AreEqual(value, 1);
    }
Was it helpful?

Solution

AsyncSubject versus Subject

First off, it's worth pointing out that AsyncSubject<T> is not an asynchronous version of Subject<T>. Both are in fact free-threaded* (see footnote).

AsyncSubject is a specialization of Subject intended to be used to model an operation that completes asynchronously and returns a single result. It has two noteworthy features:

  • Only the last result is published
  • The result is cached and is available to observers subscribing after it has completed.

It is used internally in various places, including by the ToObservable() extension method defined on Task and Task<T>.

The issue with the test

Recall AsyncSubject<T> will only return the final result received. It does this by waiting for OnCompleted() so it knows what the final result is. Because you do not call OnCompleted() on first your test is flawed as the OnNext() handler - the lambda function passed in your Subscribe call - will never be invoked.

Additionally, it is invalid not to call OnNext() at least once on an AsyncSubject<T>, so when you call await second; you will get an InvalidOperationException if you haven't done this.

If you write your test as follows, all is well:

[Test]
public async void MicroTest()
{
    var value = 2;

    var first = new AsyncSubject<int>();
    var second = new AsyncSubject<int>();

    first.Subscribe(_ =>
    {
        // won't be called until an OnCompleted() has
        // been invoked on first
        value = _;
        // you must send *some* value to second
        second.OnNext(_);
        second.OnCompleted();
    });

    first.OnNext(1);
    // you must do this for OnNext handler to be called
    first.OnCompleted(); 

    // how to wait for the second subject to complete
    await second;

    Assert.AreEqual(value, 1);
}

About asynchronous tests

As a general rule I would avoid writing asynchronous tests that could wait forever. This gets particularly annoying when it causes resource drains on build servers. Use some kind of timeout e.g:

await second.Timeout(TimeSpan.FromSeconds(1));

No need to handle the exception since that is enough for the test to fail.

**I've borrowed this term from the COM lexicon. In this sense I mean that they, as with most of the Rx framework components, will generally run on whatever thread you happen to invoke their methods on. Being free-threaded doesn't necessarily mean being fully thread safe though. In particular, unlike AsyncSubject<T>, Subject<T> doesn't protect you from the Rx grammar violation of making overlapping calls to OnNext. Use Subject.Synchronize or Observable.Synchronize for this protection.*

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top