Frage

Ich mag Sendedaten von einer Warteschlange mit Verdrehte. Ich zur Zeit einen Push-Hersteller verwenden, um die Warteschlange für Gegenstände und Schreiben auf den Transport abzufragen.

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

Das Problem ist, dass die Daten sehr unregelmäßig gesendet werden, und wenn nur ein Element in der Warteschlange war, wird die Daten nie gesendet gehen werden. Es scheint, dass Verdrehte wartet, bis die Daten auf einen bestimmten Wert hat sich übertragen werden, bis er sie überträgt. Ist die Art, wie ich meinen Produzenten, den richtigen Weg umgesetzt? Kann ich Verdreht an Sendedaten Jetzt erzwingen?

Ich habe auch versucht, einen Pull Hersteller verwenden, aber Verdrehte ruft nicht die resumeProducing() Methode der es überhaupt nicht. Muss ich die resumeProducer() Methode von außen rufen, wenn ein Pull Produzent mit?

War es hilfreich?

Lösung

Es ist schwer zu sagen, warum Ihr Produzent funktioniert nicht gut, ohne ein komplettes Beispiel zu sehen (das heißt, ohne auch den Code zu sehen, dass es Register mit einem Verbraucher und dem Code, welche Elemente in diese Warteschlange setzt).

Allerdings ist ein Problem, das Sie werden wahrscheinlich ist, dass, wenn Ihre Warteschlange leeren , wenn resumeProducing genannt wird, dann werden Sie keine Bytes überhaupt an den Verbraucher schreiben. Und wenn Elemente in die Warteschlange gestellt werden, werden sie sitzen dort für immer, weil der Verbraucher nicht Ihre resumeProducing Methode wird wieder aufrufen.

Und diese verallgemeinern zu jedem anderen Fall, in dem die Warteschlange nicht genügend Daten darin hat, den Verbraucher Anruf pauseProducing auf Ihrem Hersteller zu verursachen. Als Push-Produzent, ist es Ihre Aufgabe, zu produzieren Daten auf eigene Faust fortzusetzen, bis der Verbraucher pauseProducing (oder stopProducing) nennt.

Für diesen speziellen Fall, die wahrscheinlich bedeutet, dass, wenn du gehst, etwas in dieser Warteschlange einzureihen - stop: Überprüfung, ob der Hersteller nicht angehalten ist, und wenn dies nicht der Fall, schreibt sie an den Verbraucher stattdessen . Nur Elemente in der Warteschlange gestellt, wenn der Hersteller angehalten wird.

Andere Tipps

Es gibt zwei mögliche Lösungen:

1) in regelmäßigen Abständen Ihre lokale Anwendung abfragen, um zu sehen, ob Sie zusätzliche Daten zu senden.

NB. Dies beruht auf einem periodischen async Rückruf vom deferLater Methode in verdrillt ist. Wenn Sie eine ansprechende Anwendung benötigen, die Daten auf Anfrage oder einen langer Laufsperrbetrieb sendet (z. B. ui, das seine eigene Ereignisschleife verwendet) ist es nicht geeignet sein kann.

Code:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time

# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
  while True:
    protocol.send(b"Hello World\n")
    yield deferLater(reactor, 2, lambda: None)

# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):

   def connectionMade(self):
     self.transport.registerProducer(self, True)
     gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
     self.task = cooperate(gen)

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def pauseProducing(self):
     print 'Workload paused'
     self.task.pause()

   def resumeProducing(self):
     print 'Workload resumed'
     self.task.resume()

   def stopProducing(self):
     print 'Workload stopped'
     self.task.stop()

   def connectionLost(self, reason):
     print 'Connection lost'
     try:
       self.task.stop()
     except:
       pass

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

2) manuell im Auge behalten Protokoll-Instanzen und Verwendung reactor.callFromThread () von einem anderen Thread. Hier können Sie mit einem langen Sperrbetrieb in dem anderen Thread weg (zB. Ui Ereignisschleife).

Code:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading

# Connection
protocol = None

# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
  def run(self):
    while True:
      print("Thread loop")
      time.sleep(random.randint(0, 4))
      if protocol is not None:
        reactor.callFromThread(self.dispatch)
  def dispatch(self):
    global protocol
    protocol.send("Hello World\n")

# Push protocol
class PushProtocol(Protocol):

   def connectionMade(self):
     global protocol
     protocol = self

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def connectionLost(self, reason):
     print 'Connection lost'

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Start thread
other = SomeThread()
other.start()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

Ich persönlich die Tatsache, dass IPushProducer und IPullProducer einen periodischen Rückruf benötigen, macht sie weniger nützlich. Andere widersprechen ... Achselzucken . Nehmen Sie Ihre Auswahl.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top