Unregelmäßige Übertragung Problem mit Python Twisted-Push-Producer
-
04-10-2019 - |
Frage
Ich mag Sendedaten von einer Warteschlange mit Verdrehte. Ich zur Zeit einen Push-Hersteller verwenden, um die Warteschlange für Gegenstände und Schreiben auf den Transport abzufragen.
class Producer:
implements(interfaces.IPushProducer)
def __init__(self, protocol, queue):
self.queue = queue
self.protocol = protocol
def resumeProducing(self):
self.paused = False
while not self.paused:
try:
data = self.queue.get_nowait()
logger.debug("Transmitting: '%s'", repr(data))
data = cPickle.dumps(data)
self.protocol.transport.write(data + "\r\n")
except Empty:
pass
def pauseProducing(self):
logger.debug("Transmitter paused.")
self.paused = True
def stopProducing(self):
pass
Das Problem ist, dass die Daten sehr unregelmäßig gesendet werden, und wenn nur ein Element in der Warteschlange war, wird die Daten nie gesendet gehen werden. Es scheint, dass Verdrehte wartet, bis die Daten auf einen bestimmten Wert hat sich übertragen werden, bis er sie überträgt. Ist die Art, wie ich meinen Produzenten, den richtigen Weg umgesetzt? Kann ich Verdreht an Sendedaten Jetzt erzwingen?
Ich habe auch versucht, einen Pull Hersteller verwenden, aber Verdrehte ruft nicht die resumeProducing()
Methode der es überhaupt nicht. Muss ich die resumeProducer()
Methode von außen rufen, wenn ein Pull Produzent mit?
Lösung
Es ist schwer zu sagen, warum Ihr Produzent funktioniert nicht gut, ohne ein komplettes Beispiel zu sehen (das heißt, ohne auch den Code zu sehen, dass es Register mit einem Verbraucher und dem Code, welche Elemente in diese Warteschlange setzt).
Allerdings ist ein Problem, das Sie werden wahrscheinlich ist, dass, wenn Ihre Warteschlange leeren , wenn resumeProducing
genannt wird, dann werden Sie keine Bytes überhaupt an den Verbraucher schreiben. Und wenn Elemente in die Warteschlange gestellt werden, werden sie sitzen dort für immer, weil der Verbraucher nicht Ihre resumeProducing
Methode wird wieder aufrufen.
Und diese verallgemeinern zu jedem anderen Fall, in dem die Warteschlange nicht genügend Daten darin hat, den Verbraucher Anruf pauseProducing
auf Ihrem Hersteller zu verursachen. Als Push-Produzent, ist es Ihre Aufgabe, zu produzieren Daten auf eigene Faust fortzusetzen, bis der Verbraucher pauseProducing
(oder stopProducing
) nennt.
Für diesen speziellen Fall, die wahrscheinlich bedeutet, dass, wenn du gehst, etwas in dieser Warteschlange einzureihen - stop: Überprüfung, ob der Hersteller nicht angehalten ist, und wenn dies nicht der Fall, schreibt sie an den Verbraucher stattdessen . Nur Elemente in der Warteschlange gestellt, wenn der Hersteller angehalten wird.
Andere Tipps
Es gibt zwei mögliche Lösungen:
1) in regelmäßigen Abständen Ihre lokale Anwendung abfragen, um zu sehen, ob Sie zusätzliche Daten zu senden.
NB. Dies beruht auf einem periodischen async Rückruf vom deferLater Methode in verdrillt ist. Wenn Sie eine ansprechende Anwendung benötigen, die Daten auf Anfrage oder einen langer Laufsperrbetrieb sendet (z. B. ui, das seine eigene Ereignisschleife verwendet) ist es nicht geeignet sein kann.
Code:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time
# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
while True:
protocol.send(b"Hello World\n")
yield deferLater(reactor, 2, lambda: None)
# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):
def connectionMade(self):
self.transport.registerProducer(self, True)
gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
self.task = cooperate(gen)
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def pauseProducing(self):
print 'Workload paused'
self.task.pause()
def resumeProducing(self):
print 'Workload resumed'
self.task.resume()
def stopProducing(self):
print 'Workload stopped'
self.task.stop()
def connectionLost(self, reason):
print 'Connection lost'
try:
self.task.stop()
except:
pass
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
2) manuell im Auge behalten Protokoll-Instanzen und Verwendung reactor.callFromThread () von einem anderen Thread. Hier können Sie mit einem langen Sperrbetrieb in dem anderen Thread weg (zB. Ui Ereignisschleife).
Code:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading
# Connection
protocol = None
# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
def run(self):
while True:
print("Thread loop")
time.sleep(random.randint(0, 4))
if protocol is not None:
reactor.callFromThread(self.dispatch)
def dispatch(self):
global protocol
protocol.send("Hello World\n")
# Push protocol
class PushProtocol(Protocol):
def connectionMade(self):
global protocol
protocol = self
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def connectionLost(self, reason):
print 'Connection lost'
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Start thread
other = SomeThread()
other.start()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
Ich persönlich die Tatsache, dass IPushProducer und IPullProducer einen periodischen Rückruf benötigen, macht sie weniger nützlich. Andere widersprechen ... Achselzucken . Nehmen Sie Ihre Auswahl.