Frage

Ich bin eine Funktion in Python aufrufen, die ich kenne kann stoppen und zwingen mich, um das Skript zu starten.

Wie rufe ich die Funktion oder das, was ich sie wickeln in so dass, wenn es länger dauert als 5 Sekunden das Skript bricht es und tut etwas anderes?

War es hilfreich?

Lösung

Sie können die Signal Paket, wenn Sie auf UNIX ausgeführt werden:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 Sekunden nach dem Aufruf alarm.alarm(10) wird der Handler aufgerufen. Dies wirft eine Ausnahme, dass Sie aus dem regulären Python-Code abfangen können.

Dieses Modul spielt nicht gut mit Gewinden (aber wer tut?)

Beachten Sie, dass , da wir eine Ausnahme, wenn Timeout geschieht erhöhen, kann es innerhalb der Funktion, zum Beispiel für eine solche Funktion am Ende aufgefangen und ignoriert:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue

Andere Tipps

Sie können multiprocessing.Process verwenden, um genau das zu tun.

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
  

Wie rufe ich die Funktion oder das, was ich sie wickeln in so dass, wenn es länger dauert als 5 Sekunden das Skript es bricht?

I posted a Kern , die diese Frage / Problem mit einem Dekorateur und einem threading.Timer löst. Hier ist es mit einer Panne.

Importe und Setups für die Kompatibilität

Es wurde mit Python 2 getestet und 3. Es soll auch unter Unix / Linux und Windows.

Zuerst werden die Importe. Dieser Versuch, den Code konsistent zu halten, unabhängig von der Python-Version:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Mit Version unabhängiger Code:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Jetzt haben wir unsere Funktionalität aus der Standardbibliothek importiert.

exit_after Dekorateur

Als nächstes brauchen wir eine Funktion, um die main() vom Kind Thread zu beenden:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

Und hier ist der Dekorateur selbst:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Verwendung

Und hier ist die Nutzung, die Ihre Frage direkt nach 5 Sekunden über Verlassen antwortet:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Der zweite Funktionsaufruf wird nicht zu Ende, sondern der Prozess mit einem Rückverfolgungs verlassen soll!

KeyboardInterrupt nicht immer ein schlafender Thread stoppen

, dass der Schlaf Hinweis wird nicht immer durch eine Tastatur-Interrupt unterbrochen werden, auf Python 2 unter Windows, z.

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

noch ist es wahrscheinlich Code ausgeführt wird in Erweiterungen zu unterbrechen, wenn es nicht ausdrücklich für PyErr_CheckSignals() prüft finden Sie unter Cython, Python und KeyboardInterrupt ignoriert

ich vermeiden würde ein Thread mehr als eine Sekunde, in jedem Fall schlafen -. Das ist ein eon in Prozessorzeit

  

Wie rufe ich die Funktion oder was mache ich es umwickeln, so dass, wenn es länger dauert als 5 Sekunden das Skript bricht es und tut etwas anderes?

Um es zu fangen und etwas anderes tun, können Sie die KeyboardInterrupt fangen.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

Ich habe einen anderen Vorschlag, der eine reine Funktion (mit der gleichen API wie Einfädeln Vorschlag) ist und scheint gut zu funktionieren (basierend auf Vorschläge auf diesem Thread)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

ich auf diesen Thread lief, wenn für einen Timeout Anruf auf Unit-Tests zu suchen. Ich habe nichts einfach in den Antworten oder 3rd-Party-Pakete finden, damit ich die Dekorateur schrieb unten rechts in Code fallen können:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Dann ist es so einfach, wie dies ein Test oder eine beliebige Funktion Timeout Sie mögen:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

Es gibt viele Vorschläge, aber keine mit concurrent.futures, die ich denke, die meisten lesbar Weg, dies zu handhaben.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super einfach zu lesen und zu erhalten.

Wir machen einen Pool, einen einzelnen Prozess vorlegen und dann auf 5 Sekunden warten, bevor eine Timeouterror Erhöhung, die Sie fangen konnte und zu handhaben aber man benötigt.

Native zu Python 3.2 + und zurückportiert auf 2,7 (PiP-Futures installieren).

Umschalten zwischen Threads und Prozessen ist so einfach wie ersetzen ProcessPoolExecutor mit ThreadPoolExecutor.

Wenn Sie den Prozess auf Timeout zu beenden, ich suchen würde vorschlagen, in Pebble .

Das stopit Paket, auf pypi gefunden, scheint Timeouts gut zu handhaben.

ich den @stopit.threading_timeoutable Dekorateur mag, die einen timeout Parameter an die eingerichtete Funktion fügt, das tut, was Sie erwarten, stoppt er die Funktion.

Überprüfen Sie es heraus auf pypi: https://pypi.python.org/pypi/stopit

Große, einfach zu bedienen und zuverlässig PyPI Projekt Timeout-Dekorateur ( https://pypi.org/project/timeout-decorator/ )

Installation :

pip install timeout-decorator

Verwendung :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

timeout-decorator funktioniert nicht auf Windows-System als, Fenster nicht signal unterstützt haben gut.

Wenn Sie Timeout-Dekorateur in Windows-System verwenden, erhalten Sie die folgende

AttributeError: module 'signal' has no attribute 'SIGALRM'

Einige schlugen vor, use_signals=False zu verwenden, aber nicht für mich gearbeitet.

Autor @bitranox das folgende Paket erstellt:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Code-Beispiel:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Gibt die folgende Ausnahme:

TimeoutError: Function mytest timed out after 5 seconds

Ich bin der Autor des wrapt_timeout_decorator

Die meisten der Lösungen hier vorgestellten Arbeit wunderfully unter Linux auf den ersten Blick - denn wir haben Gabel () und Signale () - aber an den Fenstern die Dinge sehen etwas anders aus. Und wenn es um subthreads auf Linux kommt, kann nicht Sie verwenden, um Signale mehr.

Um einen Prozess unter Windows, um zu laichen, es braucht aufsammelbare zu sein -. Und viele eingerichteten Funktionen oder Klassenmethoden sind nicht

Also Sie brauchen einen besseren pickler wie Dill verwenden und Multi-Prozess (nicht beizen und Multiprocessing) -. Das ist, warum Sie ProcessPoolExecutor (oder nur mit eingeschränkter Funktionalität) verwenden können nicht

Für das Timeout selbst - Sie müssen definieren, was Timeout bedeutet - denn unter Windows wird es erhebliche nehmen (und nicht bestimmbar) Zeit, um den Prozess zu laichen. Dies kann auf kurzen Timeouts schwierig sein. Nehmen wir an, das Laichen der Prozess etwa 0,5 Sekunden dauert (leicht !!!). Wenn Sie ein Timeout von 0,2 Sekunden, was geben soll passieren? Sollte die Funktion Auszeit nach 0,5 + 0,2 Sekunden (also lassen Sie die Methode Lauf 0,2 Sekunden)? Oder sollte die genannte Prozesszeit aus nach 0,2 Sekunden (in diesem Fall wird die eingerichtete Funktion IMMER Timeout, weil in dieser Zeit ist es nicht einmal gelaicht)?

Auch verschachtelte Dekorateure können böse sein und Sie kann nicht verwenden, um Signale in einem subthread. Wenn Sie möchten einen wirklich universellen, plattformübergreifende Dekorateur zu schaffen, all dies berücksichtigt werden muss (und getestet).

Weitere Themen Ausnahmen an den Aufrufer sind vorbei, sowie Protokollierungsprobleme (wenn in der dekorierten Funktion verwendet werden - in einem anderen Prozess zu Dateien Protokollierung wird nicht unterstützt)

Ich habe versucht, alle Grenzfälle abzudecken, Sie könnten das Paket wrapt_timeout_decorator schauen, oder zumindest testen Ihre eigenen Lösungen durch die Unittests dort verwendet inspiriert.

@Alexis Eggermont - vielleicht jemand anderes können Sie mitteilen, - - leider genug Punkte zu kommentieren Ich habe nicht. Ich glaube, ich Ihren Import Problem gelöst

Wir können Signale für das gleiche verwenden. Ich denke, das unter Beispiel für Sie von Nutzen sein wird. Es ist sehr einfach im Vergleich zu Fäden.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

hatte ich eine Notwendigkeit für nestbarer Weckalarme (die SIGALARM nicht tun können), die nicht von time.sleep blockiert wird erhalten (die der Thread-basierten Ansatz kann nicht tun). Ich landete Kopieren und leicht modifiziert Code von hier: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Der Code selbst:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

und ein Verwendungsbeispiel:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

Hier ist eine leichte Verbesserung der gegebenen Thread-basierte Lösung.

Der folgende Code unterstützt Ausnahmen :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Unter Berufung auf mit einer 5 Sekunden Timeout:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top