To elaborate on Chris's answer and address your comments, let's start from here:
var personAsObservable = Observable.Create<Person>(observer => {
    _mngr.OnAction(person => {
        observer.OnNext(person);
        observer.OnCompleted();
    });
    return Disposable.Empty;
});
As it stands, this will cause OnAction to be called for every subscriber, because the function passed to Observable.Create runs once per subscription.
The general approach for avoiding this is to publish the observable. Publishing a stream causes subscribers to share events.
The Publish operator returns a connectable observable. This can accept subscribers, but won't actually subscribe to the underlying stream until you call Connect(). Connect() returns an IDisposable that you can use to control the single connection to the underlying observable: dispose it to unsubscribe.
There are several operators related to publishing that help you govern subscriptions to the underlying stream.
RefCount works with a connectable observable to manage the connection for you: it connects when the first subscriber arrives, shares events among subscribers for as long as the underlying subscription is running, and disconnects when the last subscriber unsubscribes. Once the underlying stream completes, a subsequent subscription will start it again. This may be sufficient for your purposes. To use this, subscribe to the following (which is a very common Rx idiom):
var personPub = personAsObservable.Publish().RefCount();
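To see the sharing in action, here is a minimal sketch. It assumes the System.Reactive package is referenced. FakeManager is a hypothetical stand-in for your _mngr (it only records callbacks and fires them on demand), and string is used in place of Person to keep things short.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical stand-in for _mngr: records callbacks and fires them later.
class FakeManager
{
    private readonly List<Action<string>> _callbacks = new List<Action<string>>();

    // Counts how many times OnAction has been called.
    public int OnActionCalls { get; private set; }

    public void OnAction(Action<string> callback)
    {
        OnActionCalls++;
        _callbacks.Add(callback);
    }

    public void Fire(string person)
    {
        foreach (var callback in _callbacks.ToArray())
            callback(person);
    }
}

static class RefCountDemo
{
    public static int Run()
    {
        var mngr = new FakeManager();

        var source = Observable.Create<string>(observer =>
        {
            mngr.OnAction(person =>
            {
                observer.OnNext(person);
                observer.OnCompleted();
            });
            return Disposable.Empty;
        });

        // Both subscribers share one subscription to source.
        var shared = source.Publish().RefCount();
        shared.Subscribe(p => Console.WriteLine("one: " + p));
        shared.Subscribe(p => Console.WriteLine("two: " + p));

        // The action fires only after both subscribers are attached.
        mngr.Fire("Alice");

        return mngr.OnActionCalls;
    }

    static void Main()
    {
        Console.WriteLine("OnAction calls: " + Run()); // OnAction calls: 1
    }
}
```

If the two subscribers were attached to source directly rather than to shared, OnAction would be called twice. Note that the sharing here relies on the action firing after both subscribers have arrived. If the source completed before the second subscription, RefCount would start it again.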
Other approaches include appending Replay(n) to the source observable, where the last n events are cached and replayed to subsequent subscribers, including those that arrive after the underlying stream has completed. This is useful if you only want to fetch the results once. Note that you must call Connect on the result of Replay explicitly. You can also just call Publish and manage the connection yourself.
Note that appending these operators does not change the behaviour of the underlying observable - all the publishing, caching etc. is done on the appended operator. So in the above example, it's personPub that should be used by subscribers.
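For the Replay case, a rough sketch might look like the following. It again assumes the System.Reactive package; the source here is a hypothetical one that emits a single string and completes immediately, standing in for your Person lookup.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ReplayDemo
{
    public static string Run()
    {
        var sourceSubscriptions = 0;

        // Hypothetical source: emits one value and completes straight away.
        var source = Observable.Create<string>(observer =>
        {
            sourceSubscriptions++;
            observer.OnNext("Alice");
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Cache the last event; nothing happens until Connect is called.
        IConnectableObservable<string> replayed = source.Replay(1);
        var connection = replayed.Connect(); // source is subscribed once, here

        // Both subscribers arrive after completion and still get the cached value.
        var received = new List<string>();
        replayed.Subscribe(p => received.Add(p));
        replayed.Subscribe(p => received.Add(p));

        connection.Dispose();

        return sourceSubscriptions + ":" + string.Join(",", received);
    }

    static void Main()
    {
        Console.WriteLine(Run()); // 1:Alice,Alice
    }
}
```

As with personPub, subscribers should use replayed rather than source; subscribing to source directly would run the lookup again.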
Controlling the connection explicitly looks like this:
IConnectableObservable<Person> personPub = personAsObservable.Publish();

var subscriberOne = personPub.Subscribe(...); // personAsObservable not started
var connection = personPub.Connect();         // *now* personAsObservable is subscribed
var subscriberTwo = personPub.Subscribe(...); // shares underlying subscription
                                              // but could miss events
connection.Dispose();                         // underlying connection terminated
                                              // but may have already OnCompleted anyway
                                              // in which case this is a no-op