Délai d'attente sur un appel de fonction
-
20-08-2019 - |
Question
J'appelle une fonction en Python qui, je le sais, risque de caler et de me forcer à redémarrer le script.
Comment puis-je appeler la fonction ou en quoi je l'enveloppe pour que, si cela prend plus de 5 secondes, le script l'annule et fasse autre chose?
La solution
Vous pouvez utiliser le package signal si vous utilisez UNIX:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print "Forever is over!"
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print "sec"
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print exc
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
10 secondes après l'appel alarm.alarm(10)
, le gestionnaire est appelé. Cela soulève une exception que vous pouvez intercepter à partir du code Python normal.
Ce module ne fonctionne pas bien avec les threads (mais alors, qui le fait?)
Notez que étant donné que nous levons une exception lorsque le délai d'expiration est dépassé, il est possible qu'elle soit capturée et ignorée à l'intérieur de la fonction, par exemple d'une telle fonction:
def loop_forever():
while 1:
print 'sec'
try:
time.sleep(10)
except:
continue
Autres conseils
Vous pouvez utiliser multiprocessing.Process
pour faire exactement cela.
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate
p.terminate()
p.join()
Comment appeler la fonction ou en quoi je l'enveloppe pour que le script l'annule si elle dure plus de 5 secondes?
J'ai posté un gist qui résout ce problème / question avec un décorateur et un threading.Timer
. La voici avec une panne.
Importations et configurations pour la compatibilité
Il a été testé avec Python 2 et 3. Il devrait également fonctionner sous Unix / Linux et Windows.
D'abord les importations. Celles-ci tentent de maintenir la cohérence du code quelle que soit la version de Python:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
Utiliser un code indépendant de la version:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
Nous avons maintenant importé nos fonctionnalités de la bibliothèque standard.
exit_after
décorateur
Ensuite, nous avons besoin d’une fonction pour terminer le main()
depuis le thread enfant:
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
Et voici le décorateur lui-même:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
Utilisation
Et voici l'utilisation qui répond directement à votre question sur la sortie après 5 secondes!:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
Démo:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
Le deuxième appel de fonction ne se terminera pas, mais le processus devrait se terminer avec un retour en arrière!
KeyboardInterrupt
n'arrête pas toujours un thread en veille
Notez que la veille ne sera pas toujours interrompue par une interruption du clavier, sous Python 2 sous Windows, par exemple:
.@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
Il n'est pas non plus susceptible d'interrompre le code exécuté dans des extensions, à moins que cela ne vérifie explicitement PyErr_CheckSignals()
. Voir Cython, Python et KeyboardInterrupt ignored <. / a>
J'éviterais de laisser dormir un fil plus d'une seconde, quoi qu'il en soit - c'est un peu plus long que le temps processeur.
Comment appeler la fonction ou comment l'enrouler afin que le script l'annule si le script dure plus de 5 secondes et effectue autre chose?
Pour attraper et faire autre chose, vous pouvez attraper le KeyboardInterrupt.
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
J'ai une proposition différente qui est une fonction pure (avec la même API que la suggestion de threading) et semble fonctionner correctement (sur la base de suggestions sur ce fil)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
J'ai rencontré ce fil lors de la recherche d'un appel de délai d'attente sur des tests unitaires. Je n'ai rien trouvé de simple dans les réponses ni dans les paquets d'une tierce partie. J'ai donc écrit au décorateur ci-dessous, vous pouvez y insérer directement le code:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
Ensuite, il est aussi simple que cela d'exclure un test ou une fonction de votre choix:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
Il y a beaucoup de suggestions, mais aucune n’utilise la méthode concurrent.futures, ce qui, à mon avis, est le moyen le plus lisible de gérer cela.
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
Super simple à lire et à maintenir.
Nous créons un pool, soumettons un processus unique, puis attendons jusqu'à 5 secondes avant de générer une erreur TimeoutError que vous pouvez capturer et gérer à votre guise.
Natif de python 3.2+ et backporté de 2,7 (contrats futurs d'installation).
Pour basculer entre les threads et les processus, il suffit de remplacer ProcessPoolExecutor
par ThreadPoolExecutor
.
Si vous souhaitez mettre fin au processus après l'expiration du délai, je vous suggère de rechercher Pebble .
Le package stopit
, disponible sur pypi, semble bien gérer les délais d'expiration.
J'aime le @stopit.threading_timeoutable
décorateur, qui ajoute un paramètre timeout
à la fonction décorée, qui fait ce que vous attendez, il arrête la fonction.
Découvrez-le sur pypi: https://pypi.python.org/pypi/stopit
Excellent projet PyPi , facile à utiliser et fiable décorateur de délai d'attente ( https://pypi.org/project/timeout-decorator/ )
installation :
pip install timeout-decorator
Utilisation :
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
timeout-decorator
ne fonctionne pas sur le système Windows car Windows ne prend pas bien en charge signal
.
Si vous utilisez timeout-decorator dans le système Windows, vous obtiendrez ce qui suit
AttributeError: module 'signal' has no attribute 'SIGALRM'
Certains ont suggéré d'utiliser use_signals=False
mais n'ont pas fonctionné pour moi.
Author @bitranox a créé le package suivant:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
Exemple de code:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))
def main():
mytest('starting')
if __name__ == '__main__':
main()
Donne l'exception suivante:
TimeoutError: Function mytest timed out after 5 seconds
Je suis l'auteur de wrapt_timeout_decorator
La plupart des solutions présentées ici fonctionnent parfaitement sous Linux au premier abord - parce que nous avons fork () et signaux () - mais sous Windows, les choses sont un peu différentes. Et en ce qui concerne les sous-thèmes sous Linux, vous ne pouvez plus utiliser les signaux.
Pour générer un processus sous Windows, il doit être sélectionnable - et de nombreuses fonctions décorées ou méthodes de classe ne le sont pas.
Vous devez donc utiliser un meilleur pickler, comme l'aneth et le multitraitement (et non le pickle et le multitraitement) - c'est pourquoi vous ne pouvez pas utiliser ProcessPoolExecutor (ou uniquement avec des fonctionnalités limitées).
Pour le délai d'expiration lui-même - Vous devez définir le terme de délai d'expiration - car sous Windows, le processus prend un temps considérable (et non déterminable). Cela peut être délicat sur de brefs délais. Supposons que le processus prend environ 0,5 seconde (facilement !!!). Si vous donnez un délai d'attente de 0,2 seconde, que devrait-il se passer? La fonction doit-elle expirer après 0,5 + 0,2 seconde (laissez donc la méthode s'exécuter pendant 0,2 seconde)? Ou bien le processus appelé doit-il expirer après 0,2 seconde (dans ce cas, la fonction décorée sera TOUJOURS expirée, car à ce moment-là, elle n'est même pas engendrée)?
De plus, les décorateurs imbriqués peuvent être méchants et vous ne pouvez pas utiliser les signaux dans un sous-fil. Si vous souhaitez créer un décorateur multi-plateformes véritablement universel, vous devez prendre en compte tout cela (et le tester).
D'autres problèmes consistent à renvoyer des exceptions à l'appelant, ainsi que des problèmes de journalisation (s'ils sont utilisés dans la fonction décorée - la journalisation de fichiers dans un autre processus n'est PAS prise en charge)
.J'ai essayé de couvrir tous les cas critiques. Vous pouvez regarder dans le paquet wrapt_timeout_decorator ou au moins tester vos propres solutions inspirées des tests unittests utilisés ici.
@Alexis Eggermont - malheureusement, je n'ai pas assez de points à commenter - peut-être que quelqu'un d'autre peut vous avertir - je pense avoir résolu votre problème d'importation.
Nous pouvons utiliser des signaux pour la même chose. Je pense que l'exemple ci-dessous vous sera utile. C'est très simple comparé aux threads.
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
J'avais besoin d'interruptions temporisées imbriquables (ce que SIGALARM ne peut pas faire) qui ne seront pas bloquées par time.sleep (ce que l'approche basée sur les threads ne peut pas faire). J'ai fini par copier et modifier légèrement le code à partir d'ici: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
Le code lui-même:
#!/usr/bin/python
# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
"""alarm.py: Permits multiple SIGALRM events to be queued.
Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""
import heapq
import signal
from time import time
__version__ = '$Revision: 2539 $'.split()[1]
alarmlist = []
__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
class TimeoutError(Exception):
def __init__(self, message, id_=None):
self.message = message
self.id_ = id_
class Timeout:
''' id_ allows for nested timeouts. '''
def __init__(self, id_=None, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
self.id_ = id_
def handle_timeout(self):
raise TimeoutError(self.error_message, self.id_)
def __enter__(self):
self.this_alarm = alarm(self.seconds, self.handle_timeout)
def __exit__(self, type, value, traceback):
try:
cancel(self.this_alarm)
except ValueError:
pass
def __clear_alarm():
"""Clear an existing alarm.
If the alarm signal was set to a callable other than our own, queue the
previous alarm settings.
"""
oldsec = signal.alarm(0)
oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
if oldsec > 0 and oldfunc != __alarm_handler:
heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
def __alarm_handler(*zargs):
"""Handle an alarm by calling any due heap entries and resetting the alarm.
Note that multiple heap entries might get called, especially if calling an
entry takes a lot of time.
"""
try:
nextt = __next_alarm()
while nextt is not None and nextt <= 0:
(tm, func, args, keys) = heapq.heappop(alarmlist)
func(*args, **keys)
nextt = __next_alarm()
finally:
if alarmlist: __set_alarm()
def alarm(sec, func, *args, **keys):
"""Set an alarm.
When the alarm is raised in `sec` seconds, the handler will call `func`,
passing `args` and `keys`. Return the heap entry (which is just a big
tuple), so that it can be cancelled by calling `cancel()`.
"""
__clear_alarm()
try:
newalarm = __new_alarm(sec, func, args, keys)
heapq.heappush(alarmlist, newalarm)
return newalarm
finally:
__set_alarm()
def cancel(alarm):
"""Cancel an alarm by passing the heap entry returned by `alarm()`.
It is an error to try to cancel an alarm which has already occurred.
"""
__clear_alarm()
try:
alarmlist.remove(alarm)
heapq.heapify(alarmlist)
finally:
if alarmlist: __set_alarm()
et un exemple d'utilisation:
import alarm
from time import sleep
try:
with alarm.Timeout(id_='a', seconds=5):
try:
with alarm.Timeout(id_='b', seconds=2):
sleep(3)
except alarm.TimeoutError as e:
print 'raised', e.id_
sleep(30)
except alarm.TimeoutError as e:
print 'raised', e.id_
else:
print 'nope.'
Voici une légère amélioration par rapport à la solution basée sur les threads donnée.
Le code ci-dessous prend en charge les exceptions :
.def runFunctionCatchExceptions(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception, message:
return ["exception", message]
return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = runFunctionCatchExceptions(func, *args, **kwargs)
it = InterruptableThread()
it.start()
it.join(timeout_duration)
if it.isAlive():
return default
if it.result[0] == "exception":
raise it.result[1]
return it.result[1]
Invocation avec un délai d'attente de 5 secondes:
result = timeout(remote_calculate, (myarg,), timeout_duration=5)