Domanda

Come posso gestire gli eventi KeyboardInterrupt con i pool multiprocessore di Python? Ecco un semplice esempio:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Quando eseguo il codice sopra, il KeyboardInterrupt viene sollevato quando premo ^ C , ma il processo semplicemente si blocca a quel punto e devo ucciderlo esternamente.

Voglio essere in grado di premere ^ C in qualsiasi momento e fare in modo che tutti i processi terminino con grazia.

È stato utile?

Soluzione

Questo è un bug di Python. Durante l'attesa di una condizione in threading.Condition.wait (), KeyboardInterrupt non viene mai inviato. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

L'eccezione KeyboardInterrupt non verrà consegnata fino a quando wait () non ritorna e non ritorna mai, quindi l'interrupt non si verifica mai. KeyboardInterrupt dovrebbe quasi sicuramente interrompere una condizione di attesa.

Nota che ciò non accade se viene specificato un timeout; cond.wait (1) riceverà immediatamente l'interrupt. Pertanto, una soluzione alternativa consiste nello specificare un timeout. Per farlo, sostituisci

    results = pool.map(slowly_square, range(40))

con

    results = pool.map_async(slowly_square, range(40)).get(9999999)

o simile.

Altri suggerimenti

Da quanto ho scoperto di recente, la soluzione migliore è impostare i processi di lavoro in modo da ignorare del tutto SIGINT e limitare tutto il codice di pulizia al processo padre. Questo risolve il problema per i processi di lavoro inattivi e occupati e non richiede errori nella gestione del codice nei processi figlio.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Spiegazione e codice di esempio completo sono disponibili all'indirizzo http://noswap.com/blog/ python-multiprocessing-keyboardinterrupt / e http://github.com/jreese/multiprocessing-keyboardinterrupt rispettivamente.

Per alcuni motivi, vengono gestite normalmente solo le eccezioni ereditate dalla classe Exception di base. Per ovviare a questo problema, puoi aumentare nuovamente il tuo KeyboardInterrupt come istanza Exception :

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalmente otterrai il seguente output:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Quindi, se premi ^ C , otterrai:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

Di solito questa semplice struttura funziona per Ctrl - C su Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Come indicato in alcuni post simili:

Cattura l'interruzione della tastiera in Python senza provare-tranne

Sembra che ci siano due problemi che rendono le eccezioni mentre il multiprocessing è fastidioso. Il primo (notato da Glenn) è che è necessario utilizzare map_async con un timeout anziché map per ottenere una risposta immediata (ovvero, non terminare l'elaborazione intero elenco). Il secondo (notato da Andrey) è che il multiprocessing non rileva eccezioni che non ereditano da Exception (ad es. SystemExit ). Quindi ecco la mia soluzione che si occupa di entrambi:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

La risposta votata non affronta la questione principale ma un effetto collaterale analogo.

Jesse Noller, l'autore della libreria multiprocessing, spiega come gestire correttamente CTRL + C quando si utilizza multiprocessing.Pool in un vecchio post di blog .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

Ho trovato, per il momento, la soluzione migliore non usare la funzione multiprocessing.pool ma piuttosto rotolare la propria funzionalità pool. Ho fornito un esempio che dimostra l'errore con apply_async e un esempio che mostra come evitare di utilizzare completamente la funzionalità del pool.

http://www.bryceboe.com/ 2010/08/26 / python-multiprocessing-e-KeyboardInterrupt /

Sono un principiante in Python. Cercavo ovunque risposte e inciampare in questo e in alcuni altri blog e video di YouTube. Ho provato a copiare incollare il codice dell'autore sopra e riprodurlo sul mio Python 2.7.13 in Windows 7 a 64 bit. È vicino a ciò che voglio ottenere.

Ho fatto in modo che i miei processi figlio ignorassero ControlC e facessero terminare il processo padre. Sembra che bypassare il processo figlio eviti questo problema per me.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

La parte che inizia con pool.terminate () non sembra mai essere eseguita.

Puoi provare a utilizzare il metodo apply_async di un oggetto Pool, in questo modo:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Output:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Un vantaggio di questo metodo è che i risultati elaborati prima dell'interruzione verranno restituiti nel dizionario dei risultati:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Stranamente sembra che tu debba gestire il KeyboardInterrupt anche nei bambini. Mi sarei aspettato che funzionasse come scritto ... prova a cambiare slow_square in:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Dovrebbe funzionare come previsto.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top