Нерегулярная проблема передачи с витой Python Push Push
-
04-10-2019 - |
Вопрос
Я хочу передавать данные из очереди, используя скрученные. В настоящее время я использую Push-производителе для опроса очереди элементов и записывать на транспорт.
class Producer:
implements(interfaces.IPushProducer)
def __init__(self, protocol, queue):
self.queue = queue
self.protocol = protocol
def resumeProducing(self):
self.paused = False
while not self.paused:
try:
data = self.queue.get_nowait()
logger.debug("Transmitting: '%s'", repr(data))
data = cPickle.dumps(data)
self.protocol.transport.write(data + "\r\n")
except Empty:
pass
def pauseProducing(self):
logger.debug("Transmitter paused.")
self.paused = True
def stopProducing(self):
pass
Проблема в том, что данные отправляются очень нерегулярно, и если в очереди был только один элемент, данные никогда не будут отправлены. Похоже, что скрученные ждет, пока данные будут переданы, не выросли до конкретного значения, пока он не передает его. Так, как я реализовал мой продюсер правильно? Могу ли я заставить скручиваться для передачи данных сейчас?
Я также пытался использовать тягущий производитель, но скрученные не называют resumeProducing()
Способ этого вообще. Должен ли я позвонить resumeProducer()
Способ снаружи, при использовании вытягивающего производителя?
Решение
Трудно сказать, почему ваш продюсер не работает хорошо, не увидев полный пример (то есть, не видя кода, который регистрирует его с потребителем и кодом, который представляет собой предметы в эту очередь).
Тем не менее, одна проблема, вероятно, имела, это то, что если ваша очередь пустой когда resumeProducing
называется, тогда вы не будете писать байты вообще для потребителя. И когда предметы вставляются в очередь, они будут сидеть там навсегда, потому что потребитель не позвонит вам resumeProducing
метод снова.
И это обобщает любой другой случай, когда очередь не имеет достаточных данных в нем, чтобы потребителя позвонить pauseProducing
на вашем продюсере. Как толкающий производитель, это ваша работа, чтобы продолжать производить данные, до тех пор, пока потребительские звонки pauseProducing
(или stopProducing
).
Для этого конкретного случая, что, вероятно, означает, что всякий раз, когда вы собираетесь что-то поставить в эту очередь - Стоп: проверьте, если продюсер не приостановлен, а если это нет, напишите ему потребителю вместо. Отказ Только поставить предметы в очередь, когда производитель приостановлен.
Другие советы
Вот два возможных решения:
1) Периодически опрос ваше локальное приложение, чтобы увидеть, есть ли у вас дополнительные данные для отправки.
Прис. Это опирается на периодический async обратный вызов от метода Deferlater в скрученном. Если вам нужно отзывчивое приложение, которое отправляет данные по запросу или длительной работе блокировки (например, UI, который использует свой собственный контур событий), это может быть не подходящим.
Код:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time
# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
while True:
protocol.send(b"Hello World\n")
yield deferLater(reactor, 2, lambda: None)
# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):
def connectionMade(self):
self.transport.registerProducer(self, True)
gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
self.task = cooperate(gen)
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def pauseProducing(self):
print 'Workload paused'
self.task.pause()
def resumeProducing(self):
print 'Workload resumed'
self.task.resume()
def stopProducing(self):
print 'Workload stopped'
self.task.stop()
def connectionLost(self, reason):
print 'Connection lost'
try:
self.task.stop()
except:
pass
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
2) Вручную отслеживать экземпляры протоколов и использовать Reactor.callFromThread () из другого потока. Позволяет уйти с длительной работой блокировки в другой ните (например, контур событий UI).
Код:
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading
# Connection
protocol = None
# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
def run(self):
while True:
print("Thread loop")
time.sleep(random.randint(0, 4))
if protocol is not None:
reactor.callFromThread(self.dispatch)
def dispatch(self):
global protocol
protocol.send("Hello World\n")
# Push protocol
class PushProtocol(Protocol):
def connectionMade(self):
global protocol
protocol = self
def dataReceived(self, data):
self.transport.write(data)
def send(self, data):
self.transport.write(data)
def connectionLost(self, reason):
print 'Connection lost'
# Push factory
class PushFactory(Factory):
def buildProtocol(self, addr):
return PushProtocol()
# Start thread
other = SomeThread()
other.start()
# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()
Лично я нахожу тот факт, что iPushProducer и IpullProducter требуют периодического обратного вызова, делает их менее полезными. Другие не согласны ... пожимать. Отказ Выбирайте.