Strilanc, given your concern about unwanted activity when the source stream is quiet, you might be interested in this method of pacing events - I wasn't going to add this otherwise, as I think J. Lennon's implementation is perfectly reasonable (and much simpler), and the performance of the timer isn't going to hurt.
There is one other interesting difference in this implementation - it differs from the Sample
approach because it emits events occurring outside the cooldown period immediately rather than at the next sampling interval. It maintains no timer outside the cooldown.
EDIT - Here is v3 solving the issue Chris mentioned in the comments - it ensures that changes occurring during the cool-down themselves trigger a new cool-down period.
public static IObservable<T> LimitRate<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
return source.DistinctUntilChanged()
.GroupByUntil(k => 0,
g => Observable.Timer(duration, scheduler))
.SelectMany(x => x.FirstAsync()
.Merge(x.Skip(1)
.TakeLast(1)))
.Select(x => Observable.Return(x)
.Concat(Observable.Empty<T>()
.Delay(duration, scheduler)))
.Concat();
}
This works by initially using a GroupByUntil
to pack all events into the same group for the duration of the cool-down period. It watches for changes and emits the final change (if any) as the group expires.
Then the resulting events are projected into a streams whose OnCompleted is delayed by the cool-down period. These streams are then concatenated together. This prevents events being any closer together than the cool-down, but otherwise they are emitted as soon as possible.
Here are the unit tests (updated for v3 edit), which you can run using nuget packages rx-testing
and nunit
:
public class LimitRateTests : ReactiveTest
{
[Test]
public void SlowerThanRateIsUnchanged()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
}
[Test]
public void FasterThanRateIsSampled()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(140, 5),
OnNext(150, 2),
OnNext(300, 3),
OnNext(350, 4));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4));
}
[Test]
public void DuplicatesAreOmitted()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(300, 1),
OnNext(350, 1));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1));
}
[Test]
public void CoolResetsCorrectly()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 2),
OnNext(205, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
}
[Test]
public void MixedPacingWorks()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(825, 5));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(850, 5));
}
}