문제

의사 코드나 더 나은 Python을 사용할 수 있습니다.Python IRC 봇에 대한 속도 제한 대기열을 구현하려고 하는데 부분적으로 작동하지만 누군가가 제한보다 적은 메시지를 트리거하는 경우(예: 속도 제한은 8초당 5개의 메시지이고 해당 사용자는 4개만 트리거합니다) 다음 트리거가 8초를 초과하는 경우(예: 16초 후) 봇은 메시지를 보내지만 대기열이 가득 차서 8초 기간이 경과했기 때문에 필요하지 않더라도 봇은 8초를 기다립니다.

도움이 되었습니까?

해결책

여기서는 가장 간단한 알고리즘, 메시지가 너무 빨리 도착할 때 메시지를 삭제하려는 경우(큐를 대기열에 넣는 대신 대기열이 임의로 커질 수 있으므로 의미가 있음):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

데이터 구조, 타이머 등이 ​​없습니다.이 솔루션에서는 깔끔하게 작동합니다 :) 이를 확인하려면 '허용'이 초당 최대 5/8 단위의 속도로 증가합니다.8초당 최대 5개 단위입니다.전달되는 모든 메시지는 하나의 단위를 차감하므로 8초마다 5개 이상의 메시지를 보낼 수 없습니다.

참고하세요 rate 정수여야 합니다. 즉,0이 아닌 소수 부분이 없으면 알고리즘이 올바르게 작동하지 않습니다(실제 환율은 rate/per).예: rate=0.5; per=1.0; 때문에 작동하지 않습니다 allowance 결코 1.0으로 성장하지 않을 것입니다.하지만 rate=1.0; per=2.0; 잘 작동합니다.

다른 팁

이 데코레이터 @ratelimited (ratepersec)를 사용하기 전에 queue를 사용하기 전에 사용하십시오.

기본적으로, 이것은 마지막 시간 이후 1/rate SEC가 통과되었는지 확인하고 그렇지 않은 경우 나머지 시간을 기다립니다. 그렇지 않으면 기다리지 않습니다. 이것은 효과적으로 당신이 평가/초로 제한합니다. 데코레이터는 속도 제한 원하는 기능에 적용 할 수 있습니다.

귀하의 경우 8 초당 최대 5 개의 메시지를 원할 경우 SendToqueue 기능 전에 @ratelimited (0.625)를 사용하십시오.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

토큰 버킷은 구현하기가 상당히 간단합니다.

5 개의 토큰이있는 버킷으로 시작하십시오.

5/8 초마다 : 버킷에 5 미만의 토큰이 있으면 하나를 추가하십시오.

메시지를 보내려고 할 때마다 : 버킷에 토큰이 ≥ 인 경우 토큰을 꺼내 메시지를 보내십시오. 그렇지 않으면 메시지를 기다리거나 떨어 뜨립니다.

(실제 코드에서는 실제 토큰 대신 정수 카운터를 사용하고 타임 스탬프를 저장하여 5/8 초마다 최적화 할 수 있습니다).


질문을 다시 읽으면서, 요율 제한이 8 초마다 완전히 재설정되면 여기에 수정이 있습니다.

타임 스탬프로 시작하여 last_send, 오래 전에 (예를 들어, 에포크에서). 또한 동일한 5 톤 버킷으로 시작하십시오.

5/8 초마다 규칙을칩니다.

메시지를 보낼 때마다 : 먼저 last_send ≥ 8 초 전에. 그렇다면 버킷을 채우십시오 (5 개의 토큰으로 설정). 둘째, 버킷에 토큰이있는 경우 메시지를 보내십시오 (그렇지 않으면 드롭/대기/등). 셋째, 설정 last_send 지금까지.

그것은 그 시나리오에서 작동해야합니다.


나는 실제로 이와 같은 전략 (첫 번째 접근법)을 사용하여 IRC 봇을 작성했습니다. Python이 아닌 Perl에 있지만 설명 할 코드가 있습니다.

여기서 첫 번째 부분은 버킷에 토큰을 추가합니다. 시간 (2 ~ 마지막 줄)을 기준으로 토큰을 추가 한 다음 마지막 줄 클램프 버킷 내용물을 최대 값 (Message_burst)에 따라 최적화 할 수 있습니다.

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ Conn은 통과되는 데이터 구조입니다. 이것은 일상적으로 실행되는 메소드 안에 있습니다 (다음에 할 일이있을 때 계산하고 네트워크 트래픽이 길어질 때까지 잠을 자고 있습니다). 메소드의 다음 부분은 전송을 처리합니다. 메시지에는 우선 순위가 있기 때문에 다소 복잡합니다.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

그것이 첫 번째 줄입니다. 홍수를 위해 우리의 연결이 사망하더라도. 서버의 핑에 응답하는 것과 같이 매우 중요한 것들에 사용됩니다. 다음으로, 나머지 대기열 :

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

마지막으로 버킷 상태는 $ Conn 데이터 구조로 다시 저장됩니다 (실제로이 방법의 후반에는 먼저 더 많은 작업이 얼마나 빨리 있는지 계산합니다).

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

보시다시피, 실제 버킷 처리 코드는 매우 작습니다. 약 4 줄. 코드의 나머지 부분은 우선 순위 대기열 처리입니다. 봇은 우선 순위가있어 대기열이있어서 누군가와 채팅하는 누군가가 중요한 킥/금지 업무를 수행하지 못하게 할 수 없습니다.

메시지가 전송 될 때까지 처리를 차단하려면 추가 메시지를 대기하면 Antti의 아름다운 솔루션도 다음과 같이 수정 될 수 있습니다.

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

메시지를 보내기에 충분한 수당이있을 때까지 기다립니다. 비율의 두 배로 시작하지 않으려면 허용량도 0으로 초기화 될 수 있습니다.

마지막 5 줄이 전송 된 시간을 유지하십시오. 다섯 번째 로스크 메시지 (존재하는 경우)가 과거에 8 초가 될 때까지 대기하는 메시지를 보관하십시오 (Last_five와 함께) :

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

한 가지 해결책은 각 대기열 항목에 타임 스탬프를 연결하고 8 초가 지나간 후에 항목을 버리는 것입니다. 대기열이 추가 될 때 마다이 점검을 수행 할 수 있습니다.

이것은 큐 크기를 5로 제한하고 큐가 가득 찬 상태에서 추가를 폐기하는 경우에만 작동합니다.

여전히 관심이있는 사람이라면,이 간단한 호출 가능한 클래스를 사용하여 시간이 지정된 LRU 키 값 저장소를 사용하여 IP 당 요청 속도를 제한합니다. Deque를 사용하지만 대신 목록과 함께 사용할 수 있도록 다시 작성할 수 있습니다.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

허용 된 답변에서 코드의 파이썬 구현.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

이건 어때:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

스칼라의 변형이 필요했습니다. 여기있어:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

사용 방법은 다음과 같습니다.

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top