Question

I have an event-based API (Geolocator) that I want to convert to Rx.

The problem is that some operations require that all events are unsubscribed, and I don't want to pass that burden on to the user of the Rx API.

So the user will subscribe to a few observables, and whenever the underlying events are subscribed, they are published to those observables.

What's the best way to do this?

I thought of creating subjects that the users subscribe to, and then having the events published to them through another set of observables.

Is this the best way? If so, how?

Solution

The key problem is to find a way to keep an Observer subscribed to a stream whilst tearing down and replacing an underlying source. Let's just focus on a single event source - you should be able to extrapolate from that.

First of all, here is an example class we can use that has a single event, SomeEvent, that follows the standard .NET pattern using an EventHandler<MessageEventArgs> delegate. We will use this to create sources of events.

Note I have intercepted the event add/remove handlers in order to show you when Rx subscribes and unsubscribes from the events, and given the class a name property to let us track different instances:

public class EventSource
{
    private string _sourceName;

    public EventSource(string sourceName)
    {
        _sourceName = sourceName;
    }

    private event EventHandler<MessageEventArgs> _someEvent;

    public event EventHandler<MessageEventArgs> SomeEvent
    {
        add
        {
            _someEvent = (EventHandler<MessageEventArgs>)
                Delegate.Combine(_someEvent, value);
            Console.WriteLine("Subscribed to SomeEvent: " + _sourceName);
        }
        remove
        {
            _someEvent = (EventHandler<MessageEventArgs>)
                Delegate.Remove(_someEvent, value);
            Console.WriteLine("Unsubscribed to SomeEvent: " + _sourceName);
        }

    }

    public void RaiseSomeEvent(string message)
    {
        var temp = _someEvent;
        if(temp != null)
            temp(this, new MessageEventArgs(message));
    }
}

public class MessageEventArgs : EventArgs
{
    public MessageEventArgs(string message)
    {
        Message = message;
    }

    public string Message { get; set; }   

    public override string ToString()
    {
        return Message;
    }
}

Solution Key Idea - StreamSwitcher

Now, here is the heart of the solution. We will use a Subject<IObservable<T>> to create a stream of streams. We can use the Observable.Switch() operator to return only the most recent stream to Observers. Here's the implementation, and an example of usage will follow:

public class StreamSwitcher<T> : IObservable<T>
{
    private Subject<IObservable<T>> _publisher;
    private IObservable<T> _stream;

    public StreamSwitcher()
    {
        _publisher = new Subject<IObservable<T>>();
        _stream = _publisher.Switch();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _stream.Subscribe(observer);
    }

    public void Switch(IObservable<T> newStream)
    {
        _publisher.OnNext(newStream);
    }

    public void Suspend()
    {
        _publisher.OnNext(Observable.Never<T>());
    }

    public void Stop()
    {
        _publisher.OnNext(Observable.Empty<T>());
        _publisher.OnCompleted();
    }
}

Usage

With this class you can hook up a new stream on each occasion you want to start events flowing by using the Switch method - which just sends the new event stream to the Subject.

You can unhook events using the Suspend method, which sends an Observable.Never<T>() to the Subject effectively pausing the flow of events.

Finally, you can stop altogether by calling Stop, which pushes an Observable.Empty<T>() and then calls OnCompleted() on the subject.

The best part is that this technique will cause Rx to do the right thing and properly unsubscribe from the underlying event sources each time you Switch, Suspend or Stop. Note also, that once Stopped no more events will flow, even if you Switch again.

Here's an example program:

static void Main()
{
    // create the switch to operate on
    // an event type of EventHandler<MessageEventArgs>()
    var switcher = new StreamSwitcher<EventPattern<MessageEventArgs>>();


    // You can expose switcher using Observable.AsObservable() [see MSDN]
    // to hide the implementation but here I just subscribe directly to
    // the OnNext and OnCompleted events.
    // This is how the end user gets their uninterrupted stream:
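    // Print the Message carried by each event so the output shows the
    // payload rather than the EventPattern wrapper.
    switcher.Subscribe(
        e => Console.WriteLine(e.EventArgs.Message),
        () => Console.WriteLine("Done!"));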

    // Now I'll use the example event source to wire up the underlying
    // event for the first time
    var source = new EventSource("A");
    var sourceObservable = Observable.FromEventPattern<MessageEventArgs>(
        h => source.SomeEvent += h,
        h => source.SomeEvent -= h);


    // And we expose it to our observer with a call to Switch
    Console.WriteLine("Subscribing");
    switcher.Switch(sourceObservable);

    // Raise some events
    source.RaiseSomeEvent("1");
    source.RaiseSomeEvent("2");

    // When we call Suspend, the underlying event is unwired
    switcher.Suspend();
    Console.WriteLine("Unsubscribed");

    // Just to prove it, this is not received by the observer
    source.RaiseSomeEvent("3");

    // Now pretend we want to start events again
    // Just for kicks, we'll use an entirely new source of events
    // ... but we don't have to, you could just call Switch(sourceObservable)
    // with the previous instance.
    source = new EventSource("B");
    sourceObservable = Observable.FromEventPattern<MessageEventArgs>(
        h => source.SomeEvent += h,
        h => source.SomeEvent -= h);

    // Switch to the new event stream
    Console.WriteLine("Subscribing");
    switcher.Switch(sourceObservable);

    // Prove it works
    source.RaiseSomeEvent("3");
    source.RaiseSomeEvent("4");

    // Finally unsubscribe
    switcher.Stop();
}

This gives output like this:

Subscribing
Subscribed to SomeEvent: A
1
2
Unsubscribed to SomeEvent: A
Unsubscribed
Subscribing
Subscribed to SomeEvent: B
3
4
Unsubscribed to SomeEvent: B
Done!

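Note the end user doesn't have to subscribe up front as I did - they can Subscribe at any time. Bear in mind that a plain Subject does not replay, so an observer that subscribes after a call to Switch starts getting events from the next call to Switch.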
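
If late subscribers should attach to the stream that is already active rather than wait for the next Switch, one option is to swap the Subject for a BehaviorSubject, which hands its latest value to each new subscriber. The following is only a sketch of that idea, not part of the original answer: ReplayingStreamSwitcher and LateSubscriberDemo are hypothetical names, and a Subject<string> stands in for the real event source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical variant of StreamSwitcher: the BehaviorSubject remembers the
// most recent inner stream and hands it to every new subscriber.
public class ReplayingStreamSwitcher<T> : IObservable<T>
{
    // Seeded with Never so subscribers simply wait until the first Switch.
    private readonly BehaviorSubject<IObservable<T>> _publisher =
        new BehaviorSubject<IObservable<T>>(Observable.Never<T>());
    private readonly IObservable<T> _stream;

    public ReplayingStreamSwitcher()
    {
        _stream = _publisher.Switch();
    }

    public IDisposable Subscribe(IObserver<T> observer) => _stream.Subscribe(observer);

    public void Switch(IObservable<T> newStream) => _publisher.OnNext(newStream);
}

public static class LateSubscriberDemo
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var source = new Subject<string>();   // stands in for an event source
        var switcher = new ReplayingStreamSwitcher<string>();

        switcher.Switch(source);              // wired up before anyone subscribes
        source.OnNext("missed");              // nobody is listening yet

        using (switcher.Subscribe(received.Add))   // late subscriber
        {
            source.OnNext("1");
            source.OnNext("2");
        }

        source.OnNext("after dispose");       // subscription is gone by now
        return received;                      // contains "1" and "2"
    }
}
```

Calling LateSubscriberDemo.Run() should return only "1" and "2". With a FromEventPattern source, the underlying event handler is attached once per subscriber, and only when a subscriber arrives.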

Hope that helps! Of course you'll need to pull together the various event types of the Geolocator API into a single convenient wrapper - but this should enable you to get there.

If you have several events you want to combine into a single stream using this technique, look at operators like Merge (which requires you to project the source streams into a common type, perhaps with Select) or CombineLatest. This part of the problem shouldn't be too tricky.
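
To make the Merge suggestion concrete, here is a minimal sketch, assuming two streams of different types that are projected to string with Select before merging. MergeDemo is a hypothetical name, and the two subjects merely stand in for the position and status events; none of this comes from the Geolocator API.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-ins for two event streams with different element types.
        var positions = new Subject<int>();
        var statuses = new Subject<string>();

        // Merge needs a common element type, so project both to string first.
        IObservable<string> merged = positions
            .Select(p => "position " + p)
            .Merge(statuses.Select(s => "status " + s));

        using (merged.Subscribe(log.Add))
        {
            positions.OnNext(1);
            statuses.OnNext("Ready");
            positions.OnNext(2);
        }

        return log;   // "position 1", "status Ready", "position 2"
    }
}
```

The merged observable is an ordinary IObservable<string>, so it could be handed to Switch like any other stream.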

OTHER TIPS

This is what I came up with.

I have created two subjects for the clients of my API to subscribe:

private readonly Subject<Geoposition> positionSubject = new Subject<Geoposition>();
private readonly Subject<PositionStatus> statusSubject = new Subject<PositionStatus>();

And fields that hold the subscriptions (as IDisposable) to the events my API is subscribing to:

private IDisposable positionObservable;
private IDisposable statusObservable;

When I want to subscribe to the events, I just forward them into the subjects:

this.positionObservable = Observable
   .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
       conversion: handler => (s, e) => handler(e),
       addHandler: handler => this.geolocator.PositionChanged += handler,
       removeHandler: handler => this.geolocator.PositionChanged -= handler)
   .Select(e => e.Position)
   .Subscribe(
       onNext: this.positionSubject.OnNext,
       onError: this.positionSubject.OnError);

this.statusObservable = Observable
   .FromEvent<TypedEventHandler<Geolocator, StatusChangedEventArgs>, StatusChangedEventArgs>(
       conversion: handler => (s, e) => handler(e),
       addHandler: handler => this.geolocator.StatusChanged += handler,
       removeHandler: handler => this.geolocator.StatusChanged -= handler)
   .Select(e => e.Status)
   .Subscribe(
       onNext: this.statusSubject.OnNext,
       onError: this.statusSubject.OnError);

When I want to cancel the subscription, I just dispose of the subscriptions:

this.positionObservable.Dispose();
this.statusObservable.Dispose();
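
The subscribe/dispose cycle above could also be packaged so that attaching again never leaves an old subscription behind. This is a sketch only, assuming Rx's SerialDisposable is used to hold the current subscription. ForwardingSource and ForwardingDemo are hypothetical names, and a Subject<int> stands in for the Geolocator events.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical wrapper: clients subscribe to Values, while the wrapper
// attaches and detaches the underlying source on demand.
public class ForwardingSource<T>
{
    private readonly Subject<T> _subject = new Subject<T>();
    private readonly SerialDisposable _subscription = new SerialDisposable();

    // AsObservable hides the subject so clients cannot push values into it.
    public IObservable<T> Values => _subject.AsObservable();

    // Assigning Disposable disposes whatever subscription was held before.
    public void Attach(IObservable<T> source) =>
        _subscription.Disposable = source.Subscribe(
            onNext: _subject.OnNext,
            onError: _subject.OnError);

    // Replacing the held subscription with an empty one unsubscribes.
    public void Detach() => _subscription.Disposable = Disposable.Empty;
}

public static class ForwardingDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new Subject<int>();      // stands in for the event stream
        var wrapper = new ForwardingSource<int>();

        wrapper.Values.Subscribe(received.Add);

        wrapper.Attach(source);
        source.OnNext(1);

        wrapper.Detach();
        source.OnNext(2);                     // dropped: nothing is attached

        wrapper.Attach(source);
        source.OnNext(3);

        return received;                      // contains 1 and 3
    }
}
```

The client's subscription to Values survives each Attach and Detach, which is the behaviour the question asks for.
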
Licensed under: CC-BY-SA with attribution