If your observables is designed to send generic messages, and your observer is design to translate them, you also need a way of indicating to the producer what kinds of messages you're interested in.
One way to do this is "requesting an observable".
ObservableWeatherSource GetNotifications(WeatherWarnings warningTypes, string location);
Another way might be to lazily indicate what notifications you're interested in.
ObservableWeatherSource source = GetWeatherSource();
source
.Where(x => x.WeatherWarningType === WeatherWarnings.Rain)
.Subscribe(new WeatherObserver());
source.ExpressInterestIn(WeatherWarnings.Rain, "San Francisco");
Or, possibly, you might be interested in writing a specialized query language for weather. You could probably do this through the IQbservable
and a query provider, but I have little knowledge of this area of Rx.