zeromq Persistenz Muster
-
27-09-2019 - |
Frage
Wer muss verwaltet die persistent im ZeroMQ?
Wenn wir die ZeroMQ Kunden in Python-Sprache verwenden, was ist die Plug-In / Module zur Verfügung, die persistent zu verwalten?
Ich möchte die Muster kennen, um die ZeroMQ zu verwenden.
Lösung
Soweit ich weiß, Zeromq hat keine Ausdauer. Es ist aus Spielraum für sie und Bedürfnisse durch den Endverbraucher behandelt werden. Genau wie die Nachricht Serialisierung. In C # habe ich verwendet db4o Persistenz hinzuzufügen. Typischerweise bestehen ich das Objekt im Rohzustand, dann ist es serialisiert und an ZMQ Buchse senden. Btw, war dies für PUB / SUB Paar.
Andere Tipps
Auf den Anwendungsenden Sie entsprechend fortbestehen kann, zum Beispiel ich eine persistance Schicht in node.js aufgebaut haben, die Back-End-PHP-Anrufe kommuniziert und über WebSockets.
Der persistance Aspekt gehalten Nachrichten für einen bestimmten Zeitraum (http://en.wikipedia.org/wiki/Time_to_live) war dies die Kunden eine Chance zu geben, zu verbinden. Ich verwendete Datenstrukturen im Speicher, aber ich spielte mit dem Gedanken redis der Verwendung von On-Disk-persistance zu gewinnen.
Wir mussten die empfangenen Nachrichten von einem Teilnehmer bestehen, bevor sie verarbeitet werden. Die Nachrichten werden in einem eigenen Thread und gespeichert auf der Festplatte erhalten, während die beibehaltenen Nachrichtenwarteschlangen im Hauptthread manipuliert werden.
Das Modul ist verfügbar unter: https://pypi.org/project/persizmq . Aus der Dokumentation:
import pathlib
import zmq
import persizmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")
persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)
def on_exception(exception: Exception)->None:
print("an exception in the listening thread: {}".format(exception))
with persizmq.ThreadedSubscriber(
callback=storage.add_message, subscriber=subscriber,
on_exception=on_exception):
msg = storage.front() # non-blocking
if msg is not None:
print("Received a persistent message: {}".format(msg))
storage.pop_front()