Similarly to this answer and this one, there is an Rxx overload to CombineLatest
that transforms an IObservable<IObservable<T>>
to an IObservable<IList<T>>
which should do what you want. I think that this function should really be in the main Rx library as it's very useful.
How to combine aggregations from grouping
-
22-06-2023 - |
Question
I have a tail implemetation that pushes new lines from a text file to a Subject
. The file has the following data:
source1, 1
source2, 3
source1, 2
source1, 1
source3, 10
I'm trying the create a view of latest aggregations, let's say running sum, by source.
After first line:
source1, 1
After second line:
source1, 1
source2, 3
After third line:
source1, 3
source2, 3
After forth line:
source1, 4
source2, 3
After last line:
After forth line:
source1, 4
source2, 3
source3, 10
This is what I have got so far (in LinqPad):
var source = Observable.Generate<int,Measurment>(0,
current => current <= 10,
current => current + 1,
current => current % 3 == 0
? new Measurment { Source = "Source1", Value = current }
: current != 10
? new Measurment { Source = "Source2", Value = current }
: new Measurment { Source = "Source3", Value = current }
);
var grouped = source
.GroupBy(m => m.Source)
.Select(g =>g.Scan((acc,current) =>
new Measurment { Source = acc.Source, Value = acc.Value + current.Value }));
grouped.Dump();
}
struct Measurment
{
public string Source;
public int Value;
I get IObservable<IObservable<Measurment>>
. The inner IObservable have the correctly aggregated values. Now I need to combine the streams and push out a list of Measurments on change in any of the Observables, Any suggestion how to do it?
Solution
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow