Правильный способ отменить accept и закрыть соединение с обработчиком Python / многопроцессорным прослушивателем

StackOverflow https://stackoverflow.com/questions/357656

  •  21-08-2019
  •  | 
  •  

Вопрос

(Я использую обработка pyprocessing модуль в этом примере, но замена обработки на многопроцессорную, вероятно, должна сработать, если вы запустите python 2.6 или используйте резервный порт многопроцессорной обработки)

В настоящее время у меня есть программа, которая прослушивает сокет unix (используя прослушиватель processing.connection.), принимает подключения и запускает поток, обрабатывающий запрос.В определенный момент я хочу изящно завершить процесс, но поскольку вызов accept() блокируется, и я не вижу способа отменить его приятным способом.У меня есть один способ, который работает здесь (по крайней мере, OS X), устанавливающий обработчик сигнала и сигнализирующий процессу из другого потока следующим образом:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

Единственный другой способ, о котором я думал, - это глубоко проникнуть в соединение (listener._listener._socket) и установить неблокирующий параметр...но это, вероятно, имеет некоторые побочные эффекты и, как правило, действительно пугает.

Есть ли у кого-нибудь более элегантный (и, возможно, даже правильный!) способ добиться этого?Он должен быть переносимым на OS X, Linux и BSD, но переносимость Windows и т.д. Не обязательна.

Разъяснение:Спасибо всем!Как обычно, выявляются двусмысленности в моем первоначальном вопросе :)

  • Мне нужно выполнить очистку после того, как я отменил прослушивание, и я не всегда хочу на самом деле завершать этот процесс.
  • Мне нужно иметь возможность получить доступ к этому процессу из других процессов, не созданных одним и тем же родительским, что делает очереди громоздкими
  • Причины появления потоков заключаются в том, что:
    • Они получают доступ к общему состоянию.На самом деле это более или менее обычная база данных в памяти, так что я полагаю, это можно было бы сделать по-другому.
    • Я должен иметь возможность принимать несколько подключений одновременно, но фактические потоки большую часть времени что-то блокируют.Каждое принятое соединение порождает новый поток;это для того, чтобы не блокировать всех клиентов при операциях ввода-вывода.

Что касается потоков по сравнениюпроцессы, я использую потоки, чтобы сделать мои блокирующие операции неблокирующими, и процессы, чтобы включить многопроцессорность.

Это было полезно?

Решение

Разве не для этого существует select??

Вызывайте accept в сокете только в том случае, если select указывает, что он не будет заблокирован...

У select есть тайм-аут, поэтому вы можете время от времени выходить, чтобы проверить не пришло ли время для завершения работы....

Другие советы

Я думал, что смогу избежать этого, но, похоже, я должен сделать что-то вроде этого:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

Я знаю, что это нарушает закон Деметры и вводит некоторые злые исправления, но, похоже, это был бы самый простой в переносе способ достижения этой цели.Если у кого-нибудь есть более элегантное решение, я был бы рад его услышать :)

Я новичок в модуле многопроцессорной обработки, но мне кажется, что смешивание модуля обработки и модуля обработки потоков нелогично, разве они не нацелены на решение одной и той же проблемы?

В любом случае, как насчет того, чтобы включить ваши функции прослушивания в сам процесс?Мне не совсем ясно, как это влияет на остальную часть вашего кода, но это может быть более чистой альтернативой.

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'

Вероятно, это не идеально, но вы можете освободить блок, отправив сокету некоторые данные из обработчика сигналов или потока, который завершает процесс.

Редактировать:Другим способом реализации этого может быть использование Соединение Очереди, поскольку они, похоже, поддерживают тайм-ауты (прошу прощения, я неправильно прочитал ваш код при первом чтении).

Я столкнулся с той же проблемой.Я решил это, отправив слушателю команду "стоп".В главном потоке прослушивателя (том, который обрабатывает входящие сообщения) каждый раз, когда получено новое сообщение, я просто проверяю, является ли это командой "стоп", и выхожу из основного потока.

Вот код, который я использую:

def start(self):
    """
    Start listening
    """
    # set the command being executed
    self.command = self.COMMAND_RUN

    # startup the 'listener_main' method as a daemon thread
    self.listener = Listener(address=self.address, authkey=self.authkey)
    self._thread = threading.Thread(target=self.listener_main, daemon=True)
    self._thread.start()

def listener_main(self):
    """
    The main application loop
    """

    while self.command == self.COMMAND_RUN:
        # block until a client connection is recieved
        with self.listener.accept() as conn:

            # receive the subscription request from the client
            message = conn.recv()

            # if it's a shut down command, return to stop this thread
            if isinstance(message, str) and message == self.COMMAND_STOP:
                return

            # process the message

def stop(self):
    """
    Stops the listening thread
    """
    self.command = self.COMMAND_STOP
    client = Client(self.address, authkey=self.authkey)
    client.send(self.COMMAND_STOP)
    client.close()

    self._thread.join()

Я использую ключ аутентификации, чтобы предотвратить отключение моего сервиса потенциальными хакерами путем отправки команды stop от произвольного клиента.

Мое решение не является идеальным.Кажется, лучшим решением могло бы быть пересмотр кода в multiprocessing.connection.Listener, и добавьте stop() способ.Но для этого потребовалось бы отправить его через процесс на утверждение командой Python.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top