質問

PythonのマルチプロセッシングプールでKeyboardInterruptイベントを処理するにはどうすればよいですか?以下に簡単な例を示します。

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

上記のコードを実行すると、 ^ C を押すと KeyboardInterrupt が発生しますが、その時点でプロセスがハングし、外部で強制終了する必要があります。

いつでも ^ C を押して、すべてのプロセスを正常に終了できるようにします。

役に立ちましたか?

解決

これはPythonのバグです。 threading.Condition.wait()で条件を待機している場合、KeyboardInterruptは送信されません。再現:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

wait()が戻るまでKeyboardInterrupt例外は配信されず、戻ることはないため、割り込みは発生しません。 KeyboardInterruptは、ほぼ確実に条件待機を中断するはずです。

タイムアウトが指定されている場合、これは発生しません。 cond.wait(1)はすぐに割り込みを受け取ります。そのため、回避策はタイムアウトを指定することです。それを行うには、次を置き換えます

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

または類似。

他のヒント

最近発見したことから、最良の解決策はワーカープロセスをセットアップしてSIGINTを完全に無視し、すべてのクリーンアップコードを親プロセスに限定することです。これにより、アイドルワーカープロセスとビジーワーカープロセスの両方の問題が修正され、子プロセスでエラー処理コードが不要になります。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

説明と完全なサンプルコードは、 http://noswap.com/blog/にあります。 python-multiprocessing-keyboardinterrupt / および http://github.com/jreese/multiprocessing-keyboardinterrupt それぞれ。

何らかの理由で、基本 Exception クラスから継承された例外のみが正常に処理されます。回避策として、 KeyboardInterrupt Exception インスタンスとして再発生させることができます:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

通常、次の出力が得られます。

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

したがって、 ^ C を押すと、以下が得られます。

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

通常、この単純な構造は、プール上の Ctrl - C で機能します:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

いくつかの同様の投稿で述べられているように:

try-exceptなしでPythonでキーボード割り込みをキャプチャ

迷惑なマルチプロセス中に例外を作る2つの問題があるようです。 1つ目(グレン)は、すぐに応答を得るために、 map ではなく map_async をタイムアウトで使用する必要があることです(つまり、リスト全体)。 2番目(Andreyが指摘)は、マルチプロセッシングが Exception から継承しない例外(たとえば、 SystemExit )をキャッチしないことです。これらの両方に対処する私のソリューションは次のとおりです。

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

投票された回答は、コアの問題に対処するのではなく、同様の副作用に対処します。

マルチプロセッシングライブラリの作成者であるJesse Nollerは、 multiprocessing.Pool を古いブログ投稿

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

現時点では、multiprocessing.pool機能を使用せずに、独自のプール機能をロールすることが最善の解決策であることがわかりました。 apply_asyncでエラーを示す例と、プール機能の使用を完全に回避する方法を示す例を提供しました。

http://www.bryceboe.com/ 2010/08/26 / python-multiprocessing-and-keyboardinterrupt /

私はPythonの初心者です。私はどこでも答えを探していましたが、これや他のいくつかのブログやyoutubeビデオを見つけました。上記の作者のコードをコピーして、Windows 7 64ビットのPython 2.7.13で再現しようとしました。それは私が達成したいものに近い。

子プロセスにControlCを無視させ、親プロセスを終了させました。子プロセスをバイパスすると、この問題を回避できるように見えます。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

pool.terminate()で始まる部分は実行されないようです。

次のように、Poolオブジェクトのapply_asyncメソッドを使用してみてください:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

出力:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

この方法の利点は、中断する前に処理された結果が結果辞書に返されることです:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

奇妙なことに、子の KeyboardInterrupt も処理する必要があるように見えます。私はこれが書かれた通りに動作することを期待していました... slowly_square を次のように変更してみてください:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

これは期待どおりに機能するはずです。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top