Pergunta

Para operações no meu servidor de tornado que devem bloquear (e não podem ser facilmente modificadas para usar coisas como o cliente de solicitação HTTP assíncrono de tornado), tenho descarregado o trabalho para separar os processos do trabalhador usando o multiprocessing módulo. Especificamente, eu estava usando um multiprocessamento Pool Porque oferece um método chamado apply_async, que funciona muito bem com o tornado, pois é preciso um retorno de chamada como um de seus argumentos.

Recentemente, percebi que um pool pré -allocata o número de processos; portanto, se todos se tornarem bloqueadores, as operações que exigem um novo processo terão que esperar. Eu percebo que o servidor ainda pode tomar conexões desde apply_async funciona adicionando coisas a uma fila de tarefas, e está imediatamente acabado, por si só, mas estou procurando gerar n processos para n quantidade de tarefas de bloqueio que preciso executar.

Eu imaginei que poderia usar o add_handler Método para o Ioloop do meu servidor de tornado para adicionar um manipulador para cada novo PID que eu crio a esse ioloop. Eu já fiz algo semelhante antes, mas estava usando Popen e um comando arbitrário. Um exemplo desse uso desse método é aqui. Eu queria passar argumentos para uma função arbitrária de Python no meu escopo, então eu queria ficar com multiprocessing.

No entanto, parece que algo não gosta dos pids que meu multiprocessing.Process objetos têm. eu recebo IOError: [Errno 9] Bad file descriptor. Esses processos estão restritos de alguma forma? Eu sei que o PID não está disponível até eu realmente começar o processo, mas eu Faz Inicie o processo. Aqui está o código -fonte de um exemplo que fiz que demonstra esse problema:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

Aqui está o Traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

O código acima é realmente modificado a partir de um exemplo mais antigo que usou pools de processos. Eu o economizei para referência para meus colegas de trabalho e para mim (daí a grande quantidade de comentários) por um bom tempo. Eu o construí dessa maneira para poder abrir duas pequenas janelas do navegador lado a lado para demonstrar ao meu chefe que as conexões /sync uri bloqueiam enquanto /async permite mais conexões. Para os propósitos desta pergunta, tudo o que você precisa fazer para reproduzi -la é tentar acessar o manipulador /assíncrono. Erra imediatamente.

O que devo fazer sobre isso? Como o PID pode ser "ruim"? Se você executar o programa, poderá vê -lo impresso para Stdout.

Para o registro, estou usando o Python 2.6.5 no Ubuntu 10.04. Tornado é 1.1.

Foi útil?

Solução

Add_handler pega um descritor de arquivo válido, não um PID. Como exemplo do que é esperado, o próprio tornado usa add_handler normalmente passando no arquivo de um objeto de soquete (), que retorna o descritor de arquivo do objeto. O PID é irrelevante neste caso.

Outras dicas

Confira este projeto:

https://github.com/vukasin/tornado-subprocess

Ele permite iniciar processos arbitrários do Tornado e obter um retorno de chamada quando terminar (com acesso ao status, stdout e stderr).

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top