Question

I have a tail implementation that pushes new lines from a text file to a Subject. The file has the following data:

source1, 1
source2, 3
source1, 2
source1, 1
source3, 10

I'm trying to create a view of the latest aggregations, say a running sum, by source.

After first line:

source1, 1

After second line:

source1, 1
source2, 3

After third line:

source1, 3
source2, 3

After fourth line:

source1, 4
source2, 3

After last line:

source1, 4
source2, 3
source3, 10

This is what I have so far (in LINQPad):

void Main()
{
    // Test data: multiples of 3 go to Source1, 10 goes to Source3, everything else to Source2.
    var source = Observable.Generate<int, Measurment>(0,
        current => current <= 10,
        current => current + 1,
        current => current % 3 == 0
            ? new Measurment { Source = "Source1", Value = current }
            : current != 10
                ? new Measurment { Source = "Source2", Value = current }
                : new Measurment { Source = "Source3", Value = current }
    );

    // One inner observable per source, each carrying that source's running sum.
    var grouped = source
        .GroupBy(m => m.Source)
        .Select(g => g.Scan((acc, current) =>
            new Measurment { Source = acc.Source, Value = acc.Value + current.Value }));

    grouped.Dump();
}

struct Measurment
{
    public string Source;
    public int Value;
}

This gives me an IObservable<IObservable<Measurment>>, and each inner observable carries the correctly aggregated values. Now I need to combine the streams and push out a list of Measurments whenever any of the inner observables changes. Any suggestions on how to do it?

Solution

Similarly to this answer and this one, there is an Rxx overload of CombineLatest that transforms an IObservable<IObservable<T>> into an IObservable<IList<T>>, which should do what you want. I think this function really belongs in the main Rx library, as it's very useful.
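
If pulling in Rxx is not an option, a rough alternative (not the Rxx overload itself) is to skip the nested observables and fold the flat stream into a per-source dictionary, emitting a snapshot after every measurement. The sketch below assumes the System.Reactive and System.Collections.Immutable packages are available; RunningSums is a hypothetical helper name, and the sample data is taken from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive.Linq;

struct Measurment
{
    public string Source;
    public int Value;
}

static class Program
{
    // Hypothetical helper: keeps a running sum per source and emits the
    // full list of current sums each time a new measurement arrives.
    public static IObservable<IList<Measurment>> RunningSums(IObservable<Measurment> source) =>
        source
            // The accumulator is an immutable map from source name to running sum.
            .Scan(ImmutableSortedDictionary<string, int>.Empty,
                  (totals, m) => totals.SetItem(
                      m.Source,
                      (totals.TryGetValue(m.Source, out var sum) ? sum : 0) + m.Value))
            // Project each intermediate map into a list of Measurment values.
            .Select(totals => (IList<Measurment>)totals
                .Select(kv => new Measurment { Source = kv.Key, Value = kv.Value })
                .ToList());

    static void Main()
    {
        // The five lines from the question, in file order.
        var source = new[]
        {
            new Measurment { Source = "source1", Value = 1 },
            new Measurment { Source = "source2", Value = 3 },
            new Measurment { Source = "source1", Value = 2 },
            new Measurment { Source = "source1", Value = 1 },
            new Measurment { Source = "source3", Value = 10 },
        }.ToObservable();

        // Prints one line per snapshot, e.g. "source1, 1; source2, 3".
        RunningSums(source).Subscribe(snapshot =>
            Console.WriteLine(string.Join("; ", snapshot.Select(m => $"{m.Source}, {m.Value}"))));
    }
}
```

Because the accumulator is immutable, every emitted snapshot is independent of later updates. Unlike CombineLatest over the grouped streams, this version never builds the IObservable<IObservable<Measurment>>, so it only fits if you do not need the per-source streams for anything else.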

Licensed under: CC-BY-SA with attribution