Question

I'm producing a sequence of 50 items every three seconds. I then want to batch them into groups of at most 20 items, but also without waiting more than one second before I release the buffer.

That works great!

But since the interval never dies, Buffer keeps firing empty batches...

How can I avoid that? Sure, Where(buf => buf.Count > 0) should help - but that seems like a hack.

Observable
    .Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge()
    .Buffer(TimeSpan.FromSeconds(1), 20)
    .Subscribe(e => Console.WriteLine(e.Count));

Output:

0-0-0-20-20-10-0-20-20-10-0-0-20-20

Solution

The Where filter you propose is a sound approach; I'd go with that.

You could perhaps wrap the Buffer and Where into a single, well-named helper method to make the intent clearer, but rest assured the Where clause is idiomatic Rx in this scenario.
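For illustration, here is a minimal sketch of what such a helper could look like (the name BufferNonEmpty and its exact signature are placeholders, not an established API):

using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferExtensions
{
    // Buffers by time and count, then drops the empty lists that
    // Buffer emits when no items arrived during the time window.
    public static IObservable<IList<T>> BufferNonEmpty<T>(
        this IObservable<T> source, TimeSpan timeSpan, int count)
    {
        return source
            .Buffer(timeSpan, count)
            .Where(buffer => buffer.Count > 0);
    }
}

With that in place, your query would simply end in .BufferNonEmpty(TimeSpan.FromSeconds(1), 20) instead of the Buffer / Where pair.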

Think of it this way: an empty buffer is relaying the information that no events occurred in the last second. While you can argue that this is implicit, it would require extra work to detect if Buffer didn't emit an empty list. It just so happens it's not information you are interested in - so Where is an appropriate way to filter it out.

A lazy timer solution

Following on from your comment ("...the timer... be[ing] lazily initiated..."), you can create a lazy timer and omit the zero counts like this:

var source = Observable.Interval(TimeSpan.FromSeconds(3))
                       .Select(n => Observable.Repeat(n, 50))
                       .Merge();

// Close each buffer one second after its first element, or once 20
// elements have arrived - whichever comes first. Publish shares a single
// subscription to source between the buffer and its closing streams.
var xs = source.Publish(pub =>
    pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1))
                        .Merge(pub.Skip(19)).Take(1)));

xs.Subscribe(x => Console.WriteLine(x.Count));

Explanation

Publishing

This query requires subscribing to the source events multiple times. To avoid unexpected side-effects, we use the overload of Publish that takes a selector: it gives us pub, a stream that multicasts the source through a single underlying subscription. Within the selector, pub is effectively a "hot" version of the source stream - similar in spirit to Publish().RefCount(), but scoped to this one query.

In this case, this is necessary to ensure that the buffer closing streams produced after the first one start with the current events - if the source were cold, they would start over from the beginning each time. I wrote a bit about publishing here.
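As an aside, here is a minimal, self-contained sketch of the Publish overload with a selector (the cold Range source is purely illustrative and unrelated to the buffering query):

using System;
using System.Reactive.Linq;

class PublishDemo
{
    static void Main()
    {
        // A cold source: each new subscription would normally restart it.
        var cold = Observable.Range(0, 5);

        // Publish(selector) multicasts 'cold' so that both uses of
        // 'shared' inside the selector observe the same single run.
        var pairs = cold.Publish(shared =>
            shared.Zip(shared.Skip(1), (prev, next) => (prev, next)));

        // Prints (0, 1), (1, 2), (2, 3), (3, 4)
        pairs.Subscribe(p => Console.WriteLine(p));
    }
}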

The main query

We use an overload of Buffer that accepts a factory function, called once for every buffer, which returns an observable stream whose first event signals that the current buffer should close.

In this case, we want to close the buffer when either the first event in the buffer has been there for a full second, or 20 events have arrived from the source - whichever comes first.

To achieve this we Merge streams describing each case: pub.Take(1).Delay(...) covers the first condition, pub.Skip(19) covers the second, and the final Take(1) emits whichever signal fires first. The same factory is spelled out below.
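To make that explicit, the closing factory from the query above can be written with intermediate variables (an equivalent sketch; the variable names are only for illustration):

var xs = source.Publish(pub =>
    pub.Buffer(() =>
    {
        // Fires one second after the first element of the new buffer.
        var timeout = pub.Take(1).Delay(TimeSpan.FromSeconds(1));

        // Fires when the 20th element arrives (the first 19 are skipped).
        var full = pub.Skip(19);

        // Whichever signal fires first closes the current buffer.
        return timeout.Merge(full).Take(1);
    }));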

However, I would still test the performance of the easy Buffer / Where approach first, because I still suspect this is overkill - a lot depends on the precise details of the platform and scenario.

OTHER TIPS

After using the accepted answer for quite a while, I would now suggest a different implementation (inspired by James' Skip / Take approach and this answer):

var source = Observable.Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge();

var xs = source.BufferOmitEmpty(TimeSpan.FromSeconds(1), 20);

xs.Subscribe(x => Console.WriteLine(x.Count));

With an extension method BufferOmitEmpty like:

public static IObservable<IList<TSource>> BufferOmitEmpty<TSource>(this IObservable<TSource> observable, TimeSpan maxDelay, int maxBufferCount)
{
    return observable
        // Open a group when an element arrives; close it after maxDelay
        // or once maxBufferCount elements have been received, whichever
        // comes first.
        .GroupByUntil(x => 1, g => Observable.Timer(maxDelay).Merge(g.Skip(maxBufferCount - 1).Take(1).Select(x => 1L)))
        // Collect each group into an array when it closes.
        .Select(x => x.ToArray())
        .Switch();
}

It is 'lazy' because no groups are created as long as there are no elements on the source sequence, so there are no empty buffers. As in Tom's answer, there is another nice advantage over the Buffer / Where implementation: the buffer is started when the first element arrives, so elements that follow each other within the buffer time after a quiet period are processed in the same buffer.

Why not to use the Buffer method

Three problems occurred when I was using the Buffer approach (they might be irrelevant for the scope of the question, so this is a warning to people who, like me, use Stack Overflow answers in different contexts):

  1. Because of the Delay, one thread is used per subscriber.
  2. In scenarios with long running subscribers elements from the source sequence can be lost.
  3. With multiple subscribers it sometimes creates buffers with count greater than maxBufferCount.

(I can supply sample code for 2. and 3., but I'm unsure whether to post it here or in a separate question, because I cannot fully explain why it behaves this way.)

RxJS 5 has features buried in its source code that are not prominently documented. It turns out it's pretty easy to achieve this with bufferTime.

From the source code, the signature looks like this:

export function bufferTime<T>(this: Observable<T>, bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: IScheduler): Observable<T[]>;

So your code would be like this:

observable.bufferTime(1000, null, 20)
Licensed under: CC-BY-SA with attribution