Question

I have an extremely chatty sequence, and I am trying to make it more efficient by processing events in batches. A Buffer operator with time and count conditions seemed to match my requirements, except for one nuance: with this overload, the subscriber is notified every time the specified time span elapses, regardless of whether there are any items in the buffer. This gets annoying because most of the time my subscriber receives an empty list from the Buffer operator. Since this is a multi-threaded application whose subscriber runs on the UI thread, that is not an optimal way to process items in batches. Is there a way to compose the available operators into a sequence that fires either when a certain number of items is in the buffer or when a certain time has passed, but only if the buffer contains at least one item? I know that I could do something like this:

sequence.Buffer(TimeSpan.FromSeconds(5), 1).Where(e=>e.Count > 0)

But I was wondering if there is another way to do this, because somehow I feel that it's not the best way.

Solution

I can't see a reason to worry about this: you have an idiomatic solution. An empty buffer is information, so it's reasonable for the framework implementation to return it. Any other method would effectively be doing internally the same thing you are already doing.

When I find myself using small groups of standard operators I often wrap them in a more explanatory extension method. E.g.:

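using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    public static IObservable<IList<T>> ToNonEmptyBuffers<T>(
        this IObservable<T> source,
        TimeSpan timespan,
        int count,
        IScheduler scheduler = null)
    {
        // Fall back to the default scheduler when none is supplied.
        scheduler = scheduler ?? Scheduler.Default;
        // Buffer by time or count, then drop the empty buffers.
        return source.Buffer(timespan, count, scheduler)
                     .Where(buffer => buffer.Count > 0);
    }
}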

Allowing:

sequence.ToNonEmptyBuffers(TimeSpan.FromSeconds(5), 1);

OTHER TIPS

For the sake of "Rx-i-ness" I throw the following into the pile.

Personally, I think James' answer is sufficient (and probably better in a lot of scenarios). The only difference, as far as output goes, is that here the buffer timer starts only when a new item is yielded, which is why we don't need to filter out empty buffers. That being said, this may not be the most efficient solution; it's just here to show the power of composition.

var batches = source
    .GroupByUntil(
        // A constant key means we're not really grouping, but windowing.
        // Granted, if we needed to group our batches, this is useful!
        x => 0,
        group => Observable.Amb(
            // This means we get a max of 11 per batch.
            group.Skip(10),
            // This means we get a max batch time of 10 seconds.
            group.Take(1).Delay(TimeSpan.FromSeconds(10))))
    // Since GroupByUntil gives us windows, we can ToArray them.
    .SelectMany(x => x.ToArray());
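
In the same spirit as the extension method in the accepted answer, the GroupByUntil composition could be wrapped in a reusable helper. The following is only a sketch: the name BufferFromFirstItem and its parameters are made up for illustration and are not part of Rx, and it uses Skip(maxCount - 1) so that a batch closes on its maxCount-th item rather than on the item after it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class LazyBufferExtensions
{
    // Hypothetical helper (not an Rx operator): emits a batch when maxCount
    // items have arrived or when maxDelay has passed since the first item
    // of the batch, whichever happens first.
    public static IObservable<IList<T>> BufferFromFirstItem<T>(
        this IObservable<T> source,
        TimeSpan maxDelay,
        int maxCount,
        IScheduler scheduler = null)
    {
        if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
        scheduler = scheduler ?? Scheduler.Default;

        return source
            .GroupByUntil(
                // Constant key: a single "group" at a time, i.e. a window.
                _ => 0,
                group => Observable.Amb(
                    // Close the window once the maxCount-th item arrives.
                    group.Skip(maxCount - 1),
                    // Or close it maxDelay after the first item arrives.
                    group.Take(1).Delay(maxDelay, scheduler)))
            // Collect each window into a list, matching Buffer's element type.
            .SelectMany(group => group.ToList());
    }
}
```

Usage would then look like sequence.BufferFromFirstItem(TimeSpan.FromSeconds(10), 11). Because a window is only opened by an incoming item, no empty lists should reach the subscriber.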
Licensed under: CC-BY-SA with attribution