Question

I have an observable of type IObservable<...> and need to return an observable of type IObservable<Unit> from the function. Currently I do it as follows:

IObservable<Unit> DoSomething() {
    var observable = SomeOtherMethodThatReturnsObservable();

    // Convert IObservable<...> to IObservable<Unit>
    var ret = new AsyncSubject<Unit>();
    observable.Subscribe(_ => { }, ret.OnCompleted);
    return ret;
}

Is there a nicer way of doing this? Maybe an extension method or something?

Solution

I would not use the example you gave in production, because it does not consider the following:

  • What happens if the Observable errors?
    • The error is never forwarded to the returned observable, so the consumer never knows the source has ended.
  • What happens if the consumer disposes of its subscription?
    • The underlying subscription is not disposed of.
  • What happens if the consumer never subscribes?
    • The underlying observable is still subscribed to.
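The last point is easy to see for yourself. Here is a minimal sketch, assuming Rx.NET (the System.Reactive package); the names EagerDemo and Original are just mine for illustration, and Original is the code from the question with the source passed in as a parameter.

```csharp
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class EagerDemo
{
    // The approach from the question, parameterized over the source (hypothetical wrapper).
    static IObservable<Unit> Original<T>(IObservable<T> source)
    {
        var ret = new AsyncSubject<Unit>();
        source.Subscribe(_ => { }, ret.OnCompleted);
        return ret;
    }

    public static void Main()
    {
        var subscribed = false;

        // A source that records the moment somebody subscribes to it.
        var source = Observable.Create<int>(o =>
        {
            subscribed = true;
            o.OnCompleted();
            return Disposable.Empty;
        });

        // Nobody ever subscribes to the returned observable...
        var result = Original(source);

        // ...yet the source has already been subscribed to.
        Console.WriteLine(subscribed); // prints True
    }
}
```

With an operator-based version, nothing touches the source until the consumer actually subscribes.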

Typically, using the built-in operators provides the best implementation.

Here's an "Rx-y" way of doing it.

source.IgnoreElements()
    .Cast<Unit>()
    .Concat(Observable.Return(Unit.Default));
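One caveat with that chain: as far as I know, Rx.NET declares Cast<TResult> on IObservable<object>, so it relies on covariance and only compiles when the element type is a reference type. If your source might be something like IObservable<int>, a Select projection should work for any T. A minimal sketch under that assumption (ToCompletionSignal is a made-up name, not part of Rx):

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class CompletionExample
{
    // Hypothetical helper: emits a single Unit when the source completes.
    public static IObservable<Unit> ToCompletionSignal<T>(IObservable<T> source)
    {
        return source
            // Drop every element; only OnError/OnCompleted get through.
            .IgnoreElements()
            // Change the element type to Unit; the lambda never runs,
            // because no elements are left at this point.
            .Select(_ => Unit.Default)
            // Emit one Unit after the source has completed.
            .Concat(Observable.Return(Unit.Default));
    }
}
```

Errors still flow through untouched, since Concat only moves on to the second sequence after the first one completes successfully.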

Here's another way to do the same thing without built-in operators. This is arguably more efficient, but there's no real gain here.

// .Cast<Unit>()
Observable.Create<Unit>(o => { // Also make sure you return the subscription!
    return source.Subscribe(
        // .IgnoreElements()
        _ => { },
        // Make sure errors get passed through!!!
        o.OnError,
        // .Concat(Observable.Return(Unit.Default));
        () => {
            o.OnNext(Unit.Default);
            o.OnCompleted();
        });
});

Here's how to write it just once.

public static class ObsEx
{
    public static IObservable<Unit> WhenDone<T>(this IObservable<T> source)
    {
        return Observable.Create<Unit>(observer =>
        {
            return source.Subscribe(
                _ => { },
                observer.OnError,
                () => {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                });
        });
    }
}

And use it like so: source.WhenDone();
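For a slightly fuller picture, here is a sketch of what a consumer sees on completion and on error. It repeats the WhenDone extension from above so it can be compiled on its own; Observable.Range and Observable.Throw are just stand-in sources I picked for the demo.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ObsEx
{
    // Same extension as above, repeated so this snippet is self-contained.
    public static IObservable<Unit> WhenDone<T>(this IObservable<T> source)
    {
        return Observable.Create<Unit>(observer =>
            source.Subscribe(
                _ => { },
                observer.OnError,
                () =>
                {
                    observer.OnNext(Unit.Default);
                    observer.OnCompleted();
                }));
    }
}

public static class Program
{
    public static void Main()
    {
        // Source completes normally: the consumer gets one Unit, then OnCompleted.
        Observable.Range(1, 3)
            .WhenDone()
            .Subscribe(
                _ => Console.WriteLine("done"),
                ex => Console.WriteLine("error: " + ex.Message),
                () => Console.WriteLine("completed"));

        // Source fails: the error is passed through to the consumer's OnError.
        Observable.Throw<int>(new InvalidOperationException("boom"))
            .WhenDone()
            .Subscribe(
                _ => Console.WriteLine("done"),
                ex => Console.WriteLine("error: " + ex.Message),
                () => Console.WriteLine("completed"));
    }
}
```

Because the subscription to the source is returned from Observable.Create, disposing the consumer's subscription also disposes the underlying one.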

Licensed under: CC-BY-SA with attribution