Domanda

Voglio trasmettere dati da una coda utilizzando ritorto. Attualmente uso un produttore di spinta per il polling coda per gli elementi di scrittura e al trasporto.

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

Il problema è che i dati sono inviati molto irregolare e se un solo elemento era in coda, i dati non è mai sta per essere inviato. Sembra che torto attende i dati da trasmettere è cresciuta fino a un valore specifico finché non trasmette. è il modo in cui ho implementato il mio produttore nel modo giusto? Posso forzare storte per i dati di trasmissione Ora

Ho provato anche con un produttore di tiro, ma ritorto non chiama il metodo resumeProducing() di esso a tutti. Devo chiamare il metodo resumeProducer() dall'esterno, quando si utilizza un produttore di tiro?

È stato utile?

Soluzione

E 'difficile dire perché il vostro produttore non funziona bene senza vedere un esempio completo (che è, senza anche vedere il codice che registra con un consumatore e il codice che sta mettendo gli elementi in quella coda).

Tuttavia, un problema è probabile che tu hai è che se la coda è Vuoto quando resumeProducing viene chiamato, allora si scriverà nessun byte a tutti per il consumatore. E quando gli elementi vengono messi in coda, faranno siedono lì per sempre, perché il consumatore non ha intenzione di chiamare di nuovo il metodo di resumeProducing.

E questo generalizza a qualsiasi altro caso in cui la coda non ha abbastanza dati in esso ad indurre il consumatore a chiamata pauseProducing sul produttore. Come produttore spinta, è il vostro lavoro per continuare a produrre i dati sul proprio fino a quando il consumatore chiama pauseProducing (o stopProducing).

Per questo caso particolare, che probabilmente significa che ogni volta che si sta andando a mettere qualcosa in quella coda - arresto: controllo per vedere se il produttore non sia in pausa, e se non lo è, scrivere al consumatore invece . Solo mettere elementi nella coda quando il produttore è in pausa.

Altri suggerimenti

Qui ci sono due possibili soluzioni:

1) periodicamente il polling l'applicazione locale per vedere se si dispone di dati supplementari da inviare.

NB. Questo si basa su un callback asincroni periodica dal metodo deferLater in contorta. Se avete bisogno di un'applicazione reattivo che invia i dati su richiesta, o un'operazione di lunga esecuzione di blocco (ad es. Interfaccia utente che utilizza il proprio ciclo di eventi) potrebbe non essere appropriato.

Codice:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time

# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
  while True:
    protocol.send(b"Hello World\n")
    yield deferLater(reactor, 2, lambda: None)

# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):

   def connectionMade(self):
     self.transport.registerProducer(self, True)
     gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
     self.task = cooperate(gen)

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def pauseProducing(self):
     print 'Workload paused'
     self.task.pause()

   def resumeProducing(self):
     print 'Workload resumed'
     self.task.resume()

   def stopProducing(self):
     print 'Workload stopped'
     self.task.stop()

   def connectionLost(self, reason):
     print 'Connection lost'
     try:
       self.task.stop()
     except:
       pass

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

2) manualmente tenere traccia delle istanze di protocollo e l'uso reactor.callFromThread () da un thread diverso. Consente di ottenere via con un'operazione lunga bloccare in altro thread (ad es. Ciclo di eventi ui).

Codice:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading

# Connection
protocol = None

# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
  def run(self):
    while True:
      print("Thread loop")
      time.sleep(random.randint(0, 4))
      if protocol is not None:
        reactor.callFromThread(self.dispatch)
  def dispatch(self):
    global protocol
    protocol.send("Hello World\n")

# Push protocol
class PushProtocol(Protocol):

   def connectionMade(self):
     global protocol
     protocol = self

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def connectionLost(self, reason):
     print 'Connection lost'

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Start thread
other = SomeThread()
other.start()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

Personalmente, trovo il fatto che IPushProducer e IPullProducer richiedono un callback periodica, li rende meno utile. Altri sono in disaccordo ... spallucce . Fate la vostra scelta.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top