Question

I have a tail implementation that pushes new lines from a text file to a Subject. The file has the following data:

source1, 1
source2, 3
source1, 2
source1, 1
source3, 10

I'm trying to create a view of the latest aggregations, let's say a running sum, by source.

After first line:

source1, 1

After second line:

source1, 1
source2, 3

After third line:

source1, 3
source2, 3

After fourth line:

source1, 4
source2, 3

After last line:

source1, 4
source2, 3
source3, 10

This is what I have got so far (in LinqPad):

void Main()
{
    // Test data: 0..10, spread over three sources.
    var source = Observable.Generate<int, Measurment>(0,
        current => current <= 10,
        current => current + 1,
        current => current % 3 == 0
            ? new Measurment { Source = "Source1", Value = current }
            : current != 10
                ? new Measurment { Source = "Source2", Value = current }
                : new Measurment { Source = "Source3", Value = current }
    );

    // One inner observable per source, each carrying that source's running sum.
    var grouped = source
        .GroupBy(m => m.Source)
        .Select(g => g.Scan((acc, current) =>
            new Measurment { Source = acc.Source, Value = acc.Value + current.Value }));

    grouped.Dump();
}

struct Measurment
{
    public string Source;
    public int Value;
}
I get an IObservable<IObservable<Measurment>>. The inner observables have the correctly aggregated values. Now I need to combine the streams and push out a list of Measurments whenever any of the inner observables changes. Any suggestions on how to do this?

Answer

Similarly to this answer and this one, there is an Rxx overload of CombineLatest that transforms an IObservable<IObservable<T>> into an IObservable<IList<T>>, which should do what you want. I think this function really belongs in the main Rx library, as it's very useful.
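
If pulling in Rxx is not an option, something similar can be approximated with standard Rx operators. The following is only a rough sketch, not the Rxx implementation: LatestOfEach is a hypothetical name, and it assumes the System.Reactive package and C# 7.1 or later. Unlike the usual CombineLatest, it emits as soon as any inner stream has produced a value rather than waiting for all of them, which appears to match the output shown in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class ObservableSketch
{
    // Hypothetical helper (not part of Rx or Rxx): each time any inner stream
    // produces a value, emit a list holding the latest value of every inner
    // stream seen so far, ordered by the arrival of the inner streams.
    public static IObservable<IList<T>> LatestOfEach<T>(
        this IObservable<IObservable<T>> sources)
    {
        return sources
            // Tag every value with the index of the inner stream it came from.
            .Select((inner, index) => inner.Select(value => (index, value)))
            .Merge()
            // Copy the state on each update so earlier snapshots are not mutated.
            .Scan(new SortedDictionary<int, T>(),
                  (latest, x) => new SortedDictionary<int, T>(latest) { [x.index] = x.value })
            .Select(latest => (IList<T>)latest.Values.ToList());
    }
}
```

With the query from the question, grouped.LatestOfEach().Dump(); should then show one list of Measurments per incoming line. Copying the dictionary on every update keeps the sketch simple at the cost of some allocation; an immutable collection would be a reasonable alternative when there are many sources.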

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow