アルゴリズムを制限する良い率とは何ですか?
-
21-08-2019 - |
質問
私はいくつかの擬似コード、またはより良い、Pythonのを使用することができます。私は、PythonのIRCボットのためのレート制限キューを実装しようとしています、それは部分的に動作しますが、誰かが限界未満のメッセージをトリガした場合(例えば、レートリミットは8秒ごとに5つのメッセージであり、人はわずか4をトリガ)次のトリガは8秒(例えば、16秒後)を超えていると、ボットは、メッセージを送信しますが、キューがいっぱいになり、ボットが8秒の期間が経過したので、それが必要ではないですにもかかわらず、8秒間待機します。
解決
ここで最も単純なアルゴリズムに、あなたは彼らがあまりにも早く到着したときにだけメッセージを削除する場合(代わりに)キューが任意に大きいかもしれませんので、理にかなっている、それらをキューイングのます:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
何のデータ構造、タイマーなどは、このソリューションではありません、それはきれいに動作します:)これを確認するには、「手当」は8秒あたり最大5台で、すなわち、最大で秒間あたりの速度で5/8の単位を成長します。あなたはすべての8秒あたりつ以上のメッセージを送信することはできませんので、転送されたすべてのメッセージは、1つの単位が差し引かれます。
そのrate
、すなわち非ゼロの小数部なしで、整数でなければならない注意、またはアルゴリズムは、(実際のレートはrate/per
されない)が正しく動作しません。例えば。 rate=0.5; per=1.0;
が1.0に成長することはありませんので、allowance
は動作しません。しかしrate=1.0; per=2.0;
は正常に動作します。
他のヒント
キューに登録し、あなたの関数の前に、このデコレータ@RateLimited(ratepersec)を使用します。
は、基本的には1 /秒レートが前回から経過した場合、これは確認していない場合、それ以外の場合は待機しない、時間の残りを待ちます。これは、効果的にレート/秒にあなたを制限します。デコレータは、あなたはレート制限したい任意の関数に適用することができます。
あなたのケースでは、あなたが8秒ごとに5つのメッセージの最大をしたい場合は、あなたのsendToQueue関数の前に@RateLimited(0.625)を使用します。
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
Aトークンバケットを実装するために非常に簡単です。
5つのトークンバケットで開始します。
すべての5/8秒:バケットが5つの未満のトークンを持っている場合は、1を追加します。
あなたがメッセージを送信するたびに:バケットが≥1トークンを持っている場合は、1つのトークンを取り出し、メッセージを送信します。それ以外の場合は、メッセージをドロップ/待つ/どんなます。
(明らかに、実際のコードでは、あなたの代わりに実際のトークンの整数カウンタを使用したいとあなたがタイムスタンプを格納することによって、すべての5 / 8S工程を最適化することができる)
<時間> レート制限が完全に各8秒にリセットされた場合は、、もう一度質問を読んで、そして、ここに修正されます:
(エポックで、例えば)ずっと前時点で、タイムスタンプ、last_send
を開始します。また、同じ5-トークンバケットで開始します。
すべての5/8秒ルールをたたいています。
あなたがメッセージを送信するたびに:まず、もしlast_send
≥8秒前に確認してください。もしそうなら、(5つのトークンに設定)バケツを埋めます。バケット内のトークンがある場合には、第2、(/待機/などを落とし、そうでない場合。)メッセージを送信します。第三に、今にlast_send
を設定します。
これはそのシナリオのために働く必要があります。
<時間>私は実際にこのような戦略(最初のアプローチ)を使用して、IRCボットを書いています。そのPerlやPythonでないが、ここで説明するいくつかのコードがあります:
ここでの最初の部分は、バケットにトークンを追加処理します。
あなたは時間(最後の行に第二)に基づいてトークンを追加の最適化を見ることができ、その後、最後の行には、最大(MESSAGE_BURST)にバケットの内容をクランプ my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$ connが周りに渡されるデータ構造です。これは、(それは次の時間は、それが何かを持っているだろうとき計算し、それが長いか、それがネットワークトラフィックを取得するまでのいずれかに眠る)、日常的に実行する方法の内側にあります。この方法の次の部分は、送信処理します。メッセージは、それに関連付けられた優先順位を持っているので、それは、かなり複雑である。
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
これはどんな実行されていない最初のキュー、です。それはなっても、私たちの接続が洪水のために殺されました。サーバーのPINGに応答するように、非常に重要なもののために使用されます。次に、キューの残ります:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
(;それは最初に、それはより多くの仕事を持っていますどのようにすぐに計算し、実際に後者の方法ではビット) この最後に、バケット状態は、$ CONNデータ構造に戻って保存されます
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
あなたが見ることができるように、コードを処理し、実際のバケットは非常に小さい - 約4行。コードの残りの部分は、プライオリティキューの処理です。例えば、それとのチャット誰かがその重要なキック/禁止業務をやってからそれを防ぐことができないように、ボットは、プライオリティキューを持っています。
メッセージを送信することができるまで、一層メッセージをキューイング、処理をブロックするように、アンティの美しい溶液はまた、このような修飾され得る:
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
十分な手当がメッセージを送信することがあるまで、それはただ待ちます。 2回の割合で起動しないように、手当も0で初期化されたことがあります。
最後の5行が送信されたことを時間をおいてください。 (倍の配列としてlast_fiveで)過去に少なくとも8秒です(存在する場合)まで第五最も最近のメッセージをキューに入れられたメッセージを保持します:
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
一つの解決策は、各キュー項目にタイムスタンプを添付すると、8秒が経過した後にアイテムを破棄することです。あなたは毎回、キューが追加され、このチェックを実行することができます。
あなたが5にキューのサイズを制限し、キューがいっぱいになっている間、任意の追加を破棄する場合は、これはのみ動作します。
誰かがまだ興味を持っている場合、私はIPあたりのリクエストレートを制限するための時限LRUキー値格納と一緒にこの単純な呼び出し可能なクラスを使用します。両端キューを使用しますが、代わりにリストで使用するように書き換えができます。
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
ただ、Pythonの実装ます。
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
これはどうます:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
私はスカラ座の変化を必要としていました。ここでは、次のとおりです。
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
ここではそれがどのように使用できるかです。
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}