Question

For operations in my Tornado server that are expected to block (and can't be easily modified to use things like Tornado's asynchronous HTTP request client), I have been offloading the work to separate worker processes using the multiprocessing module. Specifically, I was using a multiprocessing Pool because it offers a method called apply_async, which works very well with Tornado since it takes a callback as one of its arguments.

I recently realized that a pool preallocates the number of processes, so if they all become blocking, operations that require a new process will have to wait. I do realize that the server can still take connections since apply_async works by adding things to a task queue, and is rather immediately finished, itself, but I'm looking to spawn n processes for n amount of blocking tasks I need to perform.

I figured that I could use the add_handler method for my Tornado server's IOLoop to add a handler for each new PID that I create to that IOLoop. I've done something similar before, but it was using popen and an arbitrary command. An example of such use of this method is here. I wanted to pass arguments into an arbitrary target Python function within my scope, though, so I wanted to stick with multiprocessing.

However, it seems that something doesn't like the PIDs that my multiprocessing.Process objects have. I get IOError: [Errno 9] Bad file descriptor. Are these processes restricted somehow? I know that the PID isn't available until I actually start the process, but I do start the process. Here's the source code of an example I've made that demonstrates this issue:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

Here is the traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

The above code is actually modified from an older example that used process pools. I've had it saved for reference for my coworkers and myself (hence the heavy amount of comments) for quite a while. I constructed it in such a way so that I could open two small browser windows side-by-side to demonstrate to my boss that the /sync URI blocks connections while /async allows more connections. For the purposes of this question, all you need to do to reproduce it is try to access the /async handler. It errors immediately.

What should I do about this? How can the PID be "bad"? If you run the program, you can see it be printed to stdout.

For the record, I'm using Python 2.6.5 on Ubuntu 10.04. Tornado is 1.1.

Was it helpful?

Solution

add_handler takes a valid file descriptor, not a PID. As an example of what's expected, tornado itself uses add_handler normally by passing in a socket object's fileno(), which returns the object's file descriptor. PID is irrelevant in this case.

OTHER TIPS

Check out this project:

https://github.com/vukasin/tornado-subprocess

it allows you to start arbitrary processes from tornado and get a callback when they finish (with access to their status, stdout and stderr).

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top