errno 9 Pythonで竜巻を使用してマルチプロセッシングモジュールを使用しています

StackOverflow https://stackoverflow.com/questions/4073092

質問

ブロックすると予想される私の竜巻サーバーでの操作の場合(また、Tornadoの非同期HTTPリクエストクライアントのようなものを使用するように簡単に変更できません)、私は作業をオフロードして、 multiprocessing モジュール。具体的には、マルチプロセッシングを使用していました Pool 呼ばれるメソッドを提供するからです apply_async, 、Tornadoはその議論の1つとしてコールバックを取るため、非常にうまく機能します。

私は最近、プールがプロセスの数をpreallocesteしているので、それらがすべてブロックされた場合、新しいプロセスを必要とする操作が待たなければならないことに気付きました。以来、サーバーがまだ接続を取ることができることを理解しています apply_async タスクキューに物事を追加することで機能し、すぐに完成しますが、それ自体ですが、私はスポーンしたいと思っています n のプロセス n 実行する必要があるブロッキングタスクの量。

私はそれを使うことができると考えました add_handler Tornado Serverのioloopの方法では、そのioloopに作成した新しいPIDごとにハンドラーを追加します。私は以前に同様のことをしましたが、それはPopenと任意のコマンドを使用していました。この方法のそのような使用の例は次のとおりです ここ. 。ただし、私の範囲内で引数を任意のターゲットPython関数に渡したかったので、 multiprocessing.

しかし、何かが私のものが好きではないようです multiprocessing.Process オブジェクトにはあります。私は得ます IOError: [Errno 9] Bad file descriptor. 。これらのプロセスはどういうわけか制限されていますか?私は実際にプロセスを開始するまでPIDが利用できないことを知っていますが、私は 行う プロセスを開始します。これが、この問題を示している例のソースコードです。

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

これがトレースバックです:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

上記のコードは、実際にはプロセスプールを使用した古い例から変更されています。私はそれを私の同僚と私自身のために参照のために保存しました(したがって、大量のコメント)かなり長い間。 2つの小さなブラウザウィンドウを並べて開き、 /同期URIが接続をブロックすると /asyncがより多くの接続を可能にすることを上司に示すように、そのような方法で構築しました。この質問の目的のために、それを再現するために必要なことは、 /asyncハンドラーにアクセスしようとすることだけです。すぐにエラーが発生します。

これについてどうすればよいですか?どのようにしてPIDは「悪い」ことができますか?プログラムを実行すると、stdoutに印刷されることがわかります。

記録のために、Ubuntu 10.04でPython 2.6.5を使用しています。竜巻は1.1です。

役に立ちましたか?

解決

add_handlerは、pidではなく有効なファイル記述子を取ります。予想される例として、Tornado自体は通常、SocketオブジェクトのFileno()を渡すことによりAdd_Handlerを使用します。これは、オブジェクトのファイル記述子を返します。この場合、PIDは無関係です。

他のヒント

このプロジェクトをチェックしてください:

https://github.com/vukasin/tornado-subprocess

Tornadoから任意のプロセスを開始し、終了時にコールバックを取得することができます(ステータス、STDOUT、STDERへのアクセスがあります)。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top