Скрученный сетевой клиент с многопроцессорными работниками?

StackOverflow https://stackoverflow.com/questions/1470850

  •  16-09-2019
  •  | 
  •  

Вопрос

Итак, у меня есть приложение, которое использует Twisted + Stomper в качестве клиента STOMP, который передает работу многопроцессорному пулу рабочих.

Кажется, это работает нормально, когда я просто использую скрипт Python для запуска, который (упрощенно) выглядит примерно так:

# stompclient.py

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()

Поскольку это упаковывается для развертывания, я решил воспользоваться сценарием Twistd и запустить его из файла TAC.

Вот мой очень похожий тактический файл:

# stompclient.tac

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool.  (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination

application = service.Application('myapp')

service = internet.TCPClient(host, port, StompClientFactory())
service.setServiceParent(application)

Для иллюстрации я свернул или изменил некоторые детали;надеюсь, они не были сутью проблемы.Например, в моем приложении есть система плагинов, пул инициализируется отдельным методом, а затем работа делегируется пулу с помощью Pool.apply_async(), передавая один из методов процесса() моего плагина.

Итак, если я запускаю скрипт (stompclient.py), все работает как положено.

Также кажется, что он работает нормально, если я запускаю поворот в режиме без демона (-n):

twistd -noy stompclient.tac

однако это так нет работает, когда я запускаю в режиме демона:

twistd -oy stompclient.tac

Кажется, что приложение запускается нормально, но когда оно пытается завершить работу, оно просто зависает.Под «зависанием» я имею в виду, что дочерний процесс никогда не просит что-либо сделать, а родительский процесс (который вызываетсяpool.apply_async()) просто сидит и ждет возврата ответа.

Я уверен, что делаю что-то глупое с Twisted + multiprocessing, но очень надеюсь, что кто-нибудь объяснит мне недостаток моего подхода.

Заранее спасибо!

Это было полезно?

Решение

Поскольку разница между вашим рабочим вызовом и нерабочим вызовом заключается только в опции «-n», наиболее вероятно, что проблема вызвана процессом демонизации (который «-n» предотвращает).

В POSIX одним из этапов демонизации является разветвление и родительский выход.Помимо всего прочего, это приводит к тому, что ваш код выполняется в другом процессе, отличном от того, в котором оценивался файл .tac.Это также переупорядочивает дочерние/родительские отношения процессов, которые были запущены в файле .tac, как это было в вашем пуле многопроцессорных процессов.

Процессы многопроцессорного пула начинаются с родительского процесса, который вы запускаете.Однако когда этот процесс завершается в рамках демонизации, его родительским процессом становится процесс инициализации системы.Это может вызвать некоторые проблемы, но, вероятно, не ту проблему зависания, которую вы описали.Вероятно, существуют и другие подобные низкоуровневые детали реализации, которые обычно позволяют работать многопроцессорному модулю, но нарушаются процессом демонизации.

К счастью, избежать этого странного взаимодействия нетрудно.Сервисные API Twisted позволяют запускать код после завершения демонизации.Если вы используете эти API, вы можете отложить инициализацию пула процессов многопроцессорного модуля до завершения демонизации и, надеюсь, избежать проблемы.Вот пример того, как это может выглядеть:

from twisted.application.service import Service

class MultiprocessingService(Service):
    def startService(self):
        self.pool = multiprocessing.Pool(processes=processes)

MultiprocessingService().setServiceParent(application)

Теперь, отдельно, вы также можете столкнуться с проблемами, связанными с очисткой дочерних процессов модуля многопроцессорности, или, возможно, с проблемами с процессами, созданными с помощью API создания процессов Twisted, реактора.spawnProcess.Это связано с тем, что часть правильной работы с дочерними процессами обычно включает обработку сигнала SIGCHLD.Однако Twisted и multiprocessing не будут сотрудничать в этом отношении, поэтому один из них будет получать уведомление обо всех выходах дочерних процессов, а другой никогда не будет уведомлен.Если вы вообще не используете API Twisted для создания дочерних процессов, то это может быть для вас нормально, но вы можете захотеть проверить, чтобы убедиться, что любой обработчик сигналов, который пытается установить многопроцессорный модуль, действительно «выигрывает» и не получает заменен собственным обработчиком Twisted.

Другие советы

Возможная идея для вас...

При работе в режиме демона Twistd закроет stdin, stdout и stderr.Ваши клиенты что-то читают или пишут в них?

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top