Pregunta

¿Cómo puedo manejar eventos KeyboardInterrupt con grupos de multiprocesamiento de python? Aquí hay un ejemplo simple:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Al ejecutar el código anterior, el KeyboardInterrupt se levanta cuando presiono ^ C , pero el proceso simplemente se bloquea en ese punto y tengo que matarlo externamente.

Quiero poder presionar ^ C en cualquier momento y hacer que todos los procesos salgan correctamente.

¿Fue útil?

Solución

Este es un error de Python. Cuando se espera una condición en threading.Condition.wait (), KeyboardInterrupt nunca se envía. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

La excepción KeyboardInterrupt no se entregará hasta que devuelva wait (), y nunca vuelva, por lo que la interrupción nunca ocurre. KeyboardInterrupt debería casi con toda seguridad interrumpir una condición de espera.

Tenga en cuenta que esto no sucede si se especifica un tiempo de espera; cond.wait (1) recibirá la interrupción inmediatamente. Por lo tanto, una solución es especificar un tiempo de espera. Para hacer eso, reemplaza

    results = pool.map(slowly_square, range(40))

con

    results = pool.map_async(slowly_square, range(40)).get(9999999)

o similar.

Otros consejos

Por lo que he encontrado recientemente, la mejor solución es configurar los procesos de trabajo para que ignoren el SIGINT por completo, y confinar todo el código de limpieza al proceso principal. Esto soluciona el problema tanto para los procesos de trabajo inactivos como para los ocupados, y no requiere ningún código de manejo de errores en sus procesos secundarios.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

La explicación y el código de ejemplo completo se pueden encontrar en http://noswap.com/blog/ python-multiprocessing-keyboardinterrupt / y http://github.com/jreese/multiprocessing-keyboardinterrupt respectivamente.

Por algunas razones, solo las excepciones heredadas de la base Exception se manejan normalmente. Como solución alternativa, puede volver a subir su KeyboardInterrupt como una instancia de Exception :

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalmente obtendrías la siguiente salida:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Entonces, si presionas ^ C , obtendrás:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

Por lo general, esta estructura simple funciona para Ctrl - C en Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Como se dijo en algunas publicaciones similares:

Captura el teclado interrumpido en Python sin intentarlo excepto

Parece que hay dos problemas que hacen que las excepciones al multiprocesamiento sean molestas. El primero (señalado por Glenn) es que necesita usar map_async con un tiempo de espera en lugar de mapa para obtener una respuesta inmediata (es decir, no termine de procesar el lista completa). El segundo (señalado por Andrey) es que el multiprocesamiento no detecta las excepciones que no se heredan de Exception (por ejemplo, SystemExit ). Así que aquí está mi solución que trata con estos dos:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

La respuesta votada no aborda el problema central sino un efecto secundario similar.

Jesse Noller, el autor de la biblioteca de multiprocesamiento, explica cómo manejar CTRL + C correctamente al usar multiprocessing.Pool en un antiguo publicación de blog .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

Por el momento, encontré que la mejor solución es no usar la función multiprocessing.pool, sino rodar su propia funcionalidad de pool. Proporcioné un ejemplo que demuestra el error con apply_async, así como un ejemplo que muestra cómo evitar el uso total de la funcionalidad de la agrupación.

http://www.bryceboe.com/ 2010/08/26 / python-multiprocessing-and-keyboardinterrupt /

Soy un novato en Python. Buscaba respuestas en todas partes y me topé con este y algunos otros blogs y videos de YouTube. He intentado copiar, pegar el código del autor anterior y reproducirlo en mi python 2.7.13 en Windows 7 de 64 bits. Está cerca de lo que quiero lograr.

Hice mis procesos secundarios para ignorar el ControlC y hacer que el proceso principal finalice. Parece que pasar por alto el proceso secundario evita este problema para mí.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

La parte que comienza en pool.terminate () nunca parece ejecutarse.

Puedes intentar usar el método apply_async de un objeto Pool, como este:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Salida:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Una ventaja de este método es que los resultados procesados ??antes de la interrupción se devolverán en el diccionario de resultados:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Por extraño que parezca, tienes que manejar el KeyboardInterrupt en los niños también. Hubiera esperado que esto funcionara como está escrito ... intente cambiar slowly_square a:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Eso debería funcionar como esperabas.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top