Pergunta

Eu poderia usar alguma pseudo-código, ou melhor, Python. Eu estou tentando implementar uma fila de limitação de taxa para um bot Python IRC, e parcialmente funciona, mas se alguém acionar menos mensagens do que o limite (por exemplo, limite de taxa é de 5 mensagens por 8 segundos, e os gatilhos pessoa só 4), e no próximo gatilho é ao longo dos 8 segundos (por exemplo, 16 segundos mais tarde), o bot envia a mensagem, mas a fila ficar cheia eo bot espera 8 segundos, mesmo que não é necessária uma vez que o segundo período de 8 expirou.

Foi útil?

Solução

Aqui a mais simples algoritmo , se você quiser apenas a deixar mensagens quando chegam muito rapidamente ( em vez de fila-los, o que faz sentido porque a fila pode ter arbitrariamente grande):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

Não há estruturas de dados, temporizadores etc. nesta solução e funciona de forma limpa :) Para ver isto, 'subsídio' cresce a uma velocidade de 5/8 unidades por segundos, no máximo, ou seja, no máximo, cinco unidades por oito segundos. Cada mensagem que é enviada deduz uma unidade, então você não pode enviar mais de cinco mensagens por cada oito segundos.

Note que rate deve ser um inteiro, ou seja, sem zero, não decimal parte, ou o algoritmo não funcionará corretamente (taxa real não será rate/per). Por exemplo. rate=0.5; per=1.0; não funciona porque allowance nunca vai crescer para 1,0. Mas rate=1.0; per=2.0; funciona bem.

Outras dicas

Use este decorador @RateLimited (ratepersec) antes de sua função que enfileira.

Basicamente, este verifica se seg 1 / taxa passaram desde a última vez e se não, aguarda o restante do tempo, caso contrário ele não espera. Isto efetivamente limita a taxa / seg. O decorador pode ser aplicado a qualquer função que deseja limitado de taxa.

No seu caso, se você quer um máximo de 5 mensagens por 8 segundos, o uso @RateLimited (0,625) antes de sua função sendToQueue.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

A Token Bucket é bastante simples de implementar.

Comece com um balde com 5 tokens.

A cada 5/8 segundos:. Se o balde com menos de 5 fichas, adicione um

Cada vez que você quiser enviar uma mensagem: Se o balde tem =1 forma, tomar um fora token e enviar a mensagem. Caso contrário, espera / largar a mensagem / whatever.

(obviamente, no código atual, você pode usar um contador inteiro em vez de fichas reais e você pode otimizar o passo a cada 5 / 8s, armazenando timestamps)


Lendo a pergunta novamente, se o limite de taxa é redefinir completamente a cada 8 segundos, então aqui é uma modificação:

Comece com um timestamp, last_send, em um momento há muito tempo (por exemplo, na época). Além disso, começar com o mesmo balde de 5 token.

Greve governar os a cada 5/8 segundos.

Cada vez que você enviar uma mensagem: Primeiro, verifique se last_send = 8 segundos atrás. Se assim for, encher o balde (configurá-lo para 5 fichas). Em segundo lugar, se há fichas no balde, envie a mensagem (caso contrário, gota / wait / etc.). Em terceiro lugar, conjunto last_send agora.

Isso deve funcionar para esse cenário.


Eu realmente escrito um bot IRC usando uma estratégia como esta (a primeira abordagem). Sua em Perl, Python não, mas aqui é algum código para ilustrar:

A primeira parte aqui lida com a adição de fichas para o balde. Você pode ver a otimização da adição de fichas com base no tempo (2 a última linha) e, em seguida, os últimos grampos de linha balde conteúdo ao máximo (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn é uma estrutura de dados que é passado ao redor. Isso está dentro de um método que é executado rotineiramente (ele calcula quando a próxima vez que ele vai ter algo para fazer, e dorme quer tanto tempo ou até que ele recebe o tráfego de rede). A próxima parte do método manipula o envio. É um pouco complicado, porque as mensagens têm prioridades que lhes estão associados.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Essa é a primeira fila, que é executado, não importa o quê. Mesmo se ele fica a nossa ligação mortos por inundações. Usado para coisas extremamente importantes, como responder ao PING do servidor. Em seguida, o resto das filas:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

volta Finalmente, o status de balde é guardado para a estrutura de dados conn $ (na verdade, um pouco mais tarde no método; primeiro ele calcula quanto tempo ele vai ter mais trabalho)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Como você pode ver, o código de manipulação de balde real é muito pequena - cerca de quatro linhas. O resto do código é a manipulação fila de prioridade. O bot tem filas de prioridade de modo que, por exemplo, alguém conversando com ele não pode impedi-lo de fazer seus deveres / ban importante pontapé.

para bloquear o processamento até que a mensagem pode ser enviada, filas, assim, até mais mensagens, bela solução de Antti também pode ser modificado assim:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

ele só espera até provisão suficiente está lá para enviar a mensagem. para não começar com duas vezes a taxa, o subsídio pode também inicializado com 0.

Mantenha o tempo que os últimos cinco linhas foram enviados. Mantenha as mensagens na fila até o momento o quinto mais-recente mensagem (se existir) é um menos 8 segundos no passado (com last_five como uma matriz de vezes):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

Uma solução é anexar um timestamp a cada item da fila e para descartar o item após 8 segundos se passaram. Você pode realizar essa verificação cada vez que a fila é adicionada.

Isso só funciona se você limitar o tamanho da fila para 5 e descartar quaisquer acréscimos, enquanto a fila está cheia.

Se alguém ainda está interessado, eu usar essa classe pode ser chamado simples em conjunto com um armazenamento de valor da chave LRU cronometrado para limitar a taxa de solicitação por IP. Usa um deque, mas pode reescrito para ser usado com uma lista em seu lugar.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

Apenas uma implementação python de um código de resposta aceita.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

Como sobre isto:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

Eu precisava de uma variação no Scala. Aqui está:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Aqui está como ele pode ser usado:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top