Pergunta

Como posso lidar com eventos KeyboardInterrupt com piscinas multiprocessamento do python? Aqui está um exemplo simples:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Ao executar o código acima, o KeyboardInterrupt se levantou quando eu pressionar ^C, mas o processo simplesmente trava nesse ponto e eu tenho que matá-lo externamente.

Eu quero ser capaz de imprensa ^C a qualquer momento e causar todos os processos para sair normalmente.

Foi útil?

Solução

Este é um bug Python. Quando à espera de uma condição na threading.Condition.wait (), KeyboardInterrupt nunca é enviada. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

A exceção KeyboardInterrupt não será entregue até wait () retorna, e nunca retorna, assim que a interrupção nunca acontece. KeyboardInterrupt deve quase certamente interrupção de uma espera condição.

Note que isso não aconteceria se um tempo limite é especificado; cond.wait (1) receberá a interrupção imediatamente. Assim, a solução é especificar um tempo limite. Para fazer isso, substitua

    results = pool.map(slowly_square, range(40))

com

    results = pool.map_async(slowly_square, range(40)).get(9999999)

ou similar.

Outras dicas

Pelo que eu descobri recentemente, a melhor solução é configurar os processos de trabalho para ignorar SIGINT completamente, e confinar todo o código de limpeza para o processo pai. Isso corrige o problema tanto para os processos de trabalho ocupados ocioso e, e não requer nenhum tratamento de erro de código em seus processos filhos.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explicação e exemplo de código completo pode ser encontrado em http://noswap.com/blog/ -python multiprocessamento-KeyboardInterrupt / e http://github.com/jreese/multiprocessing-keyboardinterrupt , respectivamente.

Por algumas razões, únicas exceções herdados da classe base Exception são tratados normalmente. Como alternativa, você pode voltar a aumentar o seu KeyboardInterrupt como uma instância Exception:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalmente, você teria o seguinte resultado:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Então, se você bater ^C, você vai ter:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

Normalmente, esta estrutura simples funciona para Ctrl - C em Piscina:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Como foi indicado em alguns posts semelhantes:

em Python sem try-exceto

Parece que há duas questões que fazem exceções enquanto multiprocessamento irritante. O primeiro (observado por Glenn) é que você precisa usar map_async com um tempo limite em vez de map a fim de obter uma resposta imediata (ou seja, não terminar o processamento da lista inteira). O segundo (observado por Andrei) é que multiprocessamento não pega excepções que não herdam Exception (por exemplo, SystemExit). Então aqui está a minha solução que lida com ambos:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

A resposta votou não aborda a questão central, mas um efeito colateral similar.

Jesse Noller, o autor da biblioteca de multiprocessamento, explica como lidar corretamente com CTRL + C quando se utiliza multiprocessing.Pool em uma idade Blog Post .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

Eu encontrei, por enquanto, a melhor solução é não usar o recurso multiprocessing.pool mas sim rolar sua própria funcionalidade piscina. I forneceu um exemplo que demonstra o erro com apply_async, bem como um exemplo mostrando como evitar o uso da funcionalidade piscina completamente.

http://www.bryceboe.com/ 2010/08/26 / python-multiprocessamento-and-KeyboardInterrupt /

Eu sou um novato em Python. Eu estava procurando por toda parte para resposta e tropeçar este e alguns outros blogs e vídeos do youtube. Eu tentei copiar e colar o código do autor acima e reproduzi-lo no meu python 2.7.13 no Windows 7 64 bits. É perto do que eu quero alcançar.

Eu fiz meus processos filhos a ignorar a ControlC e tornar o processo pai terminar. Looks como ignorando o processo de criança não evitar este problema para mim.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

A parte a partir de pool.terminate() parece nunca executar.

Você pode tentar usar o método apply_async de um objeto exterior, como este:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Output:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Uma vantagem deste método é que os resultados processados ??antes da interrupção serão devolvidos nos resultados Dicionário:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Curiosamente, parece que você tem que lidar com a KeyboardInterrupt nas crianças também. Eu teria esperado que isso funcione como escrito ... tente alterar slowly_square a:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Isso deve funcionar como o esperado.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top