我可以使用一些伪代码,或更好,Python的。我想实现的限速队列Python的IRC机器人,它部分作品,但如果有人比限制将触发少消息(例如,速率限制为每8个秒5级的消息,而人只触发4)和下一个触发是8秒(例如,16秒后),机器人发送消息,但队列变满和机器人等待8秒,尽管不是需要它由于8秒段已过过去。

有帮助吗?

解决方案

下面的简单的算法,如果你只想放弃邮件到达时速度太快(而不是排队他们,这是有道理的,因为队列可能会得到任意大):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

有没有数据结构,定时器等在该溶液中,它的工作原理干净地:)看到这一点,“津贴”生长在速度5/8单元每秒最多,即至多五个单位每8秒。被转发的每个消息中扣除一个单元,所以可以不是每个8秒每发送多于五个消息。

请注意rate应该没有非零小数部分的整数,即,或算法将无法正常工作(实际幅度不会rate/per)。例如。 rate=0.5; per=1.0;不起作用,因为allowance绝不会增长到1.0。但rate=1.0; per=2.0;正常工作。

其他提示

您函数入列之前,使用这个装饰@RateLimited(ratepersec)。

基本上,这种检查,如果1 /秒速率自上次时间过去了,如果没有,等待时间为剩余的时间,否则它不会等待。这有效地限制了你速度/秒。装饰器可以要速率限制被应用到任何功能。

在你的情况下,如果要每8个秒的最大5条消息,您的sendToQueue函数之前使用@RateLimited(0.625)。

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

一个令牌桶是相当简单的实现。

开始,带5级的令牌桶。

每5/8秒:如果桶具有小于5个标记,添加一个

您要发送邮件每次:如果桶中有≥1的道理,拿一个令牌并发送邮件。否则,等待/丢弃所述消息/不管。

(显然,在实际的代码,则最好使用一个整数计数器,而不是真正的令牌并且可以通过存储时间戳优化了每5 / 787-8步骤)


再次读的问题,如果速率极限被完全重置每个8秒,然后在这里是一个修饰:

启动与时间戳,last_send,在时间前不久(例如,在信号出现)。另外,开始以相同的5-令牌桶。

打击黑每5/8秒规则。

每个发送邮件时:首先,检查是否last_send≥8秒前。如果是这样,装满铲斗(将其设置为5个标记)。其次,如果有在铲斗是令牌,发送消息(否则,丢弃/等待/等)。三,设置last_send到现在。

这应该适用于该场景。


我已经实际写入使用这样的策略(第一种方法)的IRC机器人。其在Perl,而不是Python,但这里是一些代码来说明:

的第一部分在这里处理添加令牌桶。你可以看到基于时间(第二到最后一行)将令牌的优化,然后将最后一行夹具桶含量为最大(MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn是其周围通过的数据结构。这是一个运行常规的方法中(它计算在接下来的时间这将有事可做,并进入休眠状态,要么长或直到它得到的网络流量)。该方法的下一个部分处理发送。这是相当复杂的,因为邮件已与它们相关联的优先级。

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

这是第一个队列,这是跑不管。即使它得到我们的连接杀害泛滥。用于极为重要的东西,象响应服务器的PING。接着,对队列的其余部分:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

最后,斗状态保存回$康恩数据结构(实际上是晚了一点的方法,它首先计算多久它就会有更多的工作)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

正如你所看到的,实际的桶处理代码是非常小 - 大约四行。代码的其余部分是优先级队列处理。僵尸有个优先级队列,以便例如,有人用它聊天不能阻止它做它的重要踢/禁止职责。

到块处理,直到该消息可被发送,从而排队进一步消息,安蒂美丽的溶液也可以被修改如下:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

它只是等待,直到足够的余量是有要发送的消息。不与两倍速率开始,津贴还可以初始化0。

请的是,最后五行被发送的时间。保持排队的消息,直到时间第五最近期的信息(如果存在)是一种至少8秒过去(具有last_five作为次阵列):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

一种解决方案是时间戳附加到每个队列项和后8秒钟后丢弃的项目。可以执行此每个队列被加到时间检查。

如果你限制队列的大小为5,丢弃任何补充,而队列已满这仅适用。

如果有人仍然有兴趣,我结合使用这个简单的可调用的类与定时LRU密钥值存储限制每个IP请求速率。使用双端队列,但可重写到与列表中使用来代替。

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

仅有Python实现的从接受的答案的码。

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

这样如何:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

我需要Scala中的一个的变化。在这里它是:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

下面是如何可以使用:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top