Нерегулярная проблема передачи с витой Python Push Push

StackOverflow https://stackoverflow.com/questions/3743662

Вопрос

Я хочу передавать данные из очереди, используя скрученные. В настоящее время я использую Push-производителе для опроса очереди элементов и записывать на транспорт.

class Producer:

    implements(interfaces.IPushProducer)

    def __init__(self, protocol, queue):
        self.queue = queue
        self.protocol = protocol

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            try:
                data = self.queue.get_nowait()
                logger.debug("Transmitting: '%s'", repr(data))
                data = cPickle.dumps(data)
                self.protocol.transport.write(data + "\r\n")
            except Empty:
                pass

    def pauseProducing(self):
        logger.debug("Transmitter paused.")
        self.paused = True

    def stopProducing(self):
        pass

Проблема в том, что данные отправляются очень нерегулярно, и если в очереди был только один элемент, данные никогда не будут отправлены. Похоже, что скрученные ждет, пока данные будут переданы, не выросли до конкретного значения, пока он не передает его. Так, как я реализовал мой продюсер правильно? Могу ли я заставить скручиваться для передачи данных сейчас?

Я также пытался использовать тягущий производитель, но скрученные не называют resumeProducing() Способ этого вообще. Должен ли я позвонить resumeProducer() Способ снаружи, при использовании вытягивающего производителя?

Это было полезно?

Решение

Трудно сказать, почему ваш продюсер не работает хорошо, не увидев полный пример (то есть, не видя кода, который регистрирует его с потребителем и кодом, который представляет собой предметы в эту очередь).

Тем не менее, одна проблема, вероятно, имела, это то, что если ваша очередь пустой когда resumeProducing называется, тогда вы не будете писать байты вообще для потребителя. И когда предметы вставляются в очередь, они будут сидеть там навсегда, потому что потребитель не позвонит вам resumeProducing метод снова.

И это обобщает любой другой случай, когда очередь не имеет достаточных данных в нем, чтобы потребителя позвонить pauseProducing на вашем продюсере. Как толкающий производитель, это ваша работа, чтобы продолжать производить данные, до тех пор, пока потребительские звонки pauseProducing (или stopProducing).

Для этого конкретного случая, что, вероятно, означает, что всякий раз, когда вы собираетесь что-то поставить в эту очередь - Стоп: проверьте, если продюсер не приостановлен, а если это нет, напишите ему потребителю вместо. Отказ Только поставить предметы в очередь, когда производитель приостановлен.

Другие советы

Вот два возможных решения:

1) Периодически опрос ваше локальное приложение, чтобы увидеть, есть ли у вас дополнительные данные для отправки.

Прис. Это опирается на периодический async обратный вызов от метода Deferlater в скрученном. Если вам нужно отзывчивое приложение, которое отправляет данные по запросу или длительной работе блокировки (например, UI, который использует свой собственный контур событий), это может быть не подходящим.

Код:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from zope.interface import implementer
import time

# Deferred action
def periodically_poll_for_push_actions_async(reactor, protocol):
  while True:
    protocol.send(b"Hello World\n")
    yield deferLater(reactor, 2, lambda: None)

# Push protocol
@implementer(IPushProducer)
class PushProtocol(Protocol):

   def connectionMade(self):
     self.transport.registerProducer(self, True)
     gen = periodically_poll_for_push_actions_async(self.transport.reactor, self)
     self.task = cooperate(gen)

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def pauseProducing(self):
     print 'Workload paused'
     self.task.pause()

   def resumeProducing(self):
     print 'Workload resumed'
     self.task.resume()

   def stopProducing(self):
     print 'Workload stopped'
     self.task.stop()

   def connectionLost(self, reason):
     print 'Connection lost'
     try:
       self.task.stop()
     except:
       pass

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

2) Вручную отслеживать экземпляры протоколов и использовать Reactor.callFromThread () из другого потока. Позволяет уйти с длительной работой блокировки в другой ните (например, контур событий UI).

Код:

from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.interfaces import IPushProducer
from twisted.internet.task import deferLater, cooperate
from twisted.internet.protocol import Protocol
from twisted.internet import reactor, threads
import time
import random
import threading

# Connection
protocol = None

# Some other thread that does whatever it likes.
class SomeThread(threading.Thread):
  def run(self):
    while True:
      print("Thread loop")
      time.sleep(random.randint(0, 4))
      if protocol is not None:
        reactor.callFromThread(self.dispatch)
  def dispatch(self):
    global protocol
    protocol.send("Hello World\n")

# Push protocol
class PushProtocol(Protocol):

   def connectionMade(self):
     global protocol
     protocol = self

   def dataReceived(self, data):
     self.transport.write(data)

   def send(self, data):
     self.transport.write(data)

   def connectionLost(self, reason):
     print 'Connection lost'

# Push factory
class PushFactory(Factory):
  def buildProtocol(self, addr):
    return PushProtocol()

# Start thread
other = SomeThread()
other.start()

# Run the reactor that serves everything
endpoint = TCP4ServerEndpoint(reactor, 8089)
endpoint.listen(PushFactory())
reactor.run()

Лично я нахожу тот факт, что iPushProducer и IpullProducter требуют периодического обратного вызова, делает их менее полезными. Другие не согласны ... пожимать. Отказ Выбирайте.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top