什么是一个很好的限速算法?
-
21-08-2019 - |
题
我可以使用一些伪代码,或更好,Python的。我想实现的限速队列Python的IRC机器人,它部分作品,但如果有人比限制将触发少消息(例如,速率限制为每8个秒5级的消息,而人只触发4)和下一个触发是8秒(例如,16秒后),机器人发送消息,但队列变满和机器人等待8秒,尽管不是需要它由于8秒段已过过去。
解决方案
下面的简单的算法,如果你只想放弃邮件到达时速度太快(而不是排队他们,这是有道理的,因为队列可能会得到任意大):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
有没有数据结构,定时器等在该溶液中,它的工作原理干净地:)看到这一点,“津贴”生长在速度5/8单元每秒最多,即至多五个单位每8秒。被转发的每个消息中扣除一个单元,所以可以不是每个8秒每发送多于五个消息。
请注意rate
应该没有非零小数部分的整数,即,或算法将无法正常工作(实际幅度不会rate/per
)。例如。 rate=0.5; per=1.0;
不起作用,因为allowance
绝不会增长到1.0。但rate=1.0; per=2.0;
正常工作。
其他提示
您函数入列之前,使用这个装饰@RateLimited(ratepersec)。
基本上,这种检查,如果1 /秒速率自上次时间过去了,如果没有,等待时间为剩余的时间,否则它不会等待。这有效地限制了你速度/秒。装饰器可以要速率限制被应用到任何功能。
在你的情况下,如果要每8个秒的最大5条消息,您的sendToQueue函数之前使用@RateLimited(0.625)。
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
一个令牌桶是相当简单的实现。
开始,带5级的令牌桶。
每5/8秒:如果桶具有小于5个标记,添加一个
您要发送邮件每次:如果桶中有≥1的道理,拿一个令牌并发送邮件。否则,等待/丢弃所述消息/不管。
(显然,在实际的代码,则最好使用一个整数计数器,而不是真正的令牌并且可以通过存储时间戳优化了每5 / 787-8步骤)
再次读的问题,如果速率极限被完全重置每个8秒,然后在这里是一个修饰:
启动与时间戳,last_send
,在时间前不久(例如,在信号出现)。另外,开始以相同的5-令牌桶。
打击黑每5/8秒规则。
每个发送邮件时:首先,检查是否last_send
≥8秒前。如果是这样,装满铲斗(将其设置为5个标记)。其次,如果有在铲斗是令牌,发送消息(否则,丢弃/等待/等)。三,设置last_send
到现在。
这应该适用于该场景。
我已经实际写入使用这样的策略(第一种方法)的IRC机器人。其在Perl,而不是Python,但这里是一些代码来说明:
的第一部分在这里处理添加令牌桶。你可以看到基于时间(第二到最后一行)将令牌的优化,然后将最后一行夹具桶含量为最大(MESSAGE_BURST)
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$ conn是其周围通过的数据结构。这是一个运行常规的方法中(它计算在接下来的时间这将有事可做,并进入休眠状态,要么长或直到它得到的网络流量)。该方法的下一个部分处理发送。这是相当复杂的,因为邮件已与它们相关联的优先级。
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
这是第一个队列,这是跑不管。即使它得到我们的连接杀害泛滥。用于极为重要的东西,象响应服务器的PING。接着,对队列的其余部分:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
最后,斗状态保存回$康恩数据结构(实际上是晚了一点的方法,它首先计算多久它就会有更多的工作)
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
正如你所看到的,实际的桶处理代码是非常小 - 大约四行。代码的其余部分是优先级队列处理。僵尸有个优先级队列,以便例如,有人用它聊天不能阻止它做它的重要踢/禁止职责。
到块处理,直到该消息可被发送,从而排队进一步消息,安蒂美丽的溶液也可以被修改如下:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
它只是等待,直到足够的余量是有要发送的消息。不与两倍速率开始,津贴还可以初始化0。
请的是,最后五行被发送的时间。保持排队的消息,直到时间第五最近期的信息(如果存在)是一种至少8秒过去(具有last_five作为次阵列):
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
一种解决方案是时间戳附加到每个队列项和后8秒钟后丢弃的项目。可以执行此每个队列被加到时间检查。
如果你限制队列的大小为5,丢弃任何补充,而队列已满这仅适用。
如果有人仍然有兴趣,我结合使用这个简单的可调用的类与定时LRU密钥值存储限制每个IP请求速率。使用双端队列,但可重写到与列表中使用来代替。
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
仅有Python实现的从接受的答案的码。
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
这样如何:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
我需要Scala中的一个的变化。在这里它是:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
下面是如何可以使用:
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}