Question

I have a situation where i need to sample several observables at times defined by a controlling observable.

I'll jump straight to a marble diagram.

co below is the controlling observable (Observe.Interval(TimeSpan.FromSeconds(1))

o* is observables of different types

co tick  ---------x---------x---------x---------x---------x---------x-
o1 int   --2----1-------4--------3--------1-------2----3-------5------
o2 bool  -t-----------f---------t---------------------------f---------

result   ---------Z---------Z---------Z---------Z---------Z---------Z-

I need to develop an extension method that samples the latest value of each o-observable only when there is a tick on the controlling channel.

Z would then be the result of passing the latest sampled values from the o-observables to a selector function. It is kinda like CombineLatest but not quite.

i.e., as a very simplified example, lets say that the func looks like this:

 (i, b) => {if (b) return i; else return 0; }

I would like the result in this case to be

 result   ---------1---------0---------3---------1---------3---------0-

First 1 is because o2 was last true and o1 was last 1 Second 0 is because o2 was last false.

Note that o2 doesn't always produce one value between every sample. I still need to grab the last value. (.Sample() wont work.). In reality, the function and types involved is more complex so don't make assumptions because of the int and bool types above.

Also, i need the selector function to only run once every tick.

This is my current solution but it doesn't comply with the requirement above since i sample after CombineLatest.

 var combined = interval.CombineSampled(
            Status,
            H1,
            H2
            M,
            T,
            HMode,
            TMode,
            (i, s, hr1, hr2, m, t, hMode, tMode) =>
            {
               ... (left out)

               return result;
            });

CombineSampled:

public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
    {
        return controllingSource.Publish(s => s.CombineLatest(source2, source3, source4, source5, source6, source7, source8, selector).SampleEx(s));
    }


public static IObservable<T> SampleEx<T, S>( this IObservable<T> source, IObservable<S> samples )
    {
        // This is different from the Rx version in that source elements will be repeated, and that
        // we complete when either sequence ends. 
        return Observable.Create( ( IObserver<T> obs ) =>
            {
                object gate = new object();
                bool hasSource = false;
                var value = default(T);

                return new CompositeDisposable(
                    source.Synchronize( gate ).Subscribe( v => { value = v; hasSource = true; }, obs ),
                    samples.Synchronize( gate ).Subscribe( _ => { if ( hasSource ) obs.OnNext( value ); }, obs )
                );
            } );
    }
Was it helpful?

Solution

change your CombineSampled to this:

public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
{
    return controllingSource.Publish(s => s
        .CombineLatest(source2, source3, source4, source5, source6, source7, source8,
           Tuple.Create<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8>)
        .SampleEx(s)
        .Select(t => selector(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6, t.Item7, t.Item8));
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top