Irregolare Trasmissione Problema con Python ritorto spinta Produttore
-
04-10-2019 - |
Domanda
Voglio trasmettere dati da una coda utilizzando ritorto. Attualmente uso un produttore di spinta per il polling coda per gli elementi di scrittura e al trasporto.
class Producer:
implements(interfaces.IPushProducer)
def __init__(self, protocol, queue):
self.queue = queue
self.protocol = protocol
def resumeProducing(self):
self.paused = False
while not self.paused:
try:
data = self.queue.get_nowait()
logger.debug("Transmitting: '%s'", repr(data))
data = cPickle.dumps(data)
self.protocol.transport.write(data + "\r\n")
except Empty:
pass
def pauseProducing(self):
logger.debug("Transmitter paused.")
self.paused = True
def stopProducing(self):
pass
Il problema è che i dati sono inviati molto irregolare e se un solo elemento era in coda, i dati non è mai sta per essere inviato. Sembra che torto attende i dati da trasmettere è cresciuta fino a un valore specifico finché non trasmette. è il modo in cui ho implementato il mio produttore nel modo giusto? Posso forzare storte per i dati di trasmissione Ora
Ho provato anche con un produttore di tiro, ma ritorto non chiama il metodo resumeProducing()
di esso a tutti. Devo chiamare il metodo resumeProducer()
dall'esterno, quando si utilizza un produttore di tiro?
Soluzione
E 'difficile dire perché il vostro produttore non funziona bene senza vedere un esempio completo (che è, senza anche vedere il codice che registra con un consumatore e il codice che sta mettendo gli elementi in quella coda).
Tuttavia, un problema è probabile che tu hai è che se la coda è Vuoto quando resumeProducing
viene chiamato, allora si scriverà nessun byte a tutti per il consumatore. E quando gli elementi vengono messi in coda, faranno siedono lì per sempre, perché il consumatore non ha intenzione di chiamare di nuovo il metodo di resumeProducing
.
E questo generalizza a qualsiasi altro caso in cui la coda non ha abbastanza dati in esso ad indurre il consumatore a chiamata pauseProducing
sul produttore. Come produttore spinta, è il vostro lavoro per continuare a produrre i dati sul proprio fino a quando il consumatore chiama pauseProducing
(o stopProducing
).
Per questo caso particolare, che probabilmente significa che ogni volta che si sta andando a mettere qualcosa in quella coda - arresto: controllo per vedere se il produttore non sia in pausa, e se non lo è, scrivere al consumatore invece . Solo mettere elementi nella coda quando il produttore è in pausa.
Altri suggerimenti
Qui ci sono due possibili soluzioni:
1) periodicamente il polling l'applicazione locale per vedere se si dispone di dati supplementari da inviare.
NB. Questo si basa su un callback asincroni periodica dal metodo deferLater in contorta. Se avete bisogno di un'applicazione reattivo che invia i dati su richiesta, o un'operazione di lunga esecuzione di blocco (ad es. Interfaccia utente che utilizza il proprio ciclo di eventi) potrebbe non essere appropriato.
Codice:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time
# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
while True:
protocol.send(b"Hello World\n")
yield deferLater(reactor, 2, lambda: None)
# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):
def connectionMade(self):
self.transport.registerProducer(self, True)
gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
self.task = cooperate(gen)
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def pauseProducing(self):
print 'Workload paused'
self.task.pause()
def resumeProducing(self):
print 'Workload resumed'
self.task.resume()
def stopProducing(self):
print 'Workload stopped'
self.task.stop()
def connectionLost(self, reason):
print 'Connection lost'
try:
self.task.stop()
except:
pass
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
2) manualmente tenere traccia delle istanze di protocollo e l'uso reactor.callFromThread () da un thread diverso. Consente di ottenere via con un'operazione lunga bloccare in altro thread (ad es. Ciclo di eventi ui).
Codice:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading
# Connection
protocol = None
# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
def run(self):
while True:
print("Thread loop")
time.sleep(random.randint(0, 4))
if protocol is not None:
reactor.callFromThread(self.dispatch)
def dispatch(self):
global protocol
protocol.send("Hello World\n")
# Push protocol
class PushProtocol(Protocol):
def connectionMade(self):
global protocol
protocol = self
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def connectionLost(self, reason):
print 'Connection lost'
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Start thread
other = SomeThread()
other.start()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
Personalmente, trovo il fatto che IPushProducer e IPullProducer richiedono un callback periodica, li rende meno utile. Altri sono in disaccordo ... spallucce . Fate la vostra scelta.