Forma correcta de cancelar aceptar y cerrar un / conexión de multiprocesamiento Listener procesamiento Python

StackOverflow https://stackoverflow.com/questions/357656

  •  21-08-2019
  •  | 
  •  

Pregunta

(estoy usando el pyprocessing módulo en este ejemplo, pero reemplazando el procesamiento con multiprocesamiento probablemente debería trabajar si ejecuta pitón 2.6 o utilizar el multiprocesamiento backport )

Actualmente tengo un programa que escucha a un socket UNIX (usando un processing.connection.Listener), aceptar conexiones y genera un manejo de la solicitud hilo. En un cierto punto que quiero salir del proceso con gracia, pero ya que el aceptar () - llamada está bloqueando y no veo ninguna manera de cancelar de una manera agradable. Tengo una forma en que trabaja aquí (OS X) por lo menos, el establecimiento de un manejador de señales y la señalización del proceso desde otro hilo de esta manera:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

La única otra forma que he pensado es llegar a profundizar en la conexión (listener._listener._socket) y el establecimiento de la opción de no bloqueo ... pero que probablemente tiene algunos efectos secundarios y es generalmente mucho miedo.

¿Alguien tiene una forma más elegante (y tal vez incluso correcta!) Para lograr esto? Tiene que ser portátil para OS X, Linux y BSD, pero Windows portabilidad, etc. no es necesario.

Aclaración : ¡Gracias a todos! Como es habitual, las ambigüedades en mi pregunta original se revelan:)

  • necesito para llevar a cabo la limpieza después de haber cancelado la escucha, y no siempre lo quiero en realidad salir de ese proceso.
  • Tengo que ser capaz de acceder a este proceso desde otros procesos no engendrados del mismo padre, lo que hace difícil de manejar colas
  • Las razones de hilos son que:
    • que acceda a un estado compartido. En realidad, más o menos una base de datos común en memoria, así que supongo que se podría hacer de manera diferente.
    • I debe ser capaz de tener varias conexiones aceptadas, al mismo tiempo, pero los hilos reales están bloqueando algo mayor parte del tiempo. Cada conexión aceptada genera un nuevo hilo; esto con el fin de no bloquear todos los clientes en operaciones de E / S.

En cuanto a las discusiones frente a los procesos, que utiliza hilos para hacer mis operaciones de bloqueo no bloqueante y procesos para permitir el multiprocesamiento.

¿Fue útil?

Solución

No es eso lo que es para seleccionar ??

Sólo llame a aceptar en el zócalo si el selecto indica que no bloquee ...

El Select tiene un tiempo de espera, por lo que puede salir de vez en cuando de vez en cuando para comprobar si es el momento de cerrar ....

Otros consejos

pensé que podía evitarlo, pero parece que tengo que hacer algo como esto:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

Soy consciente de que esto rompe la ley del Demeter e introduce algún mal mono-parches, pero parece que esta sería la forma más fácil de puerto de lograr esto. Si alguien tiene una solución más elegante que estaría feliz de saber que:)

Soy nuevo en el módulo de multiprocesamiento, pero me parece que mezclar el módulo de procesamiento y el módulo de roscar es contrario a la intuición, no son dirigidos a resolver el mismo problema?

De todos modos, ¿qué hay de embalar sus funciones escuchar en un proceso en sí mismo? No estoy claro cómo esto afecta al resto de su código, pero esto puede ser una alternativa más limpia.

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'

Probablemente no es ideal, pero se puede liberar el bloque mediante el envío de la toma de algunos datos desde el controlador de señal o el hilo que se da por concluido el proceso.

EDIT: Otra forma de implementar esto podría ser el uso de la etiqueta Conexión colas , ya que parecen apoyar los tiempos de espera (disculpas, leí mal el código en mi primera lectura).

Me encontré con el mismo problema. Lo resuelto mediante el envío de un comando de "parada" para el oyente. En hilo principal del oyente (el que procesa los mensajes entrantes), cada vez que se recibe un mensaje nuevo, acabo de comprobar para ver si es un comando "Stop" y salir del hilo principal.

Aquí está el código que estoy usando:

def start(self):
    """
    Start listening
    """
    # set the command being executed
    self.command = self.COMMAND_RUN

    # startup the 'listener_main' method as a daemon thread
    self.listener = Listener(address=self.address, authkey=self.authkey)
    self._thread = threading.Thread(target=self.listener_main, daemon=True)
    self._thread.start()

def listener_main(self):
    """
    The main application loop
    """

    while self.command == self.COMMAND_RUN:
        # block until a client connection is recieved
        with self.listener.accept() as conn:

            # receive the subscription request from the client
            message = conn.recv()

            # if it's a shut down command, return to stop this thread
            if isinstance(message, str) and message == self.COMMAND_STOP:
                return

            # process the message

def stop(self):
    """
    Stops the listening thread
    """
    self.command = self.COMMAND_STOP
    client = Client(self.address, authkey=self.authkey)
    client.send(self.COMMAND_STOP)
    client.close()

    self._thread.join()

Estoy usando una clave de autenticación para evitar que sería hackers se apague mi servicio mediante el envío de una orden de parada desde un cliente arbitrario.

La mía no es una solución perfecta. Parece una mejor solución podría ser la de revisar el código en multiprocessing.connection.Listener, y añadir un método stop(). Sin embargo, eso requeriría de enviarlo a través del proceso de aprobación por parte del equipo de Python.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top