Errno 9, используя многопроцессорный модуль с торнадо в Python

StackOverflow https://stackoverflow.com/questions/4073092

Вопрос

Для операций на моем сервере TORNADO, которые, как ожидается, блокируют (и не могут быть легко изменены, чтобы использовать такие вещи, как асинхронный клиент HTTP-запроса Tornado), я выгружал работу на отдельные рабочие процессы, используя multiprocessing модуль. В частности, я использовал многопроцессорную Pool Потому что он предлагает метод под названием apply_async, который работает очень хорошо с торнадо, так как он принимает обратный вызов как один из его аргументов.

Недавно я понял, что у бассейна предоставляет количество процессов, поэтому, если все они становятся блокирующими, операции, которые требуют нового процесса, придется ждать. Я понимаю, что сервер все еще может принимать соединения, поскольку apply_async Работает, добавляя вещи в очередь задач, а скорее немедленно закончили, сам, но я ищу N. процессы для N. Количество блокирующих задач, которые мне нужно выполнить.

Я подумал, что могу использовать add_handler Способ для моего IOLOP TORNADO SERVER, чтобы добавить обработчик для каждого нового PID, который я создаю для этого IOLOP. Я сделал что-то похожее раньше, но он использовал POPEN и произвольную команду. Пример такого использования этого метода здесь. Отказ Я хотел пройти аргументы в произвольный целевой Python функцию в моем объеме, хотя, поэтому я хотел придерживаться multiprocessing.

Однако кажется, что что-то не любит PID, что мой multiprocessing.Process объекты имеют. я получил IOError: [Errno 9] Bad file descriptor. Отказ Эти процессы как-то ограничены? Я знаю, что PID не доступен, пока я не начну процесс, но я делать начать процесс. Вот исходный код примера, который я сделал, который демонстрирует эту проблему:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

Вот трассировка:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

Приведенный выше код фактически модифицирован со более старого примера, который использовал Process Pools. Я был спасен для справки для моих коллег и себя (отсюда и тяжелое количество комментариев) на некоторое время. Я построил его таким образом, чтобы я мог открыть два маленьких браузера Windows бок о бок, чтобы продемонстрировать моему боссу, что Blip Blocks Blocks / Sync URI, пока / async позволяет больше соединений. Для целей этого вопроса все, что вам нужно сделать, чтобы воспроизвести, пытается получить доступ к обработчику ASYNC. Это ошибки немедленно.

Что мне делать с этим? Как PID может быть «плохо»? Если вы запускаете программу, вы можете увидеть его напечатано на STDOUT.

Для записи я использую Python 2.6.5 на Ubuntu 10.04. Торнадо 1,1.

Это было полезно?

Решение

Add_Handler принимает допустимый дескриптор файлов, а не PID. В качестве примера того, что ожидается, что сам Tornado использует Add_Handler, обычно проходя в файло () объекта сокета (), который возвращает дескриптор файла объекта. PID не имеет значения в этом случае.

Другие советы

Проверьте этот проект:

https://github.com/vukasin/tornado-subprocess.

Это позволяет вам начать произвольные процессы из торнадо и получить обратный вызов, когда они закончат (с доступом к их статусу, stdout и STDERR).

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top