How can we use group functions on IObservable<T> while observing is not finished or on hot, long running observable

StackOverflow https://stackoverflow.com/questions/18753658

Question

I have the following Observable

IObservable<int> observable = Observable.Range(0, 10);

I have the following subscriptions

observable.Subscribe(v => Console.WriteLine("Received value: " + v));
observable.Sum().Subscribe(s => Console.WriteLine("Sum so far: " + s));

My desired output is following

Received value: 0
Sum so far: 0
Received value: 1
Sum so far: 1
Received value: 2
Sum so far: 3
...
...

But what I am actually getting is the Sum after receiving the final value of the series.

My actual goal is to fit this into the puzzle where I have a Hot, continuously running observable giving me decimal values and I have to subscribe() to the current average and sum values.

Any help in this will be appreciated.

Was it helpful?

Solution

Use Observable.Scan:

var observable = Observable.Range(0, 10);
observable
    .Subscribe(x => Console.WriteLine("Received value: " + x));
observable
    .Scan(0, (sum, x) => sum + x)
    .Subscribe(sum => Console.Writeline("Sum so far: " + sum));

Observable.Sum is going to only ever emit a single token, which is the sum of the entire finished sequence. Observable.Scan is going to emit a token every time it receives a token from the source.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top