Question

I want to subscribe to an IObservable<T> and unsubscribe (dispose of the subscription) right after receiving the first element of type T, i.e. I only want to call the action on the very first element I get after subscribing.

This is the approach I came up with:

public static class Extensions
{
    public static void SubscribeOnce<T>(this IObservable<T> observable, Action<T> action)
    {
        IDisposable subscription = null;
        subscription = observable.Subscribe(t =>
        {
            action(t);
            subscription.Dispose();
        });
    }
}

Example usage:

public class Program
{
    public static void Main()
    {
        var subject = new Subject<int>();

        subject.OnNext(0);
        subject.OnNext(1);
        subject.SubscribeOnce(i => Console.WriteLine(i));
        subject.OnNext(2);
        subject.OnNext(3);

        Console.ReadLine();
    }
}

It works as expected and only prints 2. Is there anything wrong with this, or anything else to consider? Is there perhaps a cleaner way using the extension methods that come with Rx out of the box?

Solution

var source = new Subject<int>();

source
  .Take(1)
  .Subscribe(Console.WriteLine);

source.OnNext(5);
source.OnNext(6);
source.OnError(new Exception("oops!"));
source.OnNext(7);
source.OnNext(8);

// Output is "5". Subscription is auto-disposed. Error is ignored.

Take(n) automatically disposes of the subscription once the nth element has been yielded. :)

As for other things to consider: your custom extension method only handles OnNext. You may also want to pass OnError and OnCompleted notifications on to your observer, which Take also handles for you.
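
If you still want a helper with the name from the question, one possible sketch is to build it on top of Take(1). This assumes the System.Reactive package is referenced. The onError and onCompleted parameters and the IDisposable return value are additions for illustration only; they are not part of the original SubscribeOnce.

```csharp
using System;
using System.Reactive.Linq;

public static class OnceExtensions
{
    // Hypothetical variant of the question's SubscribeOnce, built on Take(1).
    // Returning the IDisposable lets the caller cancel before any element arrives.
    public static IDisposable SubscribeOnce<T>(
        this IObservable<T> source,
        Action<T> onNext,
        Action<Exception> onError,
        Action onCompleted)
    {
        return source
            .Take(1)                                   // completes after the first element
            .Subscribe(onNext, onError, onCompleted);  // forwards all three notifications
    }
}
```

With a Subject<int>, the first OnNext after subscribing invokes onNext and then onCompleted. If the source fails before producing anything, onError is invoked instead.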

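The built-in operators have other benefits as well, such as more robust Dispose handling: they do not rely on a captured subscription variable that may still be null when the first element arrives.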
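
To illustrate the Dispose point, here is a minimal sketch, again assuming the System.Reactive package. It uses a BehaviorSubject, which pushes its current value to a new subscriber synchronously, inside the Subscribe call. In the hand-written SubscribeOnce from the question, the callback would then run before the subscription variable has been assigned, so subscription.Dispose() would be called on null. The class and method names below are made up for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SynchronousEmissionDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();

        // BehaviorSubject replays its current value (42) during Subscribe itself.
        var subject = new BehaviorSubject<int>(42);

        // Take(1) manages its own subscription, so the synchronous first
        // element is handled and the subscription is ended right after it.
        subject.Take(1).Subscribe(i => received.Add(i));

        subject.OnNext(43); // not received: Take(1) has already completed
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints 42
    }
}
```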

Licensed under: CC-BY-SA with attribution