Question

Comment gérer les événements KeyboardInterrupt avec les pools de multitraitement de python? Voici un exemple simple:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Lors de l'exécution du code ci-dessus, KeyboardInterrupt est soulevé lorsque j'appuie sur ^ C , mais le processus se bloque simplement à ce stade et je dois le tuer de manière externe. / p>

Je souhaite pouvoir appuyer sur ^ C à tout moment pour que tous les processus se terminent normalement.

Était-ce utile?

La solution

Ceci est un bogue Python. En attente d'une condition dans threading.Condition.wait (), KeyboardInterrupt n'est jamais envoyé. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

L'exception KeyboardInterrupt ne sera pas livrée jusqu'au retour de wait () et ne le sera jamais, donc l'interruption ne se produira jamais. KeyboardInterrupt devrait presque certainement interrompre une attente de condition.

Notez que cela ne se produit pas si un délai d'attente est spécifié; cond.wait (1) recevra l'interruption immédiatement. Une solution de contournement consiste donc à spécifier un délai d'expiration. Pour ce faire, remplacez

    results = pool.map(slowly_square, range(40))

avec

    results = pool.map_async(slowly_square, range(40)).get(9999999)

ou similaire.

Autres conseils

D'après ce que j'ai découvert récemment, la meilleure solution consiste à configurer les processus de travail pour qu'ils ignorent complètement SIGINT et à limiter tout le code de nettoyage au processus parent. Cela résout le problème des processus de travail inactifs et occupés et ne nécessite aucun code de traitement des erreurs dans vos processus enfants.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Des explications et des exemples de code complets sont disponibles à l'adresse http://noswap.com/blog/. python-multiprocessing-keyboardinterrupt / et http://github.com/jreese/multiprocessing-keyboardinterrupt respectivement.

Pour certaines raisons, seules les exceptions héritées de la classe de base Exception sont gérées normalement. En guise de solution de contournement, vous pouvez ré-afficher votre KeyboardInterrupt en tant qu'instance Exception :

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalement, vous obtiendrez la sortie suivante:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Donc, si vous appuyez sur ^ C , vous obtiendrez:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

Généralement, cette structure simple fonctionne pour Ctrl - C sur le pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Comme indiqué dans quelques messages similaires:

capturer une interruption de clavier en Python sans try-except

Il semble que deux problèmes fassent des exceptions lors du multitraitement gênant. La première (notée par Glenn) indique que vous devez utiliser map_async avec un délai d'attente au lieu de map afin d'obtenir une réponse immédiate (en d'autres termes, ne terminez pas le traitement du liste entière). La deuxième (notée par Andrey) est que le multitraitement ne capture pas les exceptions qui n’héritent pas de Exception (par exemple, SystemExit ). Voici donc ma solution qui traite de ces deux aspects:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

La réponse votée n'aborde pas le problème central mais un effet secondaire similaire.

Jesse Noller, l'auteur de la bibliothèque de multitraitement, explique comment gérer correctement CTRL + C lors de l'utilisation de multiprocessing.Pool dans un ancien article de blog .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

J'ai trouvé pour le moment que la meilleure solution consiste à ne pas utiliser la fonctionnalité multiprocessing.pool mais plutôt à déployer la fonctionnalité de votre propre pool. J'ai fourni un exemple démontrant l'erreur avec apply_async ainsi qu'un exemple montrant comment éviter d'utiliser complètement la fonctionnalité de pool.

http://www.bryceboe.com/ 2010/08/26 / python-multitraitement-et-interruption-clavier /

Je suis un débutant en Python. Je cherchais partout la réponse et tombais sur cela et quelques autres blogs et vidéos youtube. J'ai essayé de copier coller le code de l'auteur ci-dessus et de le reproduire sur mon python 2.7.13 sous Windows 7 64 bits. C'est proche de ce que je veux réaliser.

J'ai obligé mes processus enfants à ignorer le ControlC et à mettre fin au processus parent. On dirait que le fait de contourner le processus enfant évite ce problème pour moi.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

La partie commençant par pool.terminate () ne semble jamais s'exécuter.

Vous pouvez essayer d'utiliser la méthode apply_async d'un objet Pool, comme suit:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Sortie:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Un avantage de cette méthode est que les résultats traités avant l'interruption seront renvoyés dans le dictionnaire de résultats:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Étrangement, il semble que vous deviez également gérer le KeyboardInterrupt dans les enfants. Je me serais attendu à ce que cela fonctionne comme écrit ... essayez de remplacer slow_square par:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Cela devrait fonctionner comme prévu.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top