좋은 속도 제한 알고리즘은 무엇입니까?
-
21-08-2019 - |
문제
의사 코드나 더 나은 Python을 사용할 수 있습니다.Python IRC 봇에 대한 속도 제한 대기열을 구현하려고 하는데 부분적으로 작동하지만 누군가가 제한보다 적은 메시지를 트리거하는 경우(예: 속도 제한은 8초당 5개의 메시지이고 해당 사용자는 4개만 트리거합니다) 다음 트리거가 8초를 초과하는 경우(예: 16초 후) 봇은 메시지를 보내지만 대기열이 가득 차서 8초 기간이 경과했기 때문에 필요하지 않더라도 봇은 8초를 기다립니다.
해결책
여기서는 가장 간단한 알고리즘, 메시지가 너무 빨리 도착할 때 메시지를 삭제하려는 경우(큐를 대기열에 넣는 대신 대기열이 임의로 커질 수 있으므로 의미가 있음):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
데이터 구조, 타이머 등이 없습니다.이 솔루션에서는 깔끔하게 작동합니다 :) 이를 확인하려면 '허용'이 초당 최대 5/8 단위의 속도로 증가합니다.8초당 최대 5개 단위입니다.전달되는 모든 메시지는 하나의 단위를 차감하므로 8초마다 5개 이상의 메시지를 보낼 수 없습니다.
참고하세요 rate
정수여야 합니다. 즉,0이 아닌 소수 부분이 없으면 알고리즘이 올바르게 작동하지 않습니다(실제 환율은 rate/per
).예: rate=0.5; per=1.0;
때문에 작동하지 않습니다 allowance
결코 1.0으로 성장하지 않을 것입니다.하지만 rate=1.0; per=2.0;
잘 작동합니다.
다른 팁
이 데코레이터 @ratelimited (ratepersec)를 사용하기 전에 queue를 사용하기 전에 사용하십시오.
기본적으로, 이것은 마지막 시간 이후 1/rate SEC가 통과되었는지 확인하고 그렇지 않은 경우 나머지 시간을 기다립니다. 그렇지 않으면 기다리지 않습니다. 이것은 효과적으로 당신이 평가/초로 제한합니다. 데코레이터는 속도 제한 원하는 기능에 적용 할 수 있습니다.
귀하의 경우 8 초당 최대 5 개의 메시지를 원할 경우 SendToqueue 기능 전에 @ratelimited (0.625)를 사용하십시오.
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
토큰 버킷은 구현하기가 상당히 간단합니다.
5 개의 토큰이있는 버킷으로 시작하십시오.
5/8 초마다 : 버킷에 5 미만의 토큰이 있으면 하나를 추가하십시오.
메시지를 보내려고 할 때마다 : 버킷에 토큰이 ≥ 인 경우 토큰을 꺼내 메시지를 보내십시오. 그렇지 않으면 메시지를 기다리거나 떨어 뜨립니다.
(실제 코드에서는 실제 토큰 대신 정수 카운터를 사용하고 타임 스탬프를 저장하여 5/8 초마다 최적화 할 수 있습니다).
질문을 다시 읽으면서, 요율 제한이 8 초마다 완전히 재설정되면 여기에 수정이 있습니다.
타임 스탬프로 시작하여 last_send
, 오래 전에 (예를 들어, 에포크에서). 또한 동일한 5 톤 버킷으로 시작하십시오.
5/8 초마다 규칙을칩니다.
메시지를 보낼 때마다 : 먼저 last_send
≥ 8 초 전에. 그렇다면 버킷을 채우십시오 (5 개의 토큰으로 설정). 둘째, 버킷에 토큰이있는 경우 메시지를 보내십시오 (그렇지 않으면 드롭/대기/등). 셋째, 설정 last_send
지금까지.
그것은 그 시나리오에서 작동해야합니다.
나는 실제로 이와 같은 전략 (첫 번째 접근법)을 사용하여 IRC 봇을 작성했습니다. Python이 아닌 Perl에 있지만 설명 할 코드가 있습니다.
여기서 첫 번째 부분은 버킷에 토큰을 추가합니다. 시간 (2 ~ 마지막 줄)을 기준으로 토큰을 추가 한 다음 마지막 줄 클램프 버킷 내용물을 최대 값 (Message_burst)에 따라 최적화 할 수 있습니다.
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$ Conn은 통과되는 데이터 구조입니다. 이것은 일상적으로 실행되는 메소드 안에 있습니다 (다음에 할 일이있을 때 계산하고 네트워크 트래픽이 길어질 때까지 잠을 자고 있습니다). 메소드의 다음 부분은 전송을 처리합니다. 메시지에는 우선 순위가 있기 때문에 다소 복잡합니다.
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
그것이 첫 번째 줄입니다. 홍수를 위해 우리의 연결이 사망하더라도. 서버의 핑에 응답하는 것과 같이 매우 중요한 것들에 사용됩니다. 다음으로, 나머지 대기열 :
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
마지막으로 버킷 상태는 $ Conn 데이터 구조로 다시 저장됩니다 (실제로이 방법의 후반에는 먼저 더 많은 작업이 얼마나 빨리 있는지 계산합니다).
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
보시다시피, 실제 버킷 처리 코드는 매우 작습니다. 약 4 줄. 코드의 나머지 부분은 우선 순위 대기열 처리입니다. 봇은 우선 순위가있어 대기열이있어서 누군가와 채팅하는 누군가가 중요한 킥/금지 업무를 수행하지 못하게 할 수 없습니다.
메시지가 전송 될 때까지 처리를 차단하려면 추가 메시지를 대기하면 Antti의 아름다운 솔루션도 다음과 같이 수정 될 수 있습니다.
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
메시지를 보내기에 충분한 수당이있을 때까지 기다립니다. 비율의 두 배로 시작하지 않으려면 허용량도 0으로 초기화 될 수 있습니다.
마지막 5 줄이 전송 된 시간을 유지하십시오. 다섯 번째 로스크 메시지 (존재하는 경우)가 과거에 8 초가 될 때까지 대기하는 메시지를 보관하십시오 (Last_five와 함께) :
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
한 가지 해결책은 각 대기열 항목에 타임 스탬프를 연결하고 8 초가 지나간 후에 항목을 버리는 것입니다. 대기열이 추가 될 때 마다이 점검을 수행 할 수 있습니다.
이것은 큐 크기를 5로 제한하고 큐가 가득 찬 상태에서 추가를 폐기하는 경우에만 작동합니다.
여전히 관심이있는 사람이라면,이 간단한 호출 가능한 클래스를 사용하여 시간이 지정된 LRU 키 값 저장소를 사용하여 IP 당 요청 속도를 제한합니다. Deque를 사용하지만 대신 목록과 함께 사용할 수 있도록 다시 작성할 수 있습니다.
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
허용 된 답변에서 코드의 파이썬 구현.
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
이건 어때:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
스칼라의 변형이 필요했습니다. 여기있어:
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
사용 방법은 다음과 같습니다.
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}