質問
PythonアプリケーションでZeromQメッセージキューによって消費されるメモリの量を制限したいと思います。私はそれを設定することを知っています 高水マーク 送信者側でキューに入る金額は制限されますが、レシーバー側でどれだけのキューがキューになるかを制御する方法はありますか? Python Zeromqの結合は、無制限に設定されているようです。
私のテストシナリオ:テストに使用している2つのPython端子があります。 1つはレシーバーです。
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")
もう1つは送信者です:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
... print num
... socket.send(str(num))
... num = num + 1
...
走る socket.recv()
レシーバー側には数回、キューが機能することを確認しますが、それ以外は2つの端子をそこに座らせます。送信ループは決してブロックされないようで、受信プロンプトはメモリフットプリントの成長を遂げているようです。
解決
Zeromqの文書と矛盾して、 ハイウォーターマーク 両方に設定する必要があります PUSH
サイドと PULL
側。変更したら PULL
, 、それはうまく機能しました。新しい PULL
コードは次のとおりです。
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")
他のヒント
実際、ドキュメントはこれを言っています:
「ZMQ_PUSHソケットがすべての下流ノードの高い水マークに到達したため、または下流ノードがまったくない場合、ソケットのZMQ_SEND(3)操作は、例外的な状態が終了するまでブロックするか、少なくとも1つのダウンストリームノードが送信可能になります。メッセージは破棄されません。」
http://api.zeromq.org/2-1:zmq-socket
それは、下流ノードに高い水マークを設定できる(そして、それは別にプル)を設定できる(そしてすべき)、プッシュ側にそれを設定することは効果がないことを示唆していると述べています(ただし、それは真実ではないと思いますが、まだあります。ダウンストリームノードが利用可能であるが、メッセージを送信できるよりも速くメッセージが表示される場合。)
とともに zmq.SNDBUF
と zmq.RCVBUF
あなたができるオプション バッファサイズに制限を設定します.
また、私は使用しています zmq.CONFLATE
Zeromqキューサイズを1に制限するための受信者側のオプション:
ZMQの例は次のとおりです PUSH/PULL
:
送信者(zmq.PUSH
):
def create_pub_socket(ip, port):
try:
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.setsockopt(zmq.SNDHWM, 1)
zmq_address = "tcp://{}:{}".format(ip, port)
socket.connect(zmq_address)
return socket
except zmq.ZMQError as exp:
print(exp)
return False
sock = create_push_socket('127.0.0.1', 5558)
if sock:
sock.send_json({'a': 1})
ゲッター(zmq.PULL
):
def listen(self):
sock = None
try:
context = zmq.Context()
sock = context.socket(zmq.PULL)
sock.setsockopt(zmq.RCVHWM, 1)
sock.setsockopt(zmq.CONFLATE, 1) # last msg only.
sock.bind("tcp://*:5558")
except zmq.ZMQError:
logger.captureException()
configs = None
while configs is None:
if sock:
configs = sock.recv_json()
time.sleep(1e-1)
else:
time.sleep(5)
listen() # Recursive.
listen()