Правильный способ отменить accept и закрыть соединение с обработчиком Python / многопроцессорным прослушивателем
-
21-08-2019 - |
Вопрос
(Я использую обработка pyprocessing модуль в этом примере, но замена обработки на многопроцессорную, вероятно, должна сработать, если вы запустите python 2.6 или используйте резервный порт многопроцессорной обработки)
В настоящее время у меня есть программа, которая прослушивает сокет unix (используя прослушиватель processing.connection.), принимает подключения и запускает поток, обрабатывающий запрос.В определенный момент я хочу изящно завершить процесс, но поскольку вызов accept() блокируется, и я не вижу способа отменить его приятным способом.У меня есть один способ, который работает здесь (по крайней мере, OS X), устанавливающий обработчик сигнала и сигнализирующий процессу из другого потока следующим образом:
import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno
# This is actually called by the connection handler.
def closeme():
time.sleep(1)
print 'Closing socket...'
listener.close()
os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)
oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)
listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
listener.accept()
except socket.error, e:
if e.args[0] != errno.EINTR:
raise
# Cleanup here...
print 'Done...'
Единственный другой способ, о котором я думал, - это глубоко проникнуть в соединение (listener._listener._socket) и установить неблокирующий параметр...но это, вероятно, имеет некоторые побочные эффекты и, как правило, действительно пугает.
Есть ли у кого-нибудь более элегантный (и, возможно, даже правильный!) способ добиться этого?Он должен быть переносимым на OS X, Linux и BSD, но переносимость Windows и т.д. Не обязательна.
Разъяснение:Спасибо всем!Как обычно, выявляются двусмысленности в моем первоначальном вопросе :)
- Мне нужно выполнить очистку после того, как я отменил прослушивание, и я не всегда хочу на самом деле завершать этот процесс.
- Мне нужно иметь возможность получить доступ к этому процессу из других процессов, не созданных одним и тем же родительским, что делает очереди громоздкими
- Причины появления потоков заключаются в том, что:
- Они получают доступ к общему состоянию.На самом деле это более или менее обычная база данных в памяти, так что я полагаю, это можно было бы сделать по-другому.
- Я должен иметь возможность принимать несколько подключений одновременно, но фактические потоки большую часть времени что-то блокируют.Каждое принятое соединение порождает новый поток;это для того, чтобы не блокировать всех клиентов при операциях ввода-вывода.
Что касается потоков по сравнениюпроцессы, я использую потоки, чтобы сделать мои блокирующие операции неблокирующими, и процессы, чтобы включить многопроцессорность.
Решение
Разве не для этого существует select??
Вызывайте accept в сокете только в том случае, если select указывает, что он не будет заблокирован...
У select есть тайм-аут, поэтому вы можете время от времени выходить, чтобы проверить не пришло ли время для завершения работы....
Другие советы
Я думал, что смогу избежать этого, но, похоже, я должен сделать что-то вроде этого:
from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()
import select
l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
print "Accepting..."
c = l.accept()
# ...
Я знаю, что это нарушает закон Деметры и вводит некоторые злые исправления, но, похоже, это был бы самый простой в переносе способ достижения этой цели.Если у кого-нибудь есть более элегантное решение, я был бы рад его услышать :)
Я новичок в модуле многопроцессорной обработки, но мне кажется, что смешивание модуля обработки и модуля обработки потоков нелогично, разве они не нацелены на решение одной и той же проблемы?
В любом случае, как насчет того, чтобы включить ваши функции прослушивания в сам процесс?Мне не совсем ясно, как это влияет на остальную часть вашего кода, но это может быть более чистой альтернативой.
from multiprocessing import Process
from multiprocessing.connection import Listener
class ListenForConn(Process):
def run(self):
listener = Listener('/tmp/asdf', 'AF_UNIX')
listener.accept()
# do your other handling here
listen_process = ListenForConn()
listen_process.start()
print listen_process.is_alive()
listen_process.terminate()
listen_process.join()
print listen_process.is_alive()
print 'No more listen process.'
Вероятно, это не идеально, но вы можете освободить блок, отправив сокету некоторые данные из обработчика сигналов или потока, который завершает процесс.
Редактировать:Другим способом реализации этого может быть использование Соединение Очереди, поскольку они, похоже, поддерживают тайм-ауты (прошу прощения, я неправильно прочитал ваш код при первом чтении).
Я столкнулся с той же проблемой.Я решил это, отправив слушателю команду "стоп".В главном потоке прослушивателя (том, который обрабатывает входящие сообщения) каждый раз, когда получено новое сообщение, я просто проверяю, является ли это командой "стоп", и выхожу из основного потока.
Вот код, который я использую:
def start(self):
"""
Start listening
"""
# set the command being executed
self.command = self.COMMAND_RUN
# startup the 'listener_main' method as a daemon thread
self.listener = Listener(address=self.address, authkey=self.authkey)
self._thread = threading.Thread(target=self.listener_main, daemon=True)
self._thread.start()
def listener_main(self):
"""
The main application loop
"""
while self.command == self.COMMAND_RUN:
# block until a client connection is recieved
with self.listener.accept() as conn:
# receive the subscription request from the client
message = conn.recv()
# if it's a shut down command, return to stop this thread
if isinstance(message, str) and message == self.COMMAND_STOP:
return
# process the message
def stop(self):
"""
Stops the listening thread
"""
self.command = self.COMMAND_STOP
client = Client(self.address, authkey=self.authkey)
client.send(self.COMMAND_STOP)
client.close()
self._thread.join()
Я использую ключ аутентификации, чтобы предотвратить отключение моего сервиса потенциальными хакерами путем отправки команды stop от произвольного клиента.
Мое решение не является идеальным.Кажется, лучшим решением могло бы быть пересмотр кода в multiprocessing.connection.Listener
, и добавьте stop()
способ.Но для этого потребовалось бы отправить его через процесс на утверждение командой Python.