Question

Have been trying to google this but getting a bit stuck.

Let's say we have a class that fires an event, and that event could be fired by several threads at the same time.

Using Observable.FromEventPattern, we create an Observable and subscribe to that event. How exactly does Rx manage several of those events being fired at once? Let's say we have 3 events fired in quick succession on different threads. Does it queue them internally, and then call the Subscribe delegate synchronously for each one? Let's say we were subscribing on a thread pool, can we still guarantee the subscriptions would be processed one at a time?

Following on from that, let's say for each event we want to perform an action, but it's a method that's potentially not thread safe, so we only want one thread to be in this method at a time. Now I see we can use an EventLoopScheduler, and presumably we wouldn't need to implement any locking in the code?

Also, would observing on the Current Thread be an option? Is Current Thread the thread that the event was fired from, or the thread the subscription was set up on? i.e. is that current thread guaranteed to always be the same, or could we have 2 threads ending up in the method at the same time?

Thx

PS: I put an example together but I always seem to end up on the same thread in my subscribe method, even when I ObserveOn the thread pool, which is confusing :S

PPS: From doing a few more experiments, it seems that if no schedulers are specified, then Rx will just execute on whatever thread the event was fired on, meaning it processes several concurrently. As soon as I introduce a scheduler, it always runs things consecutively, no matter what the type of the scheduler is. Strange :S

Solution

According to the Rx Design Guidelines, an observable should never call OnNext of an observer concurrently. It will always wait for the current call to complete before making the next call. All Rx methods honor this convention. And, more importantly, they assume you also honor this convention. When you violate this condition, you may encounter subtle bugs in the behavior of your Observable.

For those times when you have source data that does not honor this convention (i.e. it can produce data concurrently), Rx provides Synchronize.

Observable.FromEventPattern assumes you will not be firing concurrent events and so does nothing to prevent concurrent downstream notifications. If you plan on firing events from multiple threads, sometimes concurrently, then use Synchronize() as the first operation you do after FromEventPattern:

// this will get you in trouble if your event source might fire events concurrently.
var events = Observable.FromEventPattern(...).Select(...).GroupBy(...);

// this version will protect you in that case.
var events = Observable.FromEventPattern(...).Synchronize().Select(...).GroupBy(...);

Now all of the downstream operators (and eventually your observer) are protected from concurrent notifications, as promised by the Rx Design Guidelines. Synchronize works by using a simple mutex (aka the lock statement). There is no fancy queueing or anything. If one thread attempts to raise an event while another thread is already raising it, the 2nd thread will block until the first thread finishes.
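To see the effect for yourself, here is a minimal sketch, assuming the System.Reactive package is referenced. Ticker, TickEventArgs and SynchronizeDemo are made-up names for illustration and are not part of Rx. The helper raises three events in parallel and reports the highest number of threads that were inside the handler at the same moment.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event args and event source (not part of Rx).
public sealed class TickEventArgs : EventArgs
{
    public int Value { get; set; }
}

public sealed class Ticker
{
    public event EventHandler<TickEventArgs> Tick;

    public void Raise(int value) =>
        Tick?.Invoke(this, new TickEventArgs { Value = value });
}

public static class SynchronizeDemo
{
    // Returns the highest number of threads observed inside the handler at once.
    public static int MaxConcurrentHandlers(bool synchronize)
    {
        var ticker = new Ticker();
        IObservable<EventPattern<TickEventArgs>> events =
            Observable.FromEventPattern<TickEventArgs>(
                h => ticker.Tick += h,
                h => ticker.Tick -= h);

        if (synchronize)
        {
            events = events.Synchronize(); // serialize notifications with a lock
        }

        var gate = new object(); // protects the two counters only
        int inside = 0, maxInside = 0;

        using (events.Subscribe(_ =>
        {
            lock (gate) { inside++; maxInside = Math.Max(maxInside, inside); }
            Thread.Sleep(20); // simulate work, outside the counter lock
            lock (gate) { inside--; }
        }))
        {
            // Raise three events from (potentially) three different threads.
            Parallel.Invoke(
                () => ticker.Raise(1),
                () => ticker.Raise(2),
                () => ticker.Raise(3));
        }

        return maxInside;
    }
}
```

With synchronize set to true the result should always be 1. With false it will usually be higher, but that depends on how Parallel.Invoke happens to schedule the three calls, so don't rely on a particular number there.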

Other tips

In addition to the recommendation to use Synchronize, it's probably worth having a read of the Intro to Rx section on scheduling and threading. It covers the different schedulers and their relationship to threads, as well as the differences between ObserveOn and SubscribeOn, etc.
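As a rough illustration of the ObserveOn side of this, here is a sketch (again assuming System.Reactive is referenced) that pushes every notification onto one dedicated EventLoopScheduler thread, so a handler that isn't thread safe is only ever entered by that one thread. Pinger and EventLoopDemo are hypothetical names, not Rx types.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event source that may be raised from many threads.
public sealed class Pinger
{
    public event EventHandler<EventArgs> Ping;

    public void Raise() => Ping?.Invoke(this, EventArgs.Empty);
}

public static class EventLoopDemo
{
    // Returns the distinct managed thread IDs the handler ran on.
    public static ISet<int> HandlerThreadIds(int eventCount)
    {
        var threadIds = new HashSet<int>();
        var pinger = new Pinger();

        using (var done = new CountdownEvent(eventCount))
        using (var loop = new EventLoopScheduler()) // owns one dedicated thread
        {
            using (Observable
                .FromEventPattern<EventArgs>(
                    h => pinger.Ping += h,
                    h => pinger.Ping -= h)
                .Synchronize()   // the source may fire concurrently
                .ObserveOn(loop) // queue notifications onto the loop thread
                .Subscribe(_ =>
                {
                    // Only the loop thread runs this, so no lock is needed here.
                    threadIds.Add(Thread.CurrentThread.ManagedThreadId);
                    done.Signal();
                }))
            {
                // Fire the events from thread pool threads.
                Parallel.For(0, eventCount, i => pinger.Raise());
                done.Wait(); // block until every event has been handled
            }
        }

        return threadIds;
    }
}
```

EventLoopDemo.HandlerThreadIds(10) should come back with a single thread ID, no matter how many threads raised the events. ObserveOn queues the notifications, which is also why things look sequential as soon as a scheduler is introduced.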

If you have several producers, Rx has methods for combining them in a thread-safe way.

To combine streams of the same type of event into a single stream:

Observable.Merge

To combine streams of different types of events into a single stream, using a selector to transform the latest value on each stream into a new value:

Observable.CombineLatest

For example, combining stock prices from different sources:

IObservable<StockPrice> source0;
IObservable<StockPrice> source1;
IObservable<StockPrice> combinedSources = source0.Merge(source1);

Or creating balloons at the current position every time there is a click:

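IObservable<ClickEvent> clicks;
IObservable<Position> positions;
// Note: CombineLatest emits whenever either source produces a value
// (once both have produced at least one), so a position change also
// creates a balloon, not just a click.
IObservable<Balloon> balloons = clicks
    .CombineLatest
    ( positions
    , (click, position) => new Balloon(position.X, position.Y)
    );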

To make this specifically relevant to your question: you say there is a class which combines events from different threads. In that case I would use Observable.Merge to combine the individual event sources and expose the result as an Observable on your main class.

BTW, if your threads are actually tasks that fire events to say they have completed, here is an interesting pattern:

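IObservable<Job> jobSource;
// Each job becomes a deferred async operation; nothing runs until it is subscribed to.
IObservable<IObservable<JobResult>> resultTasks = jobSource
    .Select(job => Observable.FromAsync(token => DoJob(token, job)));
// Merge subscribes to each inner observable and flattens the results into one stream.
IObservable<JobResult> results = resultTasks.Merge();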

What is happening here is that you get a stream of jobs in. From the jobs you create a stream of asynchronous tasks (not running yet). Merge then subscribes to each one, which starts the task, and collects the results. It is an example of a map-reduce style algorithm. The cancellation token can be used to cancel running async tasks if the observable is unsubscribed from (i.e. canceled).
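If you want something you can run end to end, here is a small self-contained sketch of the same pattern, assuming System.Reactive is referenced. The jobs are plain ints and DoJob just squares its input after a short delay. Both are stand-ins I made up, since the real Job and JobResult types aren't shown.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class JobDemo
{
    // Hypothetical job: waits briefly, then returns the square of its input.
    private static async Task<int> DoJob(CancellationToken token, int job)
    {
        await Task.Delay(10, token);
        return job * job;
    }

    // Runs all jobs and blocks until every result has arrived.
    // Results come back in completion order, not input order.
    public static IList<int> RunJobs(IEnumerable<int> jobs)
    {
        return jobs
            .ToObservable()
            // Map: one deferred async operation per job (not started yet).
            .Select(job => Observable.FromAsync(token => DoJob(token, job)))
            // Merge subscribes to each inner observable, which starts the task.
            .Merge()
            // Collect everything into a single list.
            .ToList()
            // Block the calling thread until the list is complete.
            .Wait();
    }
}
```

Calling JobDemo.RunJobs(new[] { 1, 2, 3 }) should give you 1, 4 and 9 in some order. Merge also has an overload that takes a maximum concurrency if you want to limit how many jobs run at once.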

License: CC-BY-SA with attribution
Not affiliated with StackOverflow