Pythonマルチプロセッシングにおけるプロデューサー/コンシューマーの問題

StackOverflow https://stackoverflow.com/questions/914821

  •  06-09-2019
  •  | 
  •  

質問

私は1人のプロデューサーと複数の消費者と一緒にサーバープログラムを作成しています。私を混乱させるのは、プロデューサーがキューに入れられる最初のタスクであるだけでなく、その後タスクが消費されなくなり、彼らは永遠にキューに留まります。

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

プロデューサーは、ユーザーからリクエストを受け取った後、キューにタスクを配置するHTTPサーバーです。キューに新しいタスクがある場合、消費者プロセスはまだブロックされているようです。これは奇妙です。

追伸上記に関連していない別の2つの質問は、すべての子供のプロセスが終了する前にメインプロセスを実行し続けるにはどうすればよいか、どのようにしてメインプロセス以外にHTTPサーバーを独自のプロセスに配置する方が良いかどうかはわかりません。2番目の質問、HTTPサーバーを優雅に停止する最良の方法は何ですか?

編集:プロデューサー コードを追加すると、単純な Python wsgi サーバーになります。

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
役に立ちましたか?

解決

私は、これは完璧に動作として、Webサーバの一部に異常がなければならないと考えます:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers[i].join()
        self.queue.close()


Manager().start()

出力例:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1

他のヒント

「2 番目の質問です。HTTP サーバーを正常に停止する最善の方法は何ですか?」

これはきつい。

プロセス間通信には 2 つの選択肢があります。

  • 帯域外コントロール。サーバーには通信のための別のメカニズムがあります。別のソケット、Unix Signal、またはその他のもの。他のものは、サーバーのローカル ディレクトリ内の「stop-now」ファイルである可能性があります。奇妙に思えますが、これはうまく機能し、複数のソケットでリッスンするための選択ループや Unis シグナルをキャッチするためのシグナル ハンドラーを導入するよりも簡単です。

    「stop-now」ファイルは実装が簡単です。の evwsgi.run() ループは、各リクエストの後にこのファイルをチェックするだけです。サーバーを停止するには、ファイルを作成し、 /control リクエスト (500 エラーか何かが返されますが、実際には問題ありません) を実行すると、サーバーが停止するはずです。stop-now ファイルを削除することを忘れないでください。そうしないと、サーバーが再起動しません。

  • インバンドコントロール。サーバーには別の URL (/stop)それを停止します。表面的には、これはセキュリティ上の悪夢のように見えますが、それは完全にこのサーバーがどこでどのように使用されるかによって異なります。これは内部リクエスト キューの単純なラッパーであるように見えるため、この追加の URL は適切に機能します。

    これを機能させるには、独自のバージョンの evwsgi.run() これは、ループから抜け出す方法で変数を設定することで終了できます。

編集

サーバーのワーカー スレッドの状態がわからないため、おそらくサーバーを終了したくないでしょう。サーバーに信号を送る必要があります。その後はサーバーが正常に終了するまで待つだけです。

サーバーを強制終了したい場合は、 os.kill() (または multiprocessing.terminate) 働くでしょう。もちろん、子スレッドが何をしていたかはわかりません。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top