Question

The Reactive Extensions come with a lot of helper methods for turning existing events and asynchronous operations into observables but how would you implement an IObservable<T> from scratch?

IEnumerable has the lovely yield keyword to make it very simple to implement.

What is the proper way of implementing IObservable<T>?

Do I need to worry about thread safety?

I know there is support for getting called back on a specific synchronization context but is this something I as an IObservable<T> author need to worry about or this somehow built-in?

update:

Here's my C# version of Brian's F# solution

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

edit: Don't throw ObjectDisposedException if Dispose is called twice

Was it helpful?

Solution

Honestly, I am not sure how 'right' all this is, but if feels pretty good based on my experience so far. It's F# code, but hopefully you get a sense of the flavor. It lets you 'new up' a source object, which you can then call Next/Completed/Error on, and it manages subscriptions and tries to Assert when the source or clients do bad things.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

I'll be interested in any thoughts about what's good or bad here; I haven't had a chance to look at all the new Rx stuff from devlabs yet...

My own experiences suggest that:

  • Those who subscribe to observables should never throw from the subscriptions. There is nothing reasonable an observable can do when a subscriber throws. (This is similar to events.) Most likely the exception will just bubble up to a top-level catch-all handler or crash the app.
  • Sources probably should be "logically single threaded". I think it may be harder to write clients that can react to concurrent OnNext calls; even if each individual call comes from a different thread, it is helpful to avoid concurrent calls.
  • It's definitely useful to have a base/helper class that enforces some 'contracts'.

I'm very curious if people can show more concrete advice along these lines.

OTHER TIPS

The official documentation deprecates users implementing IObservable themselves. Instead, users are expected to use the factory method Observable.Create

When possible, implement new operators by composing existing operators. Otherwise implement custom operators using Observable.Create

It happens that Observable.Create is a trivial wrapper around Reactive's internal class AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

I don't know why they didn't make their implementation public, but hey, whatever.

Yes, the yield keyword is lovely; maybe there will be something similar for IObservable(OfT)? [edit: In Eric Meijer's PDC '09 talk he says "yes, watch this space" to a declarative yield for generating observables.]

For something close (instead of rolling your own), check out the bottom of the "(not yet) 101 Rx Samples" wiki, where the team suggests use of the Subject(T) class as a "backend" to implement an IObservable(OfT). Here's their example:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. Crack open Reflector and have a look.

  2. Watch some C9 videos - this one shows how you can 'derive' the Select 'combinator'

  3. The secret is to create AnonymousObservable, AnonymousObserver and AnonymousDisposable classes, (which are just work arounds for the fact that you can't instantiate interfaces). They contain zero implementation, as you pass that in with Actions and Funcs.

For example:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

I'll let you work out the rest... it's a very good exercise in understanding.

There's a nice little thread growing here with related questions.

just one remark regarding this implementation :

after concurrent collections being introduced in .net fw 4 it is probably better to use ConcurrentDictioary instead of a simple dictionary.

it saves handling locks on the collection.

adi.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top