Pregunta

Estoy escribiendo un programa de servidor con un productor y múltiples consumidores, Lo que me confunde es sólo el primer productor tarea poner en la cola consigue consumida, después de lo cual las tareas en cola ya no conseguir consumidos, siguen siendo en la cola para siempre.

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

El productor es un servidor HTTP que puso una tarea en la cola una vez que reciba una solicitud del usuario. Parece que los procesos de consumo siguen siendo bloqueado cuando hay nuevas tareas en la cola, lo cual es raro.

P.S. Otras dos cuestiones no relacionadas con lo anterior, no estoy seguro de si es mejor poner servidor HTTP en su propio proceso que no sea el principal proceso, en caso afirmativo ¿cómo puedo hacer que el proceso principal seguir corriendo delante de todos hijos procesos finales. Segunda pregunta, ¿cuál es la mejor manera de detener la servidor HTTP con gracia?

Editar : añadir código del productor, es sólo un simple servidor de pitón wsgi:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
¿Fue útil?

Solución

Creo que debe haber algo mal con la parte del servidor web, ya que esto funciona a la perfección:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers[i].join()
        self.queue.close()


Manager().start()

Resultado de muestra:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1

Otros consejos

"Segunda pregunta, ¿cuál es la mejor manera de detener el servidor HTTP con gracia?"

Esto es difícil.

Existen dos opciones para la comunicación entre procesos:

  • Fuera de la banda de los controles. El servidor tiene otro mecanismo de comunicación. Otra toma de corriente, una señal de Unix, o alguna otra cosa. La otra cosa podría ser un archivo "stop-ahora" en el directorio local del servidor. Parece extraño, pero funciona bien y es más simple que la introducción de un bucle de selección para escuchar en varios enchufes o un manejador de señales para captar una señal Unis.

    El archivo de "freno y ahora" es fácil de implementar. El bucle evwsgi.run() simplemente comprueba este archivo después de cada solicitud. Para hacer la parada del servidor, se crea el archivo, ejecutar una solicitud /control (que recibirá un error 500 o algo así, que en realidad no importa) y el servidor debe detendría. Recuerde que debe eliminar el nivel de archivo, ahora, de lo contrario el servidor no se reiniciará.

  • en banda controles. El servidor tiene otra URL (/stop) que detenerlo. Superficialmente, esto parece una pesadilla para la seguridad, sino que depende por completo de dónde y cómo se utilizará este servidor. Ya que parece ser un simple envoltorio alrededor de una cola de peticiones internas, este URL adicional funciona bien.

    Para que esto funcione, tiene que escribir su propia versión de evwsgi.run() que puede ser terminado por el establecimiento de alguna variable de una manera que va a salir del bucle.

Editar

Es probable que no desea terminar su servidor, ya que no se conoce el estado de su subprocesos de trabajo. Es necesario señalar el servidor y luego sólo hay que esperar a que termine cosas normalmente.

Si quieres matar a la fuerza el servidor, a continuación, os.kill() (o multiprocessing.terminate) va a funcionar. Excepto, por supuesto, usted no sabe lo que los procesos hijo estaban haciendo.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top