Вопрос

Я вызываю функцию на Python, которая, как я знаю, может заглохнуть и заставить меня перезапустить скрипт.

Как мне вызвать функцию или во что мне ее обернуть, чтобы, если это заняло больше 5 секунд, скрипт отменял ее и делал что-то еще?

Это было полезно?

Решение

Вы можете использовать сигнал пакет, если вы работаете в UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

через 10 секунд после звонка alarm.alarm(10), вызывается обработчик.Это вызывает исключение, которое вы можете перехватить из обычного кода Python.

Этот модуль плохо работает с потоками (но тогда, кто это делает?)

Обратите внимание , что поскольку мы создаем исключение, когда происходит тайм-аут, оно может в конечном итоге быть перехвачено и проигнорировано внутри функции, например, одной из таких функций:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue

Другие советы

Вы можете использовать multiprocessing.Process чтобы сделать именно это.

Код

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()

Как мне вызвать функцию или во что мне ее обернуть, чтобы, если это заняло больше 5 секунд, скрипт ее отменил?

Я опубликовал суть это решает этот вопрос / проблему с помощью декоратора и threading.Timer.Вот оно с пробоем.

Импорт и настройки для обеспечения совместимости

Он был протестирован с помощью Python 2 и 3.Он также должен работать под Unix / Linux и Windows.

Сначала импорт.Они пытаются сохранить согласованность кода независимо от версии Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Используйте код, не зависящий от версии:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Теперь мы импортировали нашу функциональность из стандартной библиотеки.

exit_after декоратор

Далее нам нужна функция для завершения main() из дочернего потока:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

А вот и сам декоратор:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Использование

И вот использование, которое напрямую отвечает на ваш вопрос о выходе через 5 секунд!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

ДЕМОНСТРАЦИЯ:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Второй вызов функции не завершится, вместо этого процесс должен завершиться с обратным отслеживанием!

KeyboardInterrupt не всегда останавливает спящий поток

Обратите внимание, что режим ожидания не всегда будет прерываться прерыванием клавиатуры в Python 2 в Windows, например:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

это также вряд ли приведет к прерыванию кода, выполняемого в расширениях, если только оно явно не проверяет наличие PyErr_CheckSignals(), видеть Cython, Python и KeyboardInterrupt игнорируются

В любом случае я бы избегал прерывания потока более чем на секунду - это целая вечность процессорного времени.

Как мне вызвать функцию или во что мне ее обернуть, чтобы, если это заняло больше 5 секунд, скрипт ее отменил и делает что-то еще?

Чтобы перехватить это и сделать что-то еще, вы можете перехватить KeyboardInterrupt .

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

У меня есть другое предложение, которое представляет собой чистую функцию (с тем же API, что и предложение о потоковой передаче) и, кажется, работает нормально (на основе предложений в этом потоке)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

Я наткнулся на этот поток при поиске тайм-аута вызова в модульных тестах.Я не нашел ничего простого в ответах или сторонних пакетах, поэтому я написал декоратор ниже, который вы можете перейти прямо в код:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Тогда это так же просто, как это - отложить тест или любую функцию, которая вам нравится:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

Есть много предложений, но ни одно не использует concurrent.futures, что, я думаю, является наиболее разборчивым способом справиться с этим.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Очень прост в чтении и обслуживании.

Мы создаем пул, отправляем один процесс, а затем ждем до 5 секунд, прежде чем вызвать ошибку TimeoutError, которую вы могли бы перехватить и обработать так, как вам нужно.

Родной для python 3.2+ и перенесенный в 2.7 (фьючерсы на установку pip).

Переключение между потоками и процессами так же просто, как замена ProcessPoolExecutor с ThreadPoolExecutor.

Если вы хотите завершить процесс по таймауту, я бы предложил изучить Галька.

В stopit пакет, найденный в pypi, похоже, хорошо справляется с тайм-аутами.

Мне нравится @stopit.threading_timeoutable декоратор, который добавляет timeout параметр для оформленной функции, которая делает то, что вы ожидаете, она останавливает функцию.

Проверьте это на pypi: https://pypi.python.org/pypi/stopit

Отличный, простой в использовании и надежный PyPI ( ПиПи ) проект тайм-аут-декоратор (https://pypi.org/project/timeout-decorator/)

установка:

pip install timeout-decorator

Использование:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

timeout-decorator не работает в системе Windows , поскольку Windows не поддерживала signal что ж.

Если вы используете timeout-decorator в системе Windows, вы получите следующее

AttributeError: module 'signal' has no attribute 'SIGALRM'

Некоторые предлагали использовать use_signals=False но у меня это не сработало.

Автор @bitranox создал следующий пакет:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Пример кода:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Выдает следующее исключение:

TimeoutError: Function mytest timed out after 5 seconds

Я являюсь автором wrapt_timeout_decorator

Большинство представленных здесь решений на первый взгляд замечательно работают под Linux - потому что у нас есть fork() и signals(), - но в Windows все выглядит немного по-другому.И когда дело доходит до подпотоков в Linux, вы больше не можете использовать Сигналы.

Чтобы запустить процесс под Windows, он должен быть доступен для выбора, а многие оформленные функции или методы класса таковыми не являются.

Таким образом, вам нужно использовать лучший рассол, такой как dill и multiprocess (не рассол и многопроцессорность) - вот почему вы не можете использовать ProcessPoolExecutor (или только с ограниченной функциональностью).

Что касается самого тайм-аута - вам нужно определить, что означает тайм-аут - потому что в Windows для запуска процесса потребуется значительное (и не поддающееся определению) время.Это может быть непросто при коротких тайм-аутах.Давайте предположим, что запуск процесса занимает около 0,5 секунды (легко !!!).Если вы дадите тайм - аут в 0,2 секунды , что должно произойти ?Должно ли время ожидания функции истекать через 0,5 + 0,2 секунды (поэтому позвольте методу работать в течение 0,2 секунды)?Или время ожидания вызываемого процесса должно истечь через 0,2 секунды (в этом случае оформленная функция ВСЕГДА будет иметь тайм-аут, потому что за это время она даже не была создана)?

Кроме того, вложенные декораторы могут быть неприятными, и вы не можете использовать сигналы в подпотоке.Если Вы хотите создать действительно универсальный кроссплатформенный декоратор, все это необходимо принять во внимание (и протестировать).

Другими проблемами являются передача исключений обратно вызывающей стороне, а также проблемы с протоколированием (если используется в оформленной функции - протоколирование в файлы в другом процессе НЕ поддерживается)

Я попытался охватить все крайние случаи, вы могли бы заглянуть в пакет wrapt_timeout_decorator или, по крайней мере, протестировать свои собственные решения, вдохновленные используемыми там unittests.

@Alexis Eggermont - к сожалению, у меня недостаточно баллов для комментариев - возможно, кто-то другой сможет уведомить Вас - я думаю, что решил вашу проблему с импортом.

Мы можем использовать сигналы для того же самого.Я думаю, приведенный ниже пример будет вам полезен.Это очень просто по сравнению с потоками.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

У меня была потребность в гнездящийся временные прерывания (которые SIGALARM не может выполнить), которые не будут заблокированы time.sleep (чего не может сделать подход, основанный на потоках).В итоге я скопировал и слегка изменил код отсюда: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Сам код:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

и пример использования:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

Вот небольшое улучшение по сравнению с данным решением, основанным на потоках.

Приведенный ниже код поддерживает исключения:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Вызываю его с 5-секундным таймаутом:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top