문제

나는 파이썬에서 함수를 호출하여 스크립트를 정지시키고 강제로 다시 시작할 수 있습니다.

5 초 이상이 걸리면 스크립트가 취소되고 다른 일을하도록 함수를 어떻게 호출합니까?

도움이 되었습니까?

해결책

당신은 그것을 사용할 수 있습니다 신호 패키지 UNIX에서 실행중인 경우 :

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

전화 후 10 초 alarm.alarm(10), 핸들러가 호출됩니다. 이는 일반적인 파이썬 코드를 가로 채울 수 있다는 예외가 발생합니다.

이 모듈은 스레드와 잘 작동하지 않습니다 (그러나 누가 그렇습니까?)

주목하십시오 시간 초과가 발생할 때 예외를 제기하기 때문에 함수 내부에서 잡히거나 무시할 수 있습니다 (예 : 예를 들어 그러한 함수).

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue

다른 팁

당신이 사용할 수있는 multiprocessing.Process 정확히 그렇게합니다.

암호

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()

5 초 이상이 걸리면 스크립트가 취소되도록 함수를 어떻게 호출합니까?

나는 a를 게시했다 요점 이 질문/문제를 데코레이터와 threading.Timer. 여기에는 고장이 있습니다.

호환성을위한 가져 오기 및 설정

Python 2와 3으로 테스트되었습니다. Unix/Linux 및 Windows에서도 작동해야합니다.

먼저 수입. Python 버전에 관계없이 코드를 일관성있게 유지하려고합니다.

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

버전 독립 코드 사용 :

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

이제 우리는 표준 라이브러리에서 기능을 가져 왔습니다.

exit_after 데코레이터

다음으로 종료하려면 기능이 필요합니다 main() 어린이 스레드에서 :

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

그리고 여기 데코레이터 자체가 있습니다.

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

용법

그리고 5 초 후 종료에 대한 질문에 직접 답변하는 사용법이 있습니다! :

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

데모:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

두 번째 함수 호출은 완료되지 않으며 프로세스는 추적으로 종료해야합니다!

KeyboardInterrupt 항상 수면 실을 멈추는 것은 아닙니다

Windows의 Python 2에서 키보드 인터럽트에 의해 수면이 항상 중단되는 것은 아닙니다.

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

명시 적으로 확인하지 않는 한 확장에서 실행되는 코드를 방해 할 가능성이 없습니다. PyErr_CheckSignals(), 보다 Cython, Python 및 Keyboardinterrupt는 무시되었습니다

나는 어떤 경우에도 1 초 이상 스레드를 자지 않을 것입니다. 그것은 프로세서 시간의 EON입니다.

5 초 이상이 걸리면 스크립트가 취소되도록 함수를 호출하는 방법 또는 무엇을 랩핑합니까? 그리고 다른 것이 있습니까?

그것을 잡고 다른 일을하려면 키보드 인터럽트를 잡을 수 있습니다.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

순수한 기능 (스레딩 제안과 동일한 API를 가진)이있는 다른 제안이 있으며 (이 스레드에 대한 제안을 기반으로) 잘 작동하는 것 같습니다.

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

단위 테스트에서 시간 초과 호출을 검색 할 때이 스레드를 가로 질러 달렸습니다. 답변이나 제 3 자 패키지에서 간단한 것을 찾지 못했기 때문에 아래 데코레이터를 썼습니다.

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

그런 다음 테스트 또는 원하는 기능을 시간을 제한하는 것만 큼 간단합니다.

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

많은 제안이 있지만 동시성을 사용하는 것은 없습니다.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

읽고 유지하기가 매우 간단합니다.

우리는 수영장을 만들고, 단일 프로세스를 제출 한 다음, 당신이 필요로하는 동안 잡을 수있는 타임 우어러를 올리기 전에 최대 5 초를 기다립니다.

Python 3.2+에 기초하고 2.7로 백포링되었습니다 (PIP 설치 선물).

스레드와 프로세스 사이를 전환하는 것은 교체만큼 간단합니다. ProcessPoolExecutor ~와 함께 ThreadPoolExecutor.

타임 아웃에서 프로세스를 종료하려면 조사하는 것이 좋습니다. 마노.

그만큼 stopit PYPI에서 발견 된 패키지는 시간을 잘 처리하는 것 같습니다.

나는 좋아한다 @stopit.threading_timeoutable 데코레이터, a timeout 장식 된 기능에 대한 매개 변수는 당신이 기대하는 것을 수행하면 기능을 중지합니다.

PYPI에서 확인하십시오 : https://pypi.python.org/pypi/stopit

훌륭하고 사용하기 쉽고 신뢰할 수 있습니다 pypi 프로젝트 타임 아웃 설명자 (https://pypi.org/project/timeout-decorator/)

설치:

pip install timeout-decorator

용법:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

timeout-decorator Windows가 지원하지 않았기 때문에 Windows 시스템에서 작동하지 마십시오. signal 잘.

Windows 시스템에서 TimeOut-Decorator를 사용하면 다음을 받게됩니다.

AttributeError: module 'signal' has no attribute 'SIGALRM'

일부는 사용하도록 제안했습니다 use_signals=False 그러나 나를 위해 일하지 않았습니다.

저자 @bitranox는 다음 패키지를 만들었습니다.

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

코드 샘플 :

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

다음 예외를 제공합니다.

TimeoutError: Function mytest timed out after 5 seconds

저는 wrapt_timeout_decorator의 저자입니다

여기에 제시된 대부분의 솔루션은 Linux에서 첫눈에 분명하게 작동합니다. 포크 () 및 신호 () -이기 때문에 창에서는 약간 다르게 보입니다. 그리고 Linux의 하위 스레드에 관해서는 더 이상 신호를 사용할 수 없습니다.

Windows에서 프로세스를 생성하려면 양해질이 필요합니다. 많은 장식 기능이나 클래스 방법은 그렇지 않습니다.

따라서 Dill 및 Multiprocess (피클 및 멀티 프로세싱이 아님)와 같은 더 나은 피커를 사용해야합니다. 따라서 ProcessPooleExecutor (또는 제한된 기능을 가진)를 사용할 수 없습니다.

시간 초과 자체의 경우 - 시간 초과의 수단을 정의해야합니다. 창에서는 프로세스를 생성하는 데 상당한 시간이 걸리고 결정할 수 없기 때문입니다. 짧은 타임 아웃에서는 까다로울 수 있습니다. 프로세스를 산란하는 데 약 0.5 초가 걸립니다 (쉽게 !!!). 0.2 초의 시간 초과를 주면 어떻게해야합니까? 0.5 + 0.2 초 후에 기능 시간이 내려야합니까 (방법이 0.2 초 동안 실행되도록)? 아니면 0.2 초 후 호출 된 프로세스 타임 아웃이어야합니다 (이 경우 장식 된 기능은 항상 타임 아웃을 할 것입니다.

또한 중첩 데코레이터는 불쾌 할 수 있으며 서브 스레드에서 신호를 사용할 수 없습니다. 진정으로 보편적 인 크로스 플랫폼 데코레이터를 만들고 싶다면이 모든 것을 고려하고 테스트해야합니다.

다른 문제는 예외를 발신자에게 전달하고 로깅 문제 (장식 된 기능에 사용되는 경우 - 다른 프로세스에서 파일 로그인이 지원되지 않음)

나는 모든 Edge 케이스를 다루려고했는데, 당신은 패키지 wrapt_timeout_decorator 패키지를 조사하거나 적어도 그곳에서 사용 된 UnitTest에서 영감을 얻은 자신의 솔루션을 테스트 할 수 있습니다.

@alexis eggermont- 불행히도 나는 댓글을 달 수있는 충분한 포인트가 없습니다. 다른 사람이 당신에게 알릴 수있을 것입니다 - 나는 당신의 수입 문제를 해결했다고 생각합니다.

우리는 동일한 신호를 사용할 수 있습니다. 아래의 예는 당신에게 유용 할 것이라고 생각합니다. 스레드에 비해 매우 간단합니다.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

나는 필요했다 중첩 가능 시간에 따라 차단되지 않는 시간이 지정된 인터럽트 (Sigalarm이 할 수없는). 여기에서 코드를 복사하고 가볍게 수정했습니다. http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

코드 자체 :

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

그리고 사용 예 :

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

주어진 스레드 기반 솔루션의 약간의 개선이 있습니다.

아래 코드는 지원합니다 예외:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

5 초 타임 아웃으로 호출 :

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top