我正在 Python 中调用一个函数,我知道该函数可能会停止并迫使我重新启动脚本。

如何调用该函数或者将其包装在什么中,以便如果花费时间超过 5 秒,脚本就会取消该函数并执行其他操作?

有帮助吗?

解决方案

您可以使用信号包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

呼叫alarm.alarm(10)后10秒,该处理程序被调用。这就提出了可以由经常Python代码截获一个异常。

本模块不使用线程发挥好(但当时,谁做?)

注意该信息由于我们提出一个异常时,发生超时,则最终可能会捕获并在函数内部忽略,例如,一个这样的功能的:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue

其他提示

您可以使用multiprocessing.Process做到这一点。

<强>代码

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
  

如何调用该函数或者我该怎么把它包装,这样,如果需要超过5秒的脚本取消吗?

我贴要旨与一个装饰和threading.Timer解决了这个问题/难题。这是一个细分。

进口和设置为兼容性

据与Python 2测试和3还应该在Unix / Linux和Windows。

工作

首先,进口。这些尝试以保持代码一致不管Python版本:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

使用版本无关的代码:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

现在我们已经从标准库中导入我们的功能。

exit_after装饰

接下来,我们需要一个函数来终止来自子线程的main()

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

这里是装饰本身:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

用法

及这里的直接回答你的约5秒后退出问题的用法:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

在第二函数调用将无法完成,代替这个过程应该与回溯退出!

KeyboardInterrupt并不总是停止休眠线程

请注意,睡眠不会总是通过键盘的中断,有关Python 2在Windows上,e.g:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

也不是可能中断扩展运行的代码,除非明确规定检查PyErr_CheckSignals(),请参阅用Cython,Python和一个KeyboardInterrupt忽略

我将避免睡线程超过一秒,在任何情况下 - 这是在处理器时间的EON

  

我如何调用该函数或者我该怎么把它包装,这样,如果需要超过5秒的脚本取消其并做别的东西吗?

要抓住它,做别的事情,你可以赶上一个KeyboardInterrupt。

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

我有不同的提案,其为纯函数(具有相同的API螺纹建议)和似乎(基于在此线程建议)很好地工作

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

我关于单元测试超时呼叫搜索当跨越该线程运行。我没有发现任何的答案或第三方包装简单,所以我写了下面的装饰你可以将右转入代码:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后,它的这样简单超时你喜欢的测试或任何功能:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

stopit包,PyPI上发现,似乎处理超时良好。

我喜欢@stopit.threading_timeoutable装饰,增加了一个timeout参数的装饰功能,它做了你所期望的,它停止的功能。

检查出来PyPI上: https://pypi.python.org/pypi/stopit

很棒,易于使用且可靠 皮皮 项目 超时装饰器 (https://pypi.org/project/timeout-decorator/)

安装:

pip install timeout-decorator

用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

timeout-decorator不要在Windows系统中工作的,窗户不支持signal很好。

如果您在Windows系统中使用超时装饰,你会得到以下

AttributeError: module 'signal' has no attribute 'SIGALRM'

一些建议使用use_signals=False却没有为我工作。

@bitranox创建了以下包作者:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码示例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

提供了以下例外:

TimeoutError: Function mytest timed out after 5 seconds

我wrapt_timeout_decorator的作者

大多数这里介绍的解决方案的wunderfully Linux下的第一眼工作 - 因为我们有fork()和信号() - 但在Windows的东西看起来有点不同。 而且,当涉及到Linux上的子线程,你不能使用的信号了。

为了产卵Windows下的一个过程,它需要可拾取 - 和许多装饰功能或类方法不

所以,你需要使用一个更好的pickler喜欢莳萝和多进程(未腌多) - 这就是为什么你不能使用ProcessPoolExecutor(或仅具有有限功能)

有关超时本身 - 您需要定义超时意味着 - 因为在Windows上,将需要相当长的(而不是确定的)时间产卵的过程。这可以在较短的超时棘手。让我们假设,产卵的过程需要约0.5秒(容易!)。如果你给的0.2秒的超时会发生什么? 应0.5±0.2秒后的函数超时(因此,让0.2秒钟方法运行)? 还是应该0.2秒后,这个过程被称为超时(在这种情况下,装饰函数总是超时,因为在那个时候它甚至没有催生)?

另外嵌套装饰可以是肮脏和你在一个子thread不能使用的信号。如果你想创建一个真正具有普遍性的,跨平台的装饰,这一切都需要加以考虑(和测试)。

其它问题要传送异常返回给调用者,以及日志记录问题(如果在装饰功能中使用 - 记录到文件中的另一过程是不支持)

我试图覆盖所有边缘的情况下,你可能会考虑包装wrapt_timeout_decorator,或者至少测试自己的解决方案,通过使用存在的单元测试代码的启发。

@Alexis Eggermont - 不幸的是我没有足够的积分来评论 - 也许别人可以通知你 - 我想我解决你的问题,进口

我们可以使用的信号为相同的。我想下面的例子将是对您有用。相比于线程这是非常简单的。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

我需要 可嵌套的 定时中断(SIGALARM 无法做到)不会被 time.sleep 阻塞(基于线程的方法无法做到)。我最终从这里复制并稍微修改了代码: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

和一个使用示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

下面是略有改善给定的基于线程的溶液。

下面支撑的异常的代码

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

用5秒超时调用它:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top