Domanda

I need to cap the number of events n permitted during a time period deltaT. Any approach I can think of, space is O(m), where m is the maximum number of eventrequests sent per deltaT, or O(deltaT/r), where r is an acceptable resolution.

Edit: deltaT is a sliding time window relative to the timestamp.

For instance: Keep a circular buffer of event timestamps. On event crop all earlier timestamps than t-deltaT. Deny event if the number of timestamps exceeds n. Add timestamp to the buffer.

Or, init a circular bucket buffer of integers of size deltaT/r indexed by time relative to the current with resolution r. Maintain pointer i. On event, increment i by time since last event divided by r. Zero the buffer between the original i and the new one. Increment at i. Deny, if the sum of the bugger exceeds n.

What's a better way?


I just implemented my second suggestion above in c# with a fixed deltaT of 1 s and a fixed resolution of 10 ms.

public class EventCap
{
    private const int RES = 10; //resolution in ms

    private int _max;
    private readonly int[] _tsBuffer;
    private int p = 0;
    private DateTime? _lastEventTime;
    private int _length = 1000 / RES;

    public EventCap(int max)
    {
        _max = max;

        _tsBuffer = new int[_length];
    }

    public EventCap()
    {
    }

    public bool Request(DateTime timeStamp)
    {
        if (_max <= 0)
            return true;

        if (!_lastEventTime.HasValue)
        {
            _lastEventTime = timeStamp;
            _tsBuffer[0] = 1;
            return true;
        }

        //A
        //Mutually redundant with B
        if (timeStamp - _lastEventTime >= TimeSpan.FromSeconds(1))
        {
            _lastEventTime = timeStamp;
            Array.Clear(_tsBuffer, 0, _length);
            _tsBuffer[0] = 1;
            p = 0;
            return true;
        }

        var newP = (timeStamp - _lastEventTime.Value).Milliseconds / RES + p;

        if (newP < _length)
            Array.Clear(_tsBuffer, p + 1, newP - p);

        else if (newP > p + _length)
        {
            //B
            //Mutually redundant with A
            Array.Clear(_tsBuffer, 0, _length);
        }
        else
        {
            Array.Clear(_tsBuffer, p + 1, _length - p - 1);
            Array.Clear(_tsBuffer, 0, newP % _length);
        }

        p = newP % _length;
        _tsBuffer[p]++;
        _lastEventTime = timeStamp;

        var sum = _tsBuffer.Sum();

        return sum <= 10;
    }
}
È stato utile?

Soluzione

What about having these variables: num_events_allowed, time_before, time_now, time_passed

At init time you will do: time_before = system.timer(), num_events_allowed = n

When an event is received you do the following:

  time_now = system.timer()
  time_passed = time_now - time_before
  time_before = time_now

  num_events_allowed += time_passed * (n / deltaT);

  if num_events_allowed > n 
      num_events_allowed = n

  if num_events_allowed >= 1
      let event through, num_events_allowed -= 1
  else
      ignore event

Whats nice about this algorithm is the num_events_allowed is actually incremented by the time that has passed since the last event and the rate of which events can be received, that way you get an incrementation of the number of events you can send per that time_passed in order to stay in the limit of n. So if you get an event too soon, you will increment it by less than 1, if its after too much time you will increment it by more than one. Of course if the event goes through you decrement the allowance by 1 as you just got an event. If the allowance passes the max events which is n , you return it back to n as you cant allow more than n in any time phase. If the allowance is less than 1, you cant send a whole event, dont let it through!

This is the leaky bucket algorithm: https://en.wikipedia.org/wiki/Leaky_bucket

Altri suggerimenti

One way to keep the sliding window and still have it O(1) + very small O(n) for each incoming request is to make a suitable sized array of ints and keep it as a circular buffer and discretize incoming requests (the requests as integrated as with the sampled levels as in a A/D-converter, or as a histogram if you are a statistican) and keep track of the sum of the circular buffer, like this

assumptions: 
"there can be no more than 1000 request per minute" and 
"we discretize on every second"

int[] buffer = new zeroed int-array with 60 zeroes
int request-integrator = 0 (transactional)
int discretizer-integrator = 0 (transactional)

for each request:
    check if request-integrator < 1000 then
         // the following incs could be placed outside 
         // the if statement for saturation on to many
         // requests (punishment)
         request-integrator++                     // important
         discretizer-integrator++
         proceed with request

once every second:                    // in a transactional memory transaction, for God's saké 
    buffer-index++
    if (buffer-index = 60) then buffer-index=0    // for that circular buffer feeling!
    request-integrator -= buffer[buffer-index]    // clean for things happening one minute ago
    buffer[buffer-index] = discretizer-integrator // save this times value
    discretizer-integrator = 0                    // resetting for next sampling period

Note that the increase of the request-integrator "could" be done just once every second, but that leaves a hole open for saturating it with 1000 requests or worse in one second about once every minute depending on punishment behaviour.

While reading about the various possible solutions to the problem. I came across token bucket algorithm ( http://en.wikipedia.org/wiki/Token_bucket ). If i understand your question completely you can implement a token bucket algorithm without actually having a bucket with N tokens by instead taking an counter which can be incremented and decremented accordingly. like

syncronized def get_token = 
    if count>0 
       { --count, return true }
    else return false

syncronized def add_token = 
    if count==N
       return;
    else ++count

Now the remaining part is to call the add_token in deltaT/r time repetadly.

To make it completely threadsafe we would need an atomic reference to count. But the above code is to show basic idea of doing it in O(1) memory.

I wrote the class below (ActionQueue) to limit the frequency of function calls. One of the nice things is that it uses a timer to pop things off the queue... so the CPU is used minimally (or even not at all, if the queue is empty)... as opposed to any polling type of technique.

Example...

    // limit to two call every five seconds
    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(5), 2);
    public void Test()
    {
        for (var i = 0; i < 10; i++)
        {
            _actionQueue.Enqueue((i2) =>
            {
                Console.WriteLineAction " + i2 + ": " + DateTime.UtcNow);
            }, i);
        }
    }

Real world example...

    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(1), 10);

    public override void SendOrderCancelRequest(Order order, SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var parms = (Tuple<Order, SessionID>)state;
            base.SendOrderCancelRequest(parms.Item1, parms.Item2);
        }, new Tuple<Order, SessionID>(order, sessionID));
    }
    public override void SendOrderMassStatusRequest(SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var sessionID2 = (SessionID)state;
            base.SendOrderMassStatusRequest(sessionID2);
        }, sessionID);
    }

The actual class...

public class ActionQueue
{
    private class ActionState
    {
        public Action<object> Action;
        public object State;
        public ActionState(Action<object> action, object state)
        {
            Action = action;
            State = state;
        }
    }
    Queue<ActionState> _actions = new Queue<ActionState>();
    Queue<DateTime> _times = new Queue<DateTime>();

    TimeSpan _timeSpan;
    int _maxActions;
    public ActionQueue(TimeSpan timeSpan, int maxActions)
    {
        _timeSpan = timeSpan;
        _maxActions = maxActions;           
    }
    public void Enqueue(Action<object> action, object state)
    {
        lock (_times)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);

            if (_times.Count <= _maxActions)
                action(state);
            else
                _actions.Enqueue(new ActionState(action, state));

            CreateDequeueTimerIfNeeded();
        }
    }

    System.Threading.Timer _dequeueTimer;
    protected void CreateDequeueTimerIfNeeded()
    {
        // if we have no timer and we do have times, create a timer
        if (_dequeueTimer == null && _times.Count > 0) 
        {
            var timeSpan = _times.Peek() - DateTime.UtcNow;
            if (timeSpan.TotalSeconds <= 0)
            {
                HandleTimesQueueChange();
            }
            else
            {
                _dequeueTimer = new System.Threading.Timer((obj) =>
                {
                    lock (_times)
                    {
                        _dequeueTimer = null;
                        HandleTimesQueueChange();
                    }
                }, null, timeSpan, System.Threading.Timeout.InfiniteTimeSpan);
            }
        }
    }

    private void HandleTimesQueueChange()
    {
        _times.Dequeue();
        while (_times.Count > 0 && _times.Peek() < DateTime.UtcNow)
            _times.Dequeue();

        while (_actions.Count > 0 && _times.Count < _maxActions)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);
            var actionState = _actions.Dequeue();
            actionState.Action(actionState.State);
        }

        CreateDequeueTimerIfNeeded();
    }
}
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top