errno 9 باستخدام وحدة المعالجة المتعددة مع إعصار في بيثون

StackOverflow https://stackoverflow.com/questions/4073092

سؤال

بالنسبة للعمليات في خادم Tornado الخاص بي والتي من المتوقع حظرها (ولا يمكن تعديلها بسهولة لاستخدام أشياء مثل عميل طلب HTTP غير المتزامن من Tornado) ، فقد قمت بتفريغ العمل لفصل عمليات العمال باستخدام multiprocessing وحدة. على وجه التحديد ، كنت أستخدم المعالجة المتعددة Pool لأنه يقدم طريقة تسمى apply_async, ، الذي يعمل بشكل جيد للغاية مع Tornado لأنه يأخذ رد الاتصال كأحد حججه.

لقد أدركت مؤخرًا أن مجموعة التجمعات تسبق عدد العمليات ، لذلك إذا أصبحت جميعها حظرًا ، فسيتعين على العمليات التي تتطلب عملية جديدة الانتظار. أدرك أنه لا يزال بإمكان الخادم اتخاذ الاتصالات منذ ذلك الحين apply_async يعمل عن طريق إضافة أشياء إلى قائمة انتظار مهمة ، وينتهي على الفور ، على الفور ، لكنني أتطلع إلى تفرخ ن عمليات ن مقدار مهام الحظر التي أحتاجها.

اعتقدت أنه يمكنني استخدام add_handler طريقة Ioloop الخاصة بخادم Tornado الخاص بي لإضافة معالج لكل pid جديد أقوم بإنشائه إلى ذلك ioloop. لقد فعلت شيئًا مشابهًا من قبل ، لكنه كان يستخدم Popen وأمر تعسفي. مثال على هذا الاستخدام لهذه الطريقة هو هنا. كنت أرغب في نقل الحجج إلى وظيفة ثعبان مستهدفة تعسفية ضمن عملي ، لذلك أردت الالتزام بها multiprocessing.

ومع ذلك ، يبدو أن شيئًا ما لا يحب Pids multiprocessing.Process الأشياء لها. انا حصلت IOError: [Errno 9] Bad file descriptor. هل هذه العمليات مقيدة بطريقة ما؟ أعلم أن PID غير متوفر حتى أبدأ العملية بالفعل ، لكنني فعل ابدأ العملية. فيما يلي رمز المصدر لمثال قدمته والذي يوضح هذه المشكلة:

#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'

def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.
    This sleeping can be thought of as a blocking operation."""

    time.sleep(5)
    queue.put("Now I'm awake.")
    return

def random_num():
    """Returns a string containing a random number.
    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""

    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handing a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.
    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()

ها هو التتبع:

Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor

يتم تعديل الكود أعلاه بالفعل من مثال أقدم يستخدم تجمعات العملية. لقد تم إنقاذها للرجوع إليها لزملائي في العمل ونفسي (ومن ثم مقدار التعليقات الثقيلة) لفترة طويلة. لقد قمت بإنشائها بطريقة حتى أتمكن من فتح نوافذ متصفح صغيرة جنبًا إلى جنب لإثبات أن /متزامنة اتصالات URI بينما يتيح /Async المزيد من الاتصالات. لأغراض هذا السؤال ، كل ما عليك القيام به لإعادة إنتاجه هو محاولة الوصول إلى معالج /aroync. يخطئ على الفور.

ما الذي علي فعله بخصوص هذا؟ كيف يمكن أن يكون PID "سيئًا"؟ إذا قمت بتشغيل البرنامج ، فيمكنك رؤيته مطبوعًا على stdout.

للسجل ، أنا أستخدم Python 2.6.5 على Ubuntu 10.04. الإعصار هو 1.1.

هل كانت مفيدة؟

المحلول

Add_handler يأخذ واصف ملف صالح ، وليس PID. كمثال على ما هو متوقع ، يستخدم Tornado نفسه Add_handler عادةً عن طريق تمرير FileNo () لكائن المقبس ، والذي يقوم بإرجاع واصف ملف الكائن. PID غير ذي صلة في هذه الحالة.

نصائح أخرى

تحقق من هذا المشروع:

https://github.com/vukasin/tornado-subprocess

يتيح لك البدء في عمليات تعسفية من Tornado والحصول على رد اتصال عند الانتهاء (مع الوصول إلى وضعهم و stdout و Stderr).

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top