проблема производителя / потребителя с многопроцессорностью python

StackOverflow https://stackoverflow.com/questions/914821

  •  06-09-2019
  •  | 
  •  

Вопрос

Я пишу серверную программу с одним производителем и несколькими потребителями, что меня смущает, так это то, что получает только первый производитель задач, помещенный в очередь потребляется, после чего задачи, поставленные в очередь, больше не потребляются, они остаются в очереди вечно.

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

Производитель - это HTTP-сервер, который ставит задачу в очередь после получения запроса от пользователя.Похоже, что потребительские процессы по-прежнему блокируются, когда в очереди появляются новые задачи, что странно.

P.S.Еще два вопроса, не относящихся к вышесказанному, я не уверен, стоит ли лучше поместить HTTP-сервер в свой собственный процесс, отличный от основного процесс, если да, то как я могу заставить основной процесс продолжать работать перед всеми дочерние процессы завершаются.Второй вопрос, каков наилучший способ изящно остановить HTTP-сервер ?

Редактировать:добавьте код производителя, это всего лишь простой сервер python wsgi:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
Это было полезно?

Решение

Я думаю, что, должно быть, что-то не так с веб-серверной частью, так как это работает отлично:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers[i].join()
        self.queue.close()


Manager().start()

Пример вывода:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1

Другие советы

"Второй вопрос, каков наилучший способ корректно остановить HTTP-сервер?"

Это тяжело.

У вас есть два варианта межпроцессного взаимодействия:

  • Внеполосные элементы управления.У сервера есть другой механизм для связи.Другой сокет, сигнал Unix или что-то еще.Что-то еще может быть файлом "stop-now" в локальном каталоге сервера.Кажется странным, но это действительно работает хорошо и проще, чем введение цикла выбора для прослушивания нескольких сокетов или обработчика сигнала для перехвата сигнала Unis.

    Файл "stop-now" прост в реализации.В evwsgi.run() цикл просто проверяет наличие этого файла после каждого запроса.Чтобы заставить сервер остановиться, вы создаете файл, выполняете /control запрос (который выдаст ошибку 500 или что-то в этом роде, на самом деле это не имеет значения), и сервер должен остановиться.Не забудьте удалить файл stop-now, иначе ваш сервер не будет перезапущен.

  • Внутриполосное управление.У сервера есть другой URL-адрес (/stop), который остановит это.На первый взгляд это кажется кошмаром безопасности, но это полностью зависит от того, где и как будет использоваться этот сервер.Поскольку это выглядит как простая оболочка вокруг внутренней очереди запросов, этот дополнительный URL работает хорошо.

    Чтобы это сработало, вам нужно написать свою собственную версию evwsgi.run() это может быть прекращено путем установки некоторой переменной таким образом, чтобы она выходила из цикла.

Редактировать

Вероятно, вы не хотите завершать работу своего сервера, поскольку не знаете состояния его рабочих потоков.Вам нужно подать сигнал серверу, а затем вам просто нужно подождать, пока он завершит работу в обычном режиме.

Если вы хотите принудительно уничтожить сервер, то os.kill() (или multiprocessing.terminate) будет работать.За исключением, конечно, того, что вы не знаете, что делали дочерние потоки.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top