I would probably write the threshold function as follows, taking advantage of the Timestamp combinator:
    public static IObservable<U> TimeLimitedThreshold
        <T,U>
        ( this IObservable<T> source
        , int count
        , TimeSpan timeSpan
        , Func<IList<T>,U> selector
        , IScheduler scheduler = null
        )
    {
        // Stamp each element with the scheduler's notion of "now".
        var tmp = scheduler == null
            ? source.Timestamp()
            : source.Timestamp(scheduler);

        return tmp
            // Sliding window of `count` elements; drop the short buffers emitted on completion.
            .Buffer(count, 1).Where(b => b.Count == count)
            .Select(b => new { b, span = b.Last().Timestamp - b.First().Timestamp })
            // Keep only windows whose first and last elements are close enough in time.
            .Where(o => o.span <= timeSpan)
            .Select(o => selector(o.b.Select(ts => ts.Value).ToList()));
    }
As an added convenience, when the trigger fires, the complete buffer that satisfied it is passed to your selector function.
For example:
    var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();
    IObservable<string> fastKeySequences = keys.TimeLimitedThreshold
        ( 3
        , TimeSpan.FromSeconds(5)
        , buffer => String.Join("", buffer)
        );
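To make the windowing logic concrete, here is a minimal sketch of the same check on an in-memory list, with no Rx involved. The `SlidingWindowSketch` class and its `Windows` helper are hypothetical names made up for this illustration, and the sample timestamps are invented; it only mirrors what `Buffer(count, 1)` plus the two `Where` filters compute.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SlidingWindowSketch
{
    // Hypothetical helper (not part of Rx): returns every run of `count`
    // consecutive items whose first and last timestamps are at most `limit` apart.
    public static List<List<T>> Windows<T>(
        IList<(DateTimeOffset Timestamp, T Value)> items, int count, TimeSpan limit)
    {
        var result = new List<List<T>>();
        // i is the start of each full window, like Buffer(count, 1) with the Count filter.
        for (int i = 0; i + count <= items.Count; i++)
        {
            var span = items[i + count - 1].Timestamp - items[i].Timestamp;
            if (span <= limit)
                result.Add(items.Skip(i).Take(count).Select(x => x.Value).ToList());
        }
        return result;
    }

    public static void Main()
    {
        var t0 = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
        // Invented sample data: offsets in milliseconds from t0.
        var items = new List<(DateTimeOffset Timestamp, string Value)>
        {
            (t0.AddMilliseconds(100), "A"),
            (t0.AddMilliseconds(200), "B"),
            (t0.AddMilliseconds(250), "C"),
            (t0.AddMilliseconds(255), "D"),
            (t0.AddMilliseconds(258), "E"),
            (t0.AddMilliseconds(600), "F"),
        };

        foreach (var w in Windows(items, 3, TimeSpan.FromMilliseconds(20)))
            Console.WriteLine(String.Join("", w)); // prints "CDE"
    }
}
```

Only the window C, D, E spans 20 ms or less (8 ms), so it is the only one reported. The Rx version does the same thing incrementally as elements arrive.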
The extra IScheduler parameter is there because the Timestamp method has an overload that takes one. This is useful if you want a custom scheduler that doesn't track time according to the system clock. For testing purposes, a historical scheduler can be useful, and for that you need the extra overload.
Here is a fully working test showing the use of a scheduler (using xUnit and FluentAssertions for the Should().Be(..) calls):
    public class TimeLimitedThresholdSpec : ReactiveTest
    {
        TestScheduler _Scheduler = new TestScheduler();

        [Fact]
        public void ShouldWork()
        {
            var o = _Scheduler.CreateColdObservable
                ( OnNext(100, "A")
                , OnNext(200, "B")
                , OnNext(250, "C")
                , OnNext(255, "D")
                , OnNext(258, "E")
                , OnNext(600, "F")
                );

            var fixture = o
                .TimeLimitedThreshold
                ( 3
                , TimeSpan.FromTicks(20)
                , b => String.Join("", b)
                , _Scheduler
                );

            var actual = _Scheduler
                .Start(() => fixture, created: 0, subscribed: 1, disposed: 1000);

            // Only C, D, E arrive within 20 ticks of each other.
            actual.Messages.Count.Should().Be(1);
            actual.Messages[0].Value.Value.Should().Be("CDE");
        }
    }
Subscribing is done the following way:
IDisposable subscription = fastKeySequences.Subscribe(s=>Console.WriteLine(s));
When you want to cancel the subscription (and clean up memory and resources), simply dispose of it:
    subscription.Dispose();