مقاطعة لوحة المفاتيح مع مجموعة المعالجة المتعددة في بايثون
-
05-07-2019 - |
سؤال
كيف يمكنني التعامل مع أحداث KeyboardInterrupt باستخدام مجموعات المعالجة المتعددة في بايثون؟اليك مثال بسيط:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
عند تشغيل الكود أعلاه، سيتم KeyboardInterrupt
يتم رفعه عندما اضغط ^C
, ، لكن العملية معلقة عند تلك النقطة ويجب أن أقوم بإيقافها خارجيًا.
أريد أن أكون قادرًا على الضغط ^C
في أي وقت والتسبب في إنهاء جميع العمليات بأمان.
المحلول
وهذا هو خلل بيثون. عند انتظار شرط في threading.Condition.wait ()، يتم إرسال KeyboardInterrupt أبدا. إعدادها في صورة جاهزة:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
ولن يتم تسليم الاستثناء KeyboardInterrupt حتى الانتظار () العودة، وذلك يعود أبدا، وبالتالي فإن المقاطعة يحدث أبدا. يجب KeyboardInterrupt يكاد يكون من المؤكد يقطع انتظار حالة.
لاحظ أن هذا لا يحدث إذا تم تحديد مهلة. cond.wait (1) سوف تتلقى المقاطعة فورا. لذلك، الحل هو تحديد مهلة. للقيام بذلك، استبدال
results = pool.map(slowly_square, range(40))
مع
results = pool.map_async(slowly_square, range(40)).get(9999999)
وأو ما شابه ذلك.
نصائح أخرى
مما وجدته مؤخرًا، فإن الحل الأفضل هو إعداد العمليات المنفذة لتجاهل SIGINT تمامًا، وقصر كل كود التنظيف على العملية الأصلية.يعمل هذا على إصلاح المشكلة لكل من العمليات المنفذة الخاملة والمشغولة، ولا يتطلب أي خطأ في معالجة التعليمات البرمجية في العمليات التابعة لديك.
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
يمكن العثور على الشرح ورمز المثال الكامل على http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ و http://github.com/jreese/multiprocessing-keyboardinterrupt على التوالى.
لبعض الأسباب، يتم التعامل مع استثناءات فقط ورثت من الفئة الأساسية Exception
عادة. كحل مؤقت، قد إعادة رفع KeyboardInterrupt
الخاص بك كما مثيل Exception
:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
وعادة ستحصل الإخراج التالي:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
وحتى إذا كنت ضرب ^C
، سوف تحصل:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
وعادة ما تكون هذه هيكل بسيط يعمل ل السيطرة - C على بركة:
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
وكما ورد في بعض وظائف مماثلة:
ويبدو أن هناك نوعان من القضايا التي تجعل من الاستثناءات في حين متعدد المعالجة مزعج. أول (لاحظ جلين) هي التي تحتاج إلى استخدام map_async
مع مهلة بدلا من map
من أجل الحصول على استجابة فورية (أي لا تنتهي تجهيز قائمة كاملة). والثاني (لاحظ أندريه) هو أن المعالجة المتعددة لا يصاب الاستثناءات التي لا ترث من Exception
(على سبيل المثال، SystemExit
). حتى هنا بلدي الحل الذي يتعامل مع كل من هذه:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
والجواب صوت لا تعالج القضية الأساسية ولكن أحد الآثار الجانبية مماثلة.
وجيسي Noller، صاحب مكتبة متعدد المعالجة، ويوضح كيفية التعامل بشكل صحيح مع CTRL + C عند استخدام multiprocessing.Pool
في القديم <لأ href = "http://jessenoller.com/blog/2009/01/08/multiprocessingpool -و-keyboardinterrupt "يختلط =" نوفولو noreferrer "> بلوق وظيفة .
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
ولقد وجدت، في الوقت الراهن، فإن أفضل حل هو عدم استخدام ميزة multiprocessing.pool بل لفة وظائف تجمع الخاصة بك. أنا قدمت مثالا يدل على خطأ مع apply_async وكذلك مثال يوضح كيفية تجنب استخدام وظيفة تجمع تماما.
http://www.bryceboe.com/ 2010/08/26 / بيثون متعدد المعالجة، وkeyboardinterrupt /
وأنا مبتدئ في بيثون. كنت أبحث في كل مكان للإجابة وتتعثر هذه وغيرها من بلوق وأشرطة فيديو يوتيوب الحصر. لقد حاولت نسخ لصق رمز البلاغ أعلاه وإعادة إنتاجه على بلدي الثعبان 2.7.13 في ويندوز 7 64- قليلا. أنها قريبة إلى ما أريد تحقيقه.
ولقد تقدمت عمليات طفلي تجاهل ControlC وجعل عملية الأصل تنتهي. يبدو تجاوز عملية طفل لا تجنب هذه المشكلة بالنسبة لي.
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
والجزء ابتداء من الساعة pool.terminate()
لا يبدو لتنفيذه.
ويمكنك محاولة استخدام طريقة apply_async لكائن بركة، مثل هذا:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs = {}
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results = {}
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: {}".format(t1 - t0)
t2 = datetime.now()
results2 = {i: test_func(i) for i in iterations}
t3 = datetime.now()
print results2
print "Non-multi: {}".format(t3 - t2)
وإخراج:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
وميزة هذه الطريقة هي أن النتائج معالجتها قبل أن تعاد انقطاع في القاموس النتائج:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
والغريب أنه يبدو أن لديك للتعامل مع KeyboardInterrupt
في الأطفال كذلك. كنت أتوقع هذا العمل كما هو مكتوب ... حاول تغيير slowly_square
إلى:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
ويجب أن تعمل كما هو متوقع.