Frage

Wie kann ich umgehen KeyboardInterrupt Ereignisse mit Multiprozessing Pools der Python? Hier ist ein einfaches Beispiel:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Wenn Sie den Code oben ausgeführt, die KeyboardInterrupt angehoben wird, wenn ich ^C drücken, aber der Prozess hängt einfach an diesem Punkt, und ich habe es von außen zu töten.

Ich möchte ^C jederzeit in der Lage sein zu drücken und alle Prozesse verursachen anmutig zu beenden.

War es hilfreich?

Lösung

Dies ist ein Python-Fehler. Wenn für einen Zustand in threading.Condition.wait warten () wird KeyboardInterrupt nie gesendet. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Die KeyboardInterrupt Ausnahme wird erst wait () zurückkehrt geliefert werden, und es nie wieder, so dass die Unterbrechung nie passiert. KeyboardInterrupt sollte an Sicherheit grenzender Wahrscheinlichkeit eine Bedingung warten unterbrechen.

Beachten Sie, dass dies nicht der Fall, wenn ein Timeout angegeben wird; cond.wait (1) wird die Unterbrechungs sofort erhalten. So ist eine Behelfslösung ein Timeout angeben. Um das zu tun, ersetzen

    results = pool.map(slowly_square, range(40))

mit

    results = pool.map_async(slowly_square, range(40)).get(9999999)

oder ähnliches.

Andere Tipps

Von dem, was ich vor kurzem gefunden habe, ist die beste Lösung, um die Arbeitsprozesse einzurichten ist SIGINT ganz zu ignorieren, und beschränkt alle Bereinigungscodes zu dem übergeordneten Prozess. Dies behebt das Problem für beide Leerlauf und beschäftigt Arbeitsprozessen und erfordert keine Fehlercode in Ihrem Kind Prozesse Handhabung.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Erklärung und vollständige Beispielcode kann unter http://noswap.com/blog/ finden python-Multiprocessing-KeyboardInterrupt / und http://github.com/jreese/multiprocessing-keyboardinterrupt ist.

Aus irgendwelchen Gründen nur Ausnahmen von der Basis Exception Klasse geerbt werden in der Regel behandelt. Zur Umgehung des Problems können Sie Ihre KeyboardInterrupt als Exception Instanz erneut erhöhen:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalerweise würden Sie die folgende Ausgabe:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Wenn Sie also ^C treffen, erhalten Sie:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

In der Regel diese einfache Struktur funktioniert für Strg - C auf Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Wie bereits in wenigen ähnlichen Beiträgen angegeben:

in Python Capture-KeyboardInterrupt ohne Anprobe außer

Es scheint, gibt es zwei Probleme, die Ausnahmen zu machen, während ärgerlich Multiprozessing. Die erste (bereits von Glenn) ist, dass Sie map_async mit einem Timeout statt map, um eine sofortige Antwort zu erhalten (das heißt, nicht beendet die Bearbeitung die gesamte Liste) verwenden müssen. Die zweite (festgestellt durch Andrey) besteht darin, dass Multiprozessing keine Ausnahmen verfängt, die von Exception erbt nicht (zum Beispiel SystemExit). Also hier ist meine Lösung, die mit diesen beiden Angeboten:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

Die gestimmte Antwort anpacken nicht die Kernfrage, sondern eine ähnliche Nebenwirkung.

Jesse Noller, der Autor der Multiprozeß-Bibliothek, erklärt, wie man richtig mit STRG + C behandeln, wenn sie in einem alten Blog-Post .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

Ich fand, für den Augenblick, ist die beste Lösung nicht die multiprocessing.pool Funktion nutzen zu können, sondern Ihre eigene Pool-Funktionalität rollen. Ich lieferte ein Beispiel die Fehler mit apply_async demonstriert sowie einem Beispiel zeigt, wie die Pool-Funktionalität ganz zu vermeiden verwenden.

http://www.bryceboe.com/ 2010/08/26 / python-Multiprocessing-and-KeyboardInterrupt /

Ich bin ein Neuling in Python. Ich suchte überall nach Antworten und stolpern auf diese und ein paar anderen Blogs und YouTube-Videos. Ich habe versucht, den Autor angezeigten Code kopieren einfügen und reproduzieren es auf meinem Python 2.7.13 in Windows 7 64-Bit. Es ist nah an, was ich will erreichen.

ich mein Kind Prozesse machte die ControlC zu ignorieren und den übergeordnete Prozess machen beenden. Sieht aus wie das Kind Prozess zu umgehen dieses Problem für mich nicht vermeiden.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Der Teil an pool.terminate() Start scheint nie auszuführen.

Sie können versuchen, die apply_async Methode eines Pool-Objekt, wie folgt aus:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Ausgabe:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Ein Vorteil dieses Verfahrens besteht darin, dass die Ergebnisse verarbeitet, bevor Unterbrechung im Ergebnis Wörterbuch zurückgegeben werden:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Merkwürdigerweise sieht es aus wie Sie die KeyboardInterrupt in den Kindern als auch zu behandeln haben. Ich würde dies als geschrieben erwartet zu arbeiten ... versuchen slowly_square an sich ändernde:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Das sollte funktionieren wie erwartet.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top