Question

I've been looking for examples of how to use Observable.Buffer in Rx, but I can't find anything more substantial than boilerplate time-buffered examples.

There does seem to be an overload to specify a "bufferClosingSelector" but I can't wrap my mind around it.

What I'm trying to do is create a sequence that buffers by time or by an "accumulation". Consider a request stream where every request has some sort of weight, and I do not want to process more than x accumulated weight at a time. If not enough weight has accumulated, just give me whatever has come in during the last time frame (regular Buffer functionality).

Solution

bufferClosingSelector is a function that is called each time a new buffer is opened. It returns an Observable, and the buffer is closed when that Observable produces a value.

For example,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) works like the regular Buffer(time) overload.

If you want to weight a sequence, you can apply a Scan over the sequence and then decide on your aggregating condition.

E.g., source.Scan((a,c) => a + c).SkipWhile(a => a < 100) gives you a sequence which produces a value once the source sequence has added up to at least 100.
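As a small illustration of the running total, here is a sketch with made-up weights (the array values are an assumption chosen for the example, not from the question):

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical weights, for illustration only.
var weights = new[] { 30, 50, 40, 10 }.ToObservable();

weights.Scan((total, w) => total + w)    // running totals: 30, 80, 120, 130
       .SkipWhile(total => total < 100)  // drops 30 and 80
       .Subscribe(total => Console.WriteLine(total));
// prints 120, then 130
```

When used as a closing sequence, only the first value (120 here) matters, since that notification is what closes the buffer.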

You can use Amb to race these two closing conditions to see which reacts first:

        source.Buffer(() => Observable.Amb
                     (
                          // Both arguments must share one element type; see the note below.
                          Observable.Timer(TimeSpan.FromSeconds(1)),
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

You can use any series of combinators here: whenever the resulting sequence produces a value, the buffer is closed at that point.

Note: The value produced by the closing sequence doesn't matter - it's the notification that matters. Amb requires all of its sources to have the same element type, so to combine sources of different types, simply project each one to System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => Unit.Default), stream2.Select(_ => Unit.Default))
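
Putting the pieces together, a minimal sketch of the weight-or-time buffer might look like the following. The extension method name BufferByWeightOrTime and its parameters (weight, maxWeight, maxWait) are hypothetical, not part of Rx. The sketch also assumes that sharing the source through Publish is acceptable: the closing selector subscribes to the source again for every buffer, and with a cold source each of those subscriptions would otherwise replay it from the start.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class WeightedBufferExtensions
{
    // Hypothetical helper (not an Rx operator): closes the current buffer
    // when maxWait elapses or when the summed weight reaches maxWeight,
    // whichever happens first.
    public static IObservable<IList<T>> BufferByWeightOrTime<T>(
        this IObservable<T> source,
        Func<T, int> weight,
        int maxWeight,
        TimeSpan maxWait)
    {
        // Publish shares a single subscription to the source between the
        // buffer itself and every closing sequence created by the selector.
        return source.Publish(shared =>
            shared.Buffer(() => Observable.Amb(
                // Closing condition 1: the time window has elapsed.
                Observable.Timer(maxWait).Select(_ => Unit.Default),
                // Closing condition 2: the running weight has reached the limit.
                // The Scan starts from zero for each new buffer.
                shared.Select(weight)
                      .Scan((total, w) => total + w)
                      .SkipWhile(total => total < maxWeight)
                      .Select(_ => Unit.Default))));
    }
}
```

Usage would be along the lines of requests.BufferByWeightOrTime(r => r.Weight, 100, TimeSpan.FromSeconds(1)), where requests and Weight are placeholders for your own stream and property. The buffer closes only after the threshold has been reached, so the item that crosses the limit most likely ends up in the buffer being closed, which means a buffer can exceed maxWeight by that last item. If the limit is strict, the condition would need adjusting.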
Licensed under: CC-BY-SA with attribution