According to the Rx Design Guidelines, an observable should never call OnNext of an observer concurrently. It will always wait for the current call to complete before making the next call. All Rx methods honor this convention. And, more importantly, they assume you also honor this convention. When you violate this condition, you may encounter subtle bugs in the behavior of your Observable.
For those times when you have source data that does not honor this convention (i.e. it can produce data concurrently), Rx provides Synchronize.
Observable.FromEventPattern assumes you will not be firing concurrent events and so does nothing to prevent concurrent downstream notifications. If you plan on firing events from multiple threads, sometimes concurrently, then use Synchronize() as the first operation you do after FromEventPattern:
// this will get you in trouble if your event source might fire events concurrently.
var events = Observable.FromEventPattern(...).Select(...).GroupBy(...);
// this version will protect you in that case.
var events = Observable.FromEventPattern(...).Synchronize().Select(...).GroupBy(...);
Now all of the downstream operators (and eventually your observer) are protected from concurrent notifications, as promised by the Rx Design Guidelines. Synchronize works by using a simple mutex (aka the lock statement). There is no fancy queueing involved. If one thread attempts to raise an event while another thread is already raising it, the second thread will block until the first thread finishes.
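To make the locking behavior concrete, here is a minimal sketch of the idea, not the actual Rx source. LockingObserver, OverlapProbe and Demo are hypothetical names made up for illustration, and the sketch ignores details a real implementation has to deal with (such as what happens after OnError or OnCompleted). It only uses the IObserver<T> interface from the base class library, so it does not need the Rx package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper (not the Rx implementation): every notification
// takes the same lock, so calls reach the inner observer one at a time.
public sealed class LockingObserver<T> : IObserver<T>
{
    private readonly object _gate = new object();
    private readonly IObserver<T> _inner;

    public LockingObserver(IObserver<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    // A second thread arriving here blocks until the first one leaves.
    public void OnNext(T value) { lock (_gate) { _inner.OnNext(value); } }
    public void OnError(Exception error) { lock (_gate) { _inner.OnError(error); } }
    public void OnCompleted() { lock (_gate) { _inner.OnCompleted(); } }
}

// Hypothetical observer that records how many OnNext calls overlap.
public sealed class OverlapProbe : IObserver<int>
{
    private int _active;
    private int _maxConcurrent;
    private int _count;

    public int MaxConcurrent => Volatile.Read(ref _maxConcurrent);
    public int Count => Volatile.Read(ref _count);

    public void OnNext(int value)
    {
        int now = Interlocked.Increment(ref _active);

        // Remember the highest number of simultaneous callers seen so far.
        int seen = Volatile.Read(ref _maxConcurrent);
        while (now > seen)
        {
            int prior = Interlocked.CompareExchange(ref _maxConcurrent, now, seen);
            if (prior == seen) break;
            seen = prior;
        }

        Interlocked.Increment(ref _count);
        Thread.SpinWait(1000); // simulate a little work inside the handler
        Interlocked.Decrement(ref _active);
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Demo
{
    // Pushes 1000 values from multiple threads through the locking wrapper.
    public static OverlapProbe Run()
    {
        var probe = new OverlapProbe();
        var safe = new LockingObserver<int>(probe);
        Parallel.For(0, 1000, i => safe.OnNext(i));
        return probe;
    }
}
```

Calling Demo.Run() should leave the probe with all 1000 values delivered and never more than one OnNext in flight. If you pass the probe to Parallel.For directly instead of the wrapper, overlapping calls become possible, which is the situation the downstream Rx operators are not written to handle.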