Question

J'appelle une fonction en Python qui, je le sais, risque de caler et de me forcer à redémarrer le script.

Comment puis-je appeler la fonction ou en quoi je l'enveloppe pour que, si cela prend plus de 5 secondes, le script l'annule et fasse autre chose?

Était-ce utile?

La solution

Vous pouvez utiliser le package signal si vous utilisez UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 secondes après l'appel alarm.alarm(10), le gestionnaire est appelé. Cela soulève une exception que vous pouvez intercepter à partir du code Python normal.

Ce module ne fonctionne pas bien avec les threads (mais alors, qui le fait?)

Notez que étant donné que nous levons une exception lorsque le délai d'expiration est dépassé, il est possible qu'elle soit capturée et ignorée à l'intérieur de la fonction, par exemple d'une telle fonction:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue

Autres conseils

Vous pouvez utiliser multiprocessing.Process pour faire exactement cela.

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
  

Comment appeler la fonction ou en quoi je l'enveloppe pour que le script l'annule si elle dure plus de 5 secondes?

J'ai posté un gist qui résout ce problème / question avec un décorateur et un threading.Timer . La voici avec une panne.

Importations et configurations pour la compatibilité

Il a été testé avec Python 2 et 3. Il devrait également fonctionner sous Unix / Linux et Windows.

D'abord les importations. Celles-ci tentent de maintenir la cohérence du code quelle que soit la version de Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Utiliser un code indépendant de la version:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Nous avons maintenant importé nos fonctionnalités de la bibliothèque standard.

exit_after décorateur

Ensuite, nous avons besoin d’une fonction pour terminer le main() depuis le thread enfant:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

Et voici le décorateur lui-même:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Utilisation

Et voici l'utilisation qui répond directement à votre question sur la sortie après 5 secondes!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Démo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Le deuxième appel de fonction ne se terminera pas, mais le processus devrait se terminer avec un retour en arrière!

KeyboardInterrupt n'arrête pas toujours un thread en veille

Notez que la veille ne sera pas toujours interrompue par une interruption du clavier, sous Python 2 sous Windows, par exemple:

.
@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

Il n'est pas non plus susceptible d'interrompre le code exécuté dans des extensions, à moins que cela ne vérifie explicitement PyErr_CheckSignals(). Voir Cython, Python et KeyboardInterrupt ignored <. / a>

J'éviterais de laisser dormir un fil plus d'une seconde, quoi qu'il en soit - c'est un peu plus long que le temps processeur.

  

Comment appeler la fonction ou comment l'enrouler afin que le script l'annule si le script dure plus de 5 secondes et effectue autre chose?

Pour attraper et faire autre chose, vous pouvez attraper le KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

J'ai une proposition différente qui est une fonction pure (avec la même API que la suggestion de threading) et semble fonctionner correctement (sur la base de suggestions sur ce fil)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

J'ai rencontré ce fil lors de la recherche d'un appel de délai d'attente sur des tests unitaires. Je n'ai rien trouvé de simple dans les réponses ni dans les paquets d'une tierce partie. J'ai donc écrit au décorateur ci-dessous, vous pouvez y insérer directement le code:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Ensuite, il est aussi simple que cela d'exclure un test ou une fonction de votre choix:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

Le package stopit, disponible sur pypi, semble bien gérer les délais d'expiration.

J'aime le @stopit.threading_timeoutable décorateur, qui ajoute un paramètre timeout à la fonction décorée, qui fait ce que vous attendez, il arrête la fonction.

Découvrez-le sur pypi: https://pypi.python.org/pypi/stopit

Excellent projet PyPi , facile à utiliser et fiable décorateur de délai d'attente ( https://pypi.org/project/timeout-decorator/ )

installation :

pip install timeout-decorator

Utilisation :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

timeout-decorator ne fonctionne pas sur le système Windows car Windows ne prend pas bien en charge signal.

Si vous utilisez timeout-decorator dans le système Windows, vous obtiendrez ce qui suit

AttributeError: module 'signal' has no attribute 'SIGALRM'

Certains ont suggéré d'utiliser use_signals=False mais n'ont pas fonctionné pour moi.

Author @bitranox a créé le package suivant:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Exemple de code:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Donne l'exception suivante:

TimeoutError: Function mytest timed out after 5 seconds

Je suis l'auteur de wrapt_timeout_decorator

La plupart des solutions présentées ici fonctionnent parfaitement sous Linux au premier abord - parce que nous avons fork () et signaux () - mais sous Windows, les choses sont un peu différentes. Et en ce qui concerne les sous-thèmes sous Linux, vous ne pouvez plus utiliser les signaux.

Pour générer un processus sous Windows, il doit être sélectionnable - et de nombreuses fonctions décorées ou méthodes de classe ne le sont pas.

Vous devez donc utiliser un meilleur pickler, comme l'aneth et le multitraitement (et non le pickle et le multitraitement) - c'est pourquoi vous ne pouvez pas utiliser ProcessPoolExecutor (ou uniquement avec des fonctionnalités limitées).

Pour le délai d'expiration lui-même - Vous devez définir le terme de délai d'expiration - car sous Windows, le processus prend un temps considérable (et non déterminable). Cela peut être délicat sur de brefs délais. Supposons que le processus prend environ 0,5 seconde (facilement !!!). Si vous donnez un délai d'attente de 0,2 seconde, que devrait-il se passer? La fonction doit-elle expirer après 0,5 + 0,2 seconde (laissez donc la méthode s'exécuter pendant 0,2 seconde)? Ou bien le processus appelé doit-il expirer après 0,2 seconde (dans ce cas, la fonction décorée sera TOUJOURS expirée, car à ce moment-là, elle n'est même pas engendrée)?

De plus, les décorateurs imbriqués peuvent être méchants et vous ne pouvez pas utiliser les signaux dans un sous-fil. Si vous souhaitez créer un décorateur multi-plateformes véritablement universel, vous devez prendre en compte tout cela (et le tester).

D'autres problèmes consistent à renvoyer des exceptions à l'appelant, ainsi que des problèmes de journalisation (s'ils sont utilisés dans la fonction décorée - la journalisation de fichiers dans un autre processus n'est PAS prise en charge)

.

J'ai essayé de couvrir tous les cas critiques. Vous pouvez regarder dans le paquet wrapt_timeout_decorator ou au moins tester vos propres solutions inspirées des tests unittests utilisés ici.

@Alexis Eggermont - malheureusement, je n'ai pas assez de points à commenter - peut-être que quelqu'un d'autre peut vous avertir - je pense avoir résolu votre problème d'importation.

Nous pouvons utiliser des signaux pour la même chose. Je pense que l'exemple ci-dessous vous sera utile. C'est très simple comparé aux threads.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

J'avais besoin d'interruptions temporisées imbriquables (ce que SIGALARM ne peut pas faire) qui ne seront pas bloquées par time.sleep (ce que l'approche basée sur les threads ne peut pas faire). J'ai fini par copier et modifier légèrement le code à partir d'ici: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Le code lui-même:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

et un exemple d'utilisation:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

Voici une légère amélioration par rapport à la solution basée sur les threads donnée.

Le code ci-dessous prend en charge les exceptions :

.
def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invocation avec un délai d'attente de 5 secondes:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top