Tastatur Unterbricht mit Python Multiprocessing Pool
-
05-07-2019 - |
Frage
Wie kann ich umgehen KeyboardInterrupt Ereignisse mit Multiprozessing Pools der Python? Hier ist ein einfaches Beispiel:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
Wenn Sie den Code oben ausgeführt, die KeyboardInterrupt
angehoben wird, wenn ich ^C
drücken, aber der Prozess hängt einfach an diesem Punkt, und ich habe es von außen zu töten.
Ich möchte ^C
jederzeit in der Lage sein zu drücken und alle Prozesse verursachen anmutig zu beenden.
Lösung
Dies ist ein Python-Fehler. Wenn für einen Zustand in threading.Condition.wait warten () wird KeyboardInterrupt nie gesendet. Repro:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
Die KeyboardInterrupt Ausnahme wird erst wait () zurückkehrt geliefert werden, und es nie wieder, so dass die Unterbrechung nie passiert. KeyboardInterrupt sollte an Sicherheit grenzender Wahrscheinlichkeit eine Bedingung warten unterbrechen.
Beachten Sie, dass dies nicht der Fall, wenn ein Timeout angegeben wird; cond.wait (1) wird die Unterbrechungs sofort erhalten. So ist eine Behelfslösung ein Timeout angeben. Um das zu tun, ersetzen
results = pool.map(slowly_square, range(40))
mit
results = pool.map_async(slowly_square, range(40)).get(9999999)
oder ähnliches.
Andere Tipps
Von dem, was ich vor kurzem gefunden habe, ist die beste Lösung, um die Arbeitsprozesse einzurichten ist SIGINT ganz zu ignorieren, und beschränkt alle Bereinigungscodes zu dem übergeordneten Prozess. Dies behebt das Problem für beide Leerlauf und beschäftigt Arbeitsprozessen und erfordert keine Fehlercode in Ihrem Kind Prozesse Handhabung.
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
Erklärung und vollständige Beispielcode kann unter http://noswap.com/blog/ finden python-Multiprocessing-KeyboardInterrupt / und http://github.com/jreese/multiprocessing-keyboardinterrupt ist.
Aus irgendwelchen Gründen nur Ausnahmen von der Basis Exception
Klasse geerbt werden in der Regel behandelt. Zur Umgehung des Problems können Sie Ihre KeyboardInterrupt
als Exception
Instanz erneut erhöhen:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
Normalerweise würden Sie die folgende Ausgabe:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
Wenn Sie also ^C
treffen, erhalten Sie:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
In der Regel diese einfache Struktur funktioniert für Strg - C auf Pool:
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
Wie bereits in wenigen ähnlichen Beiträgen angegeben:
Es scheint, gibt es zwei Probleme, die Ausnahmen zu machen, während ärgerlich Multiprozessing. Die erste (bereits von Glenn) ist, dass Sie map_async
mit einem Timeout statt map
, um eine sofortige Antwort zu erhalten (das heißt, nicht beendet die Bearbeitung die gesamte Liste) verwenden müssen. Die zweite (festgestellt durch Andrey) besteht darin, dass Multiprozessing keine Ausnahmen verfängt, die von Exception
erbt nicht (zum Beispiel SystemExit
). Also hier ist meine Lösung, die mit diesen beiden Angeboten:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
Die gestimmte Antwort anpacken nicht die Kernfrage, sondern eine ähnliche Nebenwirkung.
Jesse Noller, der Autor der Multiprozeß-Bibliothek, erklärt, wie man richtig mit STRG + C behandeln, wenn sie in einem alten Blog-Post .
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
Ich fand, für den Augenblick, ist die beste Lösung nicht die multiprocessing.pool Funktion nutzen zu können, sondern Ihre eigene Pool-Funktionalität rollen. Ich lieferte ein Beispiel die Fehler mit apply_async demonstriert sowie einem Beispiel zeigt, wie die Pool-Funktionalität ganz zu vermeiden verwenden.
http://www.bryceboe.com/ 2010/08/26 / python-Multiprocessing-and-KeyboardInterrupt /
Ich bin ein Neuling in Python. Ich suchte überall nach Antworten und stolpern auf diese und ein paar anderen Blogs und YouTube-Videos. Ich habe versucht, den Autor angezeigten Code kopieren einfügen und reproduzieren es auf meinem Python 2.7.13 in Windows 7 64-Bit. Es ist nah an, was ich will erreichen.
ich mein Kind Prozesse machte die ControlC zu ignorieren und den übergeordnete Prozess machen beenden. Sieht aus wie das Kind Prozess zu umgehen dieses Problem für mich nicht vermeiden.
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
Der Teil an pool.terminate()
Start scheint nie auszuführen.
Sie können versuchen, die apply_async Methode eines Pool-Objekt, wie folgt aus:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs = {}
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results = {}
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: {}".format(t1 - t0)
t2 = datetime.now()
results2 = {i: test_func(i) for i in iterations}
t3 = datetime.now()
print results2
print "Non-multi: {}".format(t3 - t2)
Ausgabe:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
Ein Vorteil dieses Verfahrens besteht darin, dass die Ergebnisse verarbeitet, bevor Unterbrechung im Ergebnis Wörterbuch zurückgegeben werden:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
Merkwürdigerweise sieht es aus wie Sie die KeyboardInterrupt
in den Kindern als auch zu behandeln haben. Ich würde dies als geschrieben erwartet zu arbeiten ... versuchen slowly_square
an sich ändernde:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
Das sollte funktionieren wie erwartet.