Question

I've been trying to build a circuit breaker that can be configured to trip with compound rules like:

  • 2 timeout exceptions within 45 seconds; or
  • 50 of any other type of exception within 5 minutes

so I figured the best way to do this was with sliding windows (or time buffers, whatever you want to call them).

Based on what I could find online and piece together myself, I wrote this simple console app:

static IEnumerable<ConsoleKeyInfo> KeyPresses()
{
    ConsoleKeyInfo key;
    do
    {
        key = Console.ReadKey();
        yield return key;
    } while (key.Key != ConsoleKey.Escape);
}

static IObservable<int> TimeLimitedThreshold<T>(IObservable<T> source, TimeSpan timeLimit, int threshold)
{
    return source.Window(source, _ => Observable.Timer(timeLimit))
        .Select(x => x.Count())
        .Merge()
        .Where(count => count >= threshold)
        .Take(1);
}

static void Main(string[] args)
{
    Console.WriteLine("Starting");

    var timeLimit = TimeSpan.FromSeconds(5);
    const int threshold = 3;
    var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();

    var thresholdHit = TimeLimitedThreshold(keys, timeLimit, threshold);
    thresholdHit.Subscribe(count => Console.WriteLine("THRESHOLD BREACHED! Count is: {0}", count));

    // block the main thread so we don't terminate
    keys.Where(key => key.Key == ConsoleKey.Escape).FirstAsync().Wait();
    Console.WriteLine("Finished");
}

(If I should put that in a gist or pastebin instead of in the question please just say so)

Now this seems to do what I want: if I press any key 3 or more times within 5 seconds, "THRESHOLD BREACHED!" is printed once (along with the count) and nothing more happens.

My questions are:

  • Is the TimeLimitedThreshold function at a sensible level of abstraction for working with Rx.NET?
  • Should I be injecting the scheduler to use when creating the Observable.Timer()?
  • What cleanup am I missing if any? Memory usage when it comes to Rx.NET really baffles me.

Solution

I would probably write the threshold function the following way, taking advantage of the Timestamp combinator.

    public static IObservable<U> TimeLimitedThreshold
        <T,U>
        ( this IObservable<T> source
        , int count
        , TimeSpan timeSpan
        , Func<IList<T>,U> selector
        , IScheduler scheduler = null
        )
    {
        var tmp = scheduler == null
            ? source.Timestamp()
            : source.Timestamp(scheduler);

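        return  tmp
             // Overlapping buffers of `count` items, advancing one item at a time;
             // drop the short buffers emitted when the source completes.
             .Buffer(count, 1).Where(b=>b.Count==count)
             // Time elapsed between the oldest and newest item in the buffer.
             .Select(b => new { b, span = b.Last().Timestamp - b.First().Timestamp })
             .Where(o => o.span <= timeSpan)
             // Strip the timestamps and hand the raw values to the caller's selector.
             .Select(o => selector(o.b.Select(ts=>ts.Value).ToList()));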
    }

As an added convenience, when the trigger fires, the complete buffer that satisfied it is passed to your selector function.

For example

 var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();
 IObservable<string> fastKeySequences = keys.TimeLimitedThreshold
     ( 3
     , TimeSpan.FromSeconds(5)
     , pressed => String.Join("", pressed.Select(k => k.KeyChar))
     );

The optional IScheduler parameter exists because the Timestamp method has an overload that takes one. This is useful if you want a custom scheduler that doesn't track time according to the system clock. In tests, for example, a virtual-time scheduler such as HistoricalScheduler or TestScheduler lets you control time, and for that you need this overload.

Here is a fully working test showing the use of a scheduler (using xUnit, and FluentAssertions for the Should().Be(..) calls):

public class TimeLimitedThresholdSpec  : ReactiveTest
{

    TestScheduler _Scheduler = new TestScheduler();
    [Fact]
    public void ShouldWork()
    {
        var o = _Scheduler.CreateColdObservable
            ( OnNext(100, "A")
            , OnNext(200, "B")
            , OnNext(250, "C")
            , OnNext(255, "D")
            , OnNext(258, "E")
            , OnNext(600, "F")
            );

        var fixture = o
            .TimeLimitedThreshold
                (3
                , TimeSpan.FromTicks(20)
                , b => String.Join("", b)
                , _Scheduler
                );

        var actual = _Scheduler
            .Start(()=>fixture, created:0, subscribed:1, disposed:1000);
        actual.Messages.Count.Should().Be(1);
        actual.Messages[0].Value.Value.Should().Be("CDE");


    }

}
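
To see why the test expects only "CDE", it can help to trace the windowing logic by hand. The following is a minimal plain-C# sketch, not part of the extension method above: it assumes the events are already collected in a list together with their tick timestamps, and the names WindowSketch and Matches are made up for illustration. It mimics Buffer(count, 1) by sliding an index over the list and comparing the first and last timestamp of each group.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WindowSketch
{
    // Hypothetical helper: returns the concatenated values of every run of
    // `count` consecutive events whose first and last timestamps are at most
    // `maxSpan` ticks apart.
    public static List<string> Matches(
        IReadOnlyList<(long Ticks, string Value)> events, int count, long maxSpan)
    {
        var hits = new List<string>();
        // Each start index i corresponds to one full buffer from Buffer(count, 1).
        for (var i = 0; i + count <= events.Count; i++)
        {
            var span = events[i + count - 1].Ticks - events[i].Ticks;
            if (span <= maxSpan)
            {
                hits.Add(string.Concat(events.Skip(i).Take(count).Select(e => e.Value)));
            }
        }
        return hits;
    }
}
```

With the test's timeline (A at 100, B at 200, C at 250, D at 255, E at 258, F at 600), a count of 3 and a span of 20 ticks, the groups ABC, BCD and DEF span 150, 55 and 345 ticks, so only CDE (8 ticks) qualifies.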

Subscribing is done the following way:

IDisposable subscription = fastKeySequences.Subscribe(s=>Console.WriteLine(s));

When you want to cancel the subscription (and clean up memory and resources), you simply dispose of it:

subscription.Dispose();

OTHER TIPS

Here's an alternative approach that uses a single delay instead of buffers and timers. It doesn't give you the events, it just signals when there is a violation, but it uses less memory as it doesn't hold on to as much.

public static class ObservableExtensions
{
    public static IObservable<Unit> TimeLimitedThreshold<TSource>(
        this IObservable<TSource> source,
        long threshold,
        TimeSpan timeLimit,
        IScheduler s)
    {
        var events = source.Publish().RefCount();
        var count = events.Select(_ => 1)
                        .Merge(events.Select(_ => -1)
                                    .Delay(timeLimit, s));                                               
        return count.Scan((x,y) => x + y)              
                    .Where(c => c == threshold)
                    .Select(_ => Unit.Default);           
    }
}

The Publish().RefCount() is used to avoid subscribing to the source more than once. The query projects each event to 1, and a delayed copy of each event to -1, then produces a running total. If the running total reaches the threshold, we emit a signal (Unit.Default is the Rx type to represent an event without a payload). Here's a test (it runs in LINQPad with the rx-testing NuGet package):

void Main()
{    
    var s = new TestScheduler();
    var source = s.CreateColdObservable(
        new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
        new Recorded<Notification<int>>(200, Notification.CreateOnNext(2)),
        new Recorded<Notification<int>>(300, Notification.CreateOnNext(3)),
        new Recorded<Notification<int>>(330, Notification.CreateOnNext(4)));

    var results = s.CreateObserver<Unit>();

    source.TimeLimitedThreshold(
        2,
        TimeSpan.FromTicks(30),
        s).Subscribe(results);

    s.Start();

    ReactiveAssert.AssertEqual(
        results.Messages,
        new List<Recorded<Notification<Unit>>> {
            new Recorded<Notification<Unit>>(
                330, Notification.CreateOnNext(Unit.Default))
        });
}
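
The running-total idea can also be traced without Rx. The sketch below is a plain-C# illustration under stated assumptions, not the operator itself: the names RunningTotalSketch and ThresholdTimes are hypothetical, the events are given as a finished list of tick values, and when a +1 and a -1 fall on the same tick the +1 is applied first (an assumption chosen here so the trace agrees with the result the test above expects).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RunningTotalSketch
{
    // Hypothetical helper: returns the times at which the running total
    // equals `threshold`. Every event adds +1 at its own time and -1 once
    // it is `limit` ticks old.
    public static List<long> ThresholdTimes(
        IEnumerable<long> eventTicks, long limit, int threshold)
    {
        var deltas = eventTicks
            .SelectMany(t => new[] { (Time: t, Delta: 1), (Time: t + limit, Delta: -1) })
            .OrderBy(d => d.Time)
            // Assumption: on a tie, apply the +1 before the -1.
            .ThenByDescending(d => d.Delta);

        var hits = new List<long>();
        var total = 0;
        foreach (var d in deltas)
        {
            total += d.Delta;
            if (total == threshold)
            {
                hits.Add(d.Time);
            }
        }
        return hits;
    }
}
```

For events at 100, 200, 300 and 330 with a limit of 30 ticks and a threshold of 2, the total runs 1, 0, 1, 0, 1, 2, 1, 0, so the only hit is at tick 330.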

Edit

After Matthew Finlay's observation that the above would also fire as the threshold is passed "on the way down", I added this version that checks only for threshold crossing in the positive direction:

public static class ObservableExtensions
{
    public static IObservable<Unit> TimeLimitedThreshold<TSource>(
        this IObservable<TSource> source,
        long threshold,
        TimeSpan timeLimit,
        IScheduler s)
    {
        var events = source.Publish().RefCount();
        var count = events.Select(_ => 1)
                        .Merge(events.Select(_ => -1)
                                    .Delay(timeLimit, s));                                                   

        return count.Scan((x,y) => x + y)          
                    .Scan(new { Current = 0, Last = 0},
                          (x,y) => new { Current = y, Last = x.Current })                        
                    .Where(c => c.Current == threshold && c.Last < threshold)
                    .Select(_ => Unit.Default);                          
    }
}
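
The difference between the two versions is easiest to see on a bare sequence of running totals. This is a small plain-C# sketch with hypothetical names (CrossingSketch, UpwardCrossings); it assumes the totals have already been computed and only reproduces the Current/Last comparison from the second Scan.

```csharp
using System;
using System.Collections.Generic;

public static class CrossingSketch
{
    // Hypothetical helper: returns the indices at which the total reaches
    // `threshold` coming from below, ignoring hits "on the way down".
    public static List<int> UpwardCrossings(IReadOnlyList<int> totals, int threshold)
    {
        var hits = new List<int>();
        var last = 0; // same starting value as the seed { Current = 0, Last = 0 }
        for (var i = 0; i < totals.Count; i++)
        {
            var current = totals[i];
            if (current == threshold && last < threshold)
            {
                hits.Add(i);
            }
            last = current;
        }
        return hits;
    }
}
```

For the totals 1, 2, 3, 2, 1, 2 and a threshold of 2, a plain equality check would match at indices 1, 3 and 5, while the upward-crossing check matches only at indices 1 and 5.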
Licensed under: CC-BY-SA with attribution