Aggregate PropertyChanged events from the collection into one IObservable<EventPattern<PropertyChangedEventArgs>>
28-10-2019
Question
I have a collection of INotifyPropertyChanged objects and would like to stream all of their PropertyChanged events into a single observable sequence for further processing.
Here is the code:
IObservable<EventPattern<PropertyChangedEventArgs>> _allEvents = null;
// Items is a collection of items that implement INotifyPropertyChanged
foreach (var item in Items)
{
    var seq = Observable.FromEventPattern<PropertyChangedEventArgs>(item, "PropertyChanged");
    if (_allEvents == null)
        _allEvents = seq;
    else
        _allEvents.Merge(seq);
}
// subscribe to the aggregated observable sequence
if (_allEvents != null)
    _allEvents.Subscribe(evt => Trace.WriteLine(" Property Changed -> " + evt.EventArgs.PropertyName));
For some reason, a single Subscribe on the aggregated sequence doesn't work here. It looks like I combined the sequences incorrectly with Reactive Extensions' Merge function. Subscribing to each sequence inside the loop, however, works perfectly.
Can anybody help me aggregate many event streams into one with Reactive Extensions?
Thanks
Solution
Try this:
var _allEvents = Observable
    .Merge(Items
        .Select(item => Observable
            .FromEventPattern<PropertyChangedEventArgs>(item, "PropertyChanged")));
_allEvents.Subscribe(evt => Trace.WriteLine(" Property Changed -> " + evt.EventArgs.PropertyName));
The reason your approach doesn't work is that you were calling the extension method IObservable<T> Observable.Merge<T>(this IObservable<T> first, IObservable<T> second). It returns the merged sequence as a new observable, and your loop discards that return value, so _allEvents still refers only to the first item's sequence. I think you might have been thinking that Merge modified the observable it was called on, but you can think of observables as immutable (sort of). The way to get your approach to work would have been:
_allEvents = _allEvents.Merge(seq);
But... yuck. Don't do that.
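If you want to try this outside your project, here is a minimal, self-contained sketch. The Item class, its Name property, and the counter are hypothetical stand-ins for your own types, and the sketch assumes the System.Reactive package is referenced. It first repeats the discarded Merge call to show that the original sequence is left untouched, then builds the aggregate by merging over the whole collection.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical item type, used only for this illustration.
class Item : INotifyPropertyChanged
{
    private string _name;
    public event PropertyChangedEventHandler PropertyChanged;

    public string Name
    {
        get { return _name; }
        set
        {
            _name = value;
            // Raise PropertyChanged so subscribers see the change.
            PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name)));
        }
    }
}

static class Program
{
    // Wraps one item's PropertyChanged event as an observable sequence.
    static IObservable<EventPattern<PropertyChangedEventArgs>> Changes(Item item)
    {
        return Observable.FromEventPattern<PropertyChangedEventArgs>(item, "PropertyChanged");
    }

    static void Main()
    {
        var items = new List<Item> { new Item(), new Item(), new Item() };

        // Merge returns a NEW sequence. Ignoring the return value leaves
        // 'first' exactly as it was: it still covers items[0] only.
        var first = Changes(items[0]);
        first.Merge(Changes(items[1]));

        // Merging over the whole collection yields one aggregated sequence.
        var allEvents = Observable.Merge(items.Select(Changes));

        var seen = 0;
        using (allEvents.Subscribe(evt =>
        {
            seen++;
            Console.WriteLine(" Property Changed -> " + evt.EventArgs.PropertyName);
        }))
        {
            // Each assignment raises PropertyChanged on a different item.
            foreach (var item in items)
                item.Name = "changed";
        }

        Console.WriteLine("Events observed: " + seen);
    }
}
```

Disposing the subscription (the using block here) detaches the PropertyChanged handlers from every item, which is worth doing if the items outlive the subscriber.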