对于我的龙卷风服务器中的操作,预计将阻止(并且不能轻易修改以使用龙卷风的异步HTTP请求客户端之类的东西),我一直在下载工作以使用该工程使用该工作 multiprocessing 模块。具体来说,我正在使用多处理 Pool 因为它提供了一种称为的方法 apply_async, ,它与Tornado非常有效,因为它将回调作为其论点之一。

我最近意识到,一个池预先关注流程的数量,因此,如果它们都被阻止,那么需要新过程的操作将必须等待。我确实意识到服务器仍然可以进行连接,因为 apply_async 通过在任务队列中添加物品来工作,并且已经立即完成,但我希望产生 n 流程 n 我需要执行的阻止任务数量。

我认为我可以使用 add_handler 我的龙卷风服务器的IOLOOP的方法为我为IOLOOP创建的每个新PID添加了一个处理程序。我以前做过类似的事情,但是它使用了Popen和任意命令。这种使用此方法的一个例子是 这里. 。不过,我想在范围内将参数传递到任意目标python功能中,所以我想坚持 multiprocessing.

但是,似乎有些东西不喜欢我 multiprocessing.Process 对象有。我明白了 IOError: [Errno 9] Bad file descriptor. 。这些过程是否有限制?我知道直到我实际启动该过程,我才能使用PID,但是我 开始过程。这是我提出的示例的源代码,证明了这个问题:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

这是追溯:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

上面的代码实际上是从使用过程池的较旧示例中修改的。我已经为我的同事和我自己(因此有大量评论)保存了参考。我以这样的方式构造了它,以便可以并排打开两个小浏览器窗口,以向老板证明 /同步URI块连接,而 /async允许更多的连接。出于这个问题的目的,您需要做的所有复制就是尝试访问 /异步处理程序。它立即错误。

我该怎么办? PID如何“不好”?如果运行该程序,则可以将其打印到Stdout。

为了记录,我在Ubuntu 10.04上使用Python 2.6.5。龙卷风为1.1。

有帮助吗?

解决方案

add_handler获取有效的文件描述符,而不是PID。作为预期的示例,Tornado本身通常通过传递插座对象的FILENO()来使用ADD_HANDLER,从而返回对象的文件描述符。在这种情况下,PID是无关紧要的。

其他提示

查看此项目:

https://github.com/vukasin/tornado-subprocess

它允许您从龙卷风开始任意进程,并在完成后(访问其状态,Stdout和stderr)时获得回调。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top