Domanda

Per le operazioni in mio server Tornado che si prevede di blocco (e non può essere facilmente modificato per utilizzare le cose come asincrono cliente richiesta HTTP del Tornado), mi è stato scaricando il lavoro per processi di lavoro separati utilizzando il modulo multiprocessing. Specificamente, usando un Pool multiprocessore perché offre un metodo chiamato apply_async, che funziona molto bene con ciclone poiché richiede un callback come uno dei suoi argomenti.

Di recente ho capito che un pool prealloca il numero di processi, quindi se tutti diventano blocco, operazioni che richiedono un nuovo processo dovrà aspettare. Mi rendo conto che il server può ancora prendere le connessioni da opere apply_async con l'aggiunta di cose da una coda compito, ed è piuttosto immediatamente finito, in sé, ma sto cercando di deporre le uova n processi per n quantità di compiti di blocco ho bisogno di eseguire.

Ho pensato che avrei potuto utilizzare il metodo add_handler per IOLoop del mio server Tornado per aggiungere un gestore per ogni nuovo PID che creo a che IOLoop. Ho fatto qualcosa di simile prima, ma stavo usando popen e un comando arbitrario. Un esempio di tale utilizzo di questo metodo è qui . Volevo passare gli argomenti in un bersaglio funzione arbitraria Python nel mio ambito, però, così ho voluto attaccare con multiprocessing.

Tuttavia, sembra che qualcosa non gli piace il PID che i miei oggetti multiprocessing.Process hanno. Ottengo IOError: [Errno 9] Bad file descriptor. Sono questi processi limitato in qualche modo? So che il PID non è disponibile fino a quando ho effettivamente iniziare il processo, ma I do avviare il processo. Ecco il codice sorgente di un esempio che ho fatto che illustra questo problema:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

Questa è la traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

Il codice di cui sopra è in realtà modificato da un esempio più vecchi che piscine processo utilizzato. L'ho avuto conservata per un riferimento per i miei colleghi e io (da qui il forte quantità di commenti) per un bel po '. Ho costruito in tal modo un modo che io possa aprire due finestre del browser piccole side-by-side per dimostrare al mio capo che il / sync URI blocca le connessioni mentre / asincrono permette più connessioni. Ai fini di questa domanda, tutto quello che dovete fare per riprodurre è cercare di accedere al gestore / asincrona. E gli errori immediatamente.

Che cosa devo fare? Come può il PID essere "cattivo"? Se si esegue il programma, si può vedere che essere stampata su stdout.

Per la cronaca, sto usando Python 2.6.5 su Ubuntu 10.04. Tornado è 1.1.

È stato utile?

Soluzione

add_handler prende un descrittore di file valido, non un PID. Come esempio di ciò che è previsto, ciclone si utilizza normalmente add_handler passando fileno di un oggetto socket (), che restituisce il file descrittore dell'oggetto. PID è irrilevante in questo caso.

Altri suggerimenti

Dai un'occhiata a questo progetto:

https://github.com/vukasin/tornado-subprocess

permette di avviare i processi arbitrari dal tornado e ottenere un callback quando hanno finito (con accesso al loro status, stdout e stderr).

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top