Pregunta

Para las operaciones en mi servidor del tornado que se espera que el bloque (y no se pueden modificar fácilmente para usar cosas como asíncrona petición del cliente HTTP de Tornado), he sido la descarga de la obra a los procesos de trabajo separados utilizando el módulo de multiprocessing. Específicamente, estaba usando un Pool multiprocesamiento, ya que ofrece un método llamado apply_async, que funciona muy bien con Tornado, ya que toma una devolución de llamada como uno de sus argumentos.

Hace poco se dio cuenta de que un grupo preasigna el número de procesos, por lo que si todos se convierten en el bloqueo, las operaciones que requieren un nuevo proceso tendrá que esperar. Soy consciente de que el servidor todavía puede tener conexiones desde las obras apply_async añadiendo cosas a una cola de tareas, y está bien terminado inmediatamente, sí, pero estoy buscando a desovar n procesos de n cantidad de tareas de bloqueo que tienen que realizar.

Me imaginé que podría utilizar el método de add_handler IOLoop de mi servidor de Tornado para agregar un controlador PID para cada nuevo que creo que a IOLoop. Yo he hecho algo similar antes, pero estaba usando popen y un comando arbitrario. Un ejemplo de tal uso de este método es aquí . Quería pasar argumentos a una función de Python objetivo arbitraria dentro de mi alcance, sin embargo, así que quería seguir con multiprocessing.

Sin embargo, parece que algo no le gusta el PID que mis objetos multiprocessing.Process tienen. Consigo IOError: [Errno 9] Bad file descriptor. Son estos procesos restringidos de alguna manera? Yo sé que el PID no está disponible hasta que realmente iniciar el proceso, pero DO a empezar el proceso. Aquí está el código fuente de un ejemplo que he hecho que muestra este problema:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

Aquí está el rastreo:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

El código anterior es en realidad modifica a partir de un ejemplo más viejo que las piscinas de proceso utilizadas. He tenido que guardarse como referencia para mis compañeros de trabajo y yo (de ahí la fuerte cantidad de comentarios) durante bastante tiempo. Construí en tal manera para que yo pudiera abrir dos ventanas de navegador pequeñas de lado a lado para demostrar a mi jefe que el / sincronización URI mientras que bloquea las conexiones / asíncrono permite más conexiones. A los efectos de esta pregunta, todo lo que necesita hacer para reproducirlo es tratar de acceder al manejador / asíncrono. Se errores inmediatamente.

¿Qué debo hacer al respecto? ¿Cómo puede ser el PID "malo"? Si ejecuta el programa, se puede ver que se imprime en la salida estándar.

Para que conste, estoy usando Python 2.6.5 en Ubuntu 10.04. Tornado es 1,1.

¿Fue útil?

Solución

add_handler toma un descriptor de archivo válido, no un PID. Como un ejemplo de lo que se espera, tornado en sí utiliza add_handler normalmente haciendo pasar en fileno de un objeto de socket (), que devuelve el descriptor de fichero del objeto. PID es irrelevante en este caso.

Otros consejos

Salida este proyecto:

https://github.com/vukasin/tornado-subprocess

que le permite iniciar procesos arbitrarios de tornado y obtener una devolución de llamada cuando terminen (con acceso a su estado, stdout y stderr).

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top