Question

I have a method void OnAction(Action<Person> callback) and I want to create an IObservable<T> from it using the Reactive Extensions (Rx).

I have found two methods that could help me: Observable.FromEvent() and Observable.Start():

var observable = Observable.Start(() =>
{
    Person person = null;
    _mngr.OnAction(p => person = p);
    return person;
});

and:

var observable = Observable.FromEvent<Person>(
    action => _mngr.OnAction(action), // Add handler
    action => // Remove handler
    {
    });

The first one relies on a closure, and I must check that person != null:

var foo = observable.Where(p => p != null); // ...

The second one takes an Action argument that detaches the given event handler from the underlying .NET event... But the OnAction method isn't a .NET event.

Both ways work, but (in my opinion) they smell...

So, what is the best way to create an IObservable from the OnAction method?


Solution

To elaborate on Chris's answer and address your comments, start from here:

var personAsObservable = Observable.Create<Person>(observer => {
    _mngr.OnAction(person => {
        observer.OnNext(person);
        observer.OnCompleted();
    });
    return Disposable.Empty;
});

As it stands, this will cause OnAction to be called for every subscriber.
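To make the per-subscriber behaviour concrete, here is a minimal sketch. The Person and Manager classes are hypothetical stand-ins for the types in the question, and the stub assumes OnAction invokes its callback synchronously, which the real method may not do. It also assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-ins for the types in the question.
public class Person
{
    public string Name { get; set; } = "";
}

public class Manager
{
    // Counts how many times OnAction has been invoked.
    public int Calls { get; private set; }

    // Assumption: the callback fires synchronously; the real OnAction may not.
    public void OnAction(Action<Person> callback)
    {
        Calls++;
        callback(new Person { Name = "Person " + Calls });
    }
}

public static class ColdDemo
{
    // Subscribes twice to the unpublished observable and
    // returns how many times OnAction was called.
    public static int CountCallsForTwoSubscribers()
    {
        var mngr = new Manager();
        var personAsObservable = Observable.Create<Person>(observer =>
        {
            mngr.OnAction(person =>
            {
                observer.OnNext(person);
                observer.OnCompleted();
            });
            return Disposable.Empty;
        });

        // Each Subscribe runs the Create delegate again.
        personAsObservable.Subscribe(p => Console.WriteLine(p.Name));
        personAsObservable.Subscribe(p => Console.WriteLine(p.Name));
        return mngr.Calls;
    }

    public static void Main()
    {
        Console.WriteLine(CountCallsForTwoSubscribers());
    }
}
```

With this stub the count comes back as 2: every subscription triggers its own OnAction call.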

The general approach for avoiding this is to publish the observable. Publishing a stream causes subscribers to share events.

The Publish operator returns a connectable observable. It accepts subscribers, but won't actually subscribe to the underlying stream until you call Connect(). Connect() returns an IDisposable that controls the single connection to the underlying observable; dispose it to unsubscribe.

There are several operators related to publishing that help you govern subscriptions to the underlying stream.

RefCount works with a connectable observable to manage the connection for you, sharing events among subscriptions for as long as the underlying subscription is running. Once it completes, subsequent subscriptions will restart. This may be sufficient for your purposes. To use it, have your subscribers subscribe to the following (a very common Rx idiom):

var personPub = personAsObservable.Publish().RefCount();

Other approaches include appending Replay(n) to the source observable, which caches n events and replays them to subsequent subscribers, including those that arrive after the underlying stream has completed. This is useful if you only want to fetch the result once. Note that you must call Connect on Replay explicitly. You can also just call Publish and manage the connection yourself.

Note that appending these operators does not change the behaviour of the underlying observable - all the publishing, caching etc. is done on the appended operator. So in the above example, it's personPub that should be used by subscribers.

Controlling the connection explicitly looks like this:

IConnectableObservable<Person> personPub = personAsObservable.Publish();
var subscriberOne = personPub.Subscribe(...); // personAsObservable not started
var connection = personPub.Connect(); // *now* personAsObservable is subscribed
var subscriberTwo = personPub.Subscribe(...); // shares underlying subscription
                                              // but could miss events
connection.Dispose(); // underlying connection terminated
                      // but may have already OnCompleted anyway
                      // in which case this is a no-op
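As a sketch of the Replay approach described earlier, the following counts OnAction calls when several subscribers arrive after the source has already completed. Person and Manager are again hypothetical stand-ins, the stub assumes a synchronous callback, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for the types in the question.
public class Person
{
    public string Name { get; set; } = "";
}

public class Manager
{
    // Counts how many times OnAction has been invoked.
    public int Calls { get; private set; }

    // Assumption: the callback fires synchronously; the real OnAction may not.
    public void OnAction(Action<Person> callback)
    {
        Calls++;
        callback(new Person { Name = "Person " + Calls });
    }
}

public static class ReplayDemo
{
    // Connects once, subscribes twice afterwards, and
    // returns how many times OnAction was called.
    public static int CountCallsWithReplay()
    {
        var mngr = new Manager();
        var personAsObservable = Observable.Create<Person>(observer =>
        {
            mngr.OnAction(person =>
            {
                observer.OnNext(person);
                observer.OnCompleted();
            });
            return Disposable.Empty;
        });

        // Replay(1) caches the most recent value for late subscribers.
        IConnectableObservable<Person> replayed = personAsObservable.Replay(1);

        // Connect performs the single subscription to the source.
        IDisposable connection = replayed.Connect();

        // Both subscribers arrive after completion and get the cached value.
        replayed.Subscribe(p => Console.WriteLine("first: " + p.Name));
        replayed.Subscribe(p => Console.WriteLine("second: " + p.Name));

        connection.Dispose();
        return mngr.Calls;
    }

    public static void Main()
    {
        Console.WriteLine(CountCallsWithReplay());
    }
}
```

With this stub, OnAction runs once and both late subscribers still receive the same person.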

Other suggestions

var personAsObservable = Observable.Create<Person>(observer => {
    _mngr.OnAction(person => {
        observer.OnNext(person);
        observer.OnCompleted();
    });
    return Disposable.Empty; // Create requires the delegate to return an IDisposable
});

If you would like to ensure this method is only called once, you could do the following.

var publishedPerson = personAsObservable.Replay(1);

publishedPerson.Connect();

publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow