مقاطعة لوحة المفاتيح مع مجموعة المعالجة المتعددة في بايثون

StackOverflow https://stackoverflow.com/questions/1408356

سؤال

كيف يمكنني التعامل مع أحداث KeyboardInterrupt باستخدام مجموعات المعالجة المتعددة في بايثون؟اليك مثال بسيط:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

عند تشغيل الكود أعلاه، سيتم KeyboardInterrupt يتم رفعه عندما اضغط ^C, ، لكن العملية معلقة عند تلك النقطة ويجب أن أقوم بإيقافها خارجيًا.

أريد أن أكون قادرًا على الضغط ^C في أي وقت والتسبب في إنهاء جميع العمليات بأمان.

هل كانت مفيدة؟

المحلول

وهذا هو خلل بيثون. عند انتظار شرط في threading.Condition.wait ()، يتم إرسال KeyboardInterrupt أبدا. إعدادها في صورة جاهزة:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

ولن يتم تسليم الاستثناء KeyboardInterrupt حتى الانتظار () العودة، وذلك يعود أبدا، وبالتالي فإن المقاطعة يحدث أبدا. يجب KeyboardInterrupt يكاد يكون من المؤكد يقطع انتظار حالة.

لاحظ أن هذا لا يحدث إذا تم تحديد مهلة. cond.wait (1) سوف تتلقى المقاطعة فورا. لذلك، الحل هو تحديد مهلة. للقيام بذلك، استبدال

    results = pool.map(slowly_square, range(40))

مع

    results = pool.map_async(slowly_square, range(40)).get(9999999)

وأو ما شابه ذلك.

نصائح أخرى

مما وجدته مؤخرًا، فإن الحل الأفضل هو إعداد العمليات المنفذة لتجاهل SIGINT تمامًا، وقصر كل كود التنظيف على العملية الأصلية.يعمل هذا على إصلاح المشكلة لكل من العمليات المنفذة الخاملة والمشغولة، ولا يتطلب أي خطأ في معالجة التعليمات البرمجية في العمليات التابعة لديك.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

يمكن العثور على الشرح ورمز المثال الكامل على http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ و http://github.com/jreese/multiprocessing-keyboardinterrupt على التوالى.

لبعض الأسباب، يتم التعامل مع استثناءات فقط ورثت من الفئة الأساسية Exception عادة. كحل مؤقت، قد إعادة رفع KeyboardInterrupt الخاص بك كما مثيل Exception:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

وعادة ستحصل الإخراج التالي:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

وحتى إذا كنت ضرب ^C، سوف تحصل:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

وعادة ما تكون هذه هيكل بسيط يعمل ل السيطرة - C على بركة:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

وكما ورد في بعض وظائف مماثلة:

التقاط keyboardinterrupt في بيثون دون محاولة-باستثناء

ويبدو أن هناك نوعان من القضايا التي تجعل من الاستثناءات في حين متعدد المعالجة مزعج. أول (لاحظ جلين) هي التي تحتاج إلى استخدام map_async مع مهلة بدلا من map من أجل الحصول على استجابة فورية (أي لا تنتهي تجهيز قائمة كاملة). والثاني (لاحظ أندريه) هو أن المعالجة المتعددة لا يصاب الاستثناءات التي لا ترث من Exception (على سبيل المثال، SystemExit). حتى هنا بلدي الحل الذي يتعامل مع كل من هذه:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

والجواب صوت لا تعالج القضية الأساسية ولكن أحد الآثار الجانبية مماثلة.

وجيسي Noller، صاحب مكتبة متعدد المعالجة، ويوضح كيفية التعامل بشكل صحيح مع CTRL + C عند استخدام multiprocessing.Pool في القديم <لأ href = "http://jessenoller.com/blog/2009/01/08/multiprocessingpool -و-keyboardinterrupt "يختلط =" نوفولو noreferrer "> بلوق وظيفة .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

ولقد وجدت، في الوقت الراهن، فإن أفضل حل هو عدم استخدام ميزة multiprocessing.pool بل لفة وظائف تجمع الخاصة بك. أنا قدمت مثالا يدل على خطأ مع apply_async وكذلك مثال يوضح كيفية تجنب استخدام وظيفة تجمع تماما.

http://www.bryceboe.com/ 2010/08/26 / بيثون متعدد المعالجة، وkeyboardinterrupt /

وأنا مبتدئ في بيثون. كنت أبحث في كل مكان للإجابة وتتعثر هذه وغيرها من بلوق وأشرطة فيديو يوتيوب الحصر. لقد حاولت نسخ لصق رمز البلاغ أعلاه وإعادة إنتاجه على بلدي الثعبان 2.7.13 في ويندوز 7 64- قليلا. أنها قريبة إلى ما أريد تحقيقه.

ولقد تقدمت عمليات طفلي تجاهل ControlC وجعل عملية الأصل تنتهي. يبدو تجاوز عملية طفل لا تجنب هذه المشكلة بالنسبة لي.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

والجزء ابتداء من الساعة pool.terminate() لا يبدو لتنفيذه.

ويمكنك محاولة استخدام طريقة apply_async لكائن بركة، مثل هذا:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

وإخراج:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

وميزة هذه الطريقة هي أن النتائج معالجتها قبل أن تعاد انقطاع في القاموس النتائج:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

والغريب أنه يبدو أن لديك للتعامل مع KeyboardInterrupt في الأطفال كذلك. كنت أتوقع هذا العمل كما هو مكتوب ... حاول تغيير slowly_square إلى:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

ويجب أن تعمل كما هو متوقع.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top