質問

Python で関数を呼び出していますが、この関数が停止してスクリプトの再起動が必要になる可能性があることがわかっています。

関数を呼び出すにはどうすればよいですか、または関数を何でラップすれば、5 秒以上かかる場合にスクリプトが関数をキャンセルして別の処理を実行できるようになりますか?

役に立ちましたか?

解決

あなたがUNIX上で実行されている場合は、

あなたは信号のパッケージを使用することができます:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10秒のコールalarm.alarm(10)後、ハンドラが呼び出されます。これは、通常のPythonコードから遮断できる例外を発生させます。

このモジュールは、スレッドとうまく再生されない(が、その後、ない誰?)

注そのタイムアウトが発生したとき、我々は例外を発生するので、それはそのような関数の例は、キャッチされ、関数内で無視されてしまうかもしれ

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue

他のヒント

あなたはまさにそれを行うためにmultiprocessing.Processを使用することができます。

コード

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()

関数を呼び出すにはどうすればよいですか、または関数を何でラップすれば、5 秒以上かかる場合にスクリプトが関数をキャンセルできるようになりますか?

投稿しました 要旨 この質問/問題をデコレーターと threading.Timer. 。こちらが内訳付きです。

互換性のためのインポートとセットアップ

Python 2 と 3 でテストされました。Unix/Linux および Windows でも動作するはずです。

まずは輸入品。これらは、Python のバージョンに関係なくコードの一貫性を維持しようとします。

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

バージョンに依存しないコードを使用します。

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

これで、標準ライブラリから機能をインポートできました。

exit_after デコレータ

次に、を終了する関数が必要です。 main() 子スレッドから:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

そして、デコレーター自体は次のとおりです。

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

使用法

5 秒後の終了に関する質問に直接答える使用法を次に示します。

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

デモ:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

2 番目の関数呼び出しは終了せず、代わりにプロセスはトレースバックで終了する必要があります。

KeyboardInterrupt スリープ中のスレッドを常に停止するとは限りません

Windows 上の Python 2 では、スリープはキーボード割り込みによって常に中断されるわけではないことに注意してください。例:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

また、明示的にチェックしない限り、拡張機能で実行されているコードを中断することもありません。 PyErr_CheckSignals(), 、 見る Cython、Python、KeyboardInterrupt は無視される

いずれにせよ、スレッドを 1 秒以上スリープさせることは避けたいと思います。これはプロセッサー時間に長い時間がかかります。

関数を呼び出す方法、または関数を何でラップして、5 秒以上かかる場合にスクリプトによって関数がキャンセルされるようにするにはどうすればよいですか? そして何か他のことをしますか?

これをキャッチして別のことを行うには、KeyboardInterrupt をキャッチします。

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

私は(スレッドの提案と同じAPIを持つ)純粋関数であり、(このスレッドの提案に基づいて)正常に動作するようです別の提案を持っている。

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
ユニットテストのタイムアウトコールを検索するときに

私はこのスレッドに出くわしました。あなたが右のコードにドロップすることができます下に私はデコレータを書いたので、私は答えまたはサードパーティのパッケージで、単純な何かを見つけることができませんでした:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

そして、これはあなたが好きテストまたは任意の関数をタイムアウトするように簡単です:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

の提案がたくさんありますが、どれも私はこれを処理するための最も読みやすい方法だと思いますこれは、concurrent.futuresを使用していない。

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

スーパーシンプル読んで維持する。

私たちはプールを作り、あなたはしかし、あなたが必要とキャッチし、処理できるTimeoutErrorを上げる前に、5秒まで待機し、その後、単一のプロセスを提出します。

のpythonにネイティブ3.2+および2.7にバックポート(先物をインストールPIP)ます。

スレッドとプロセスの切り替えがProcessPoolExecutorThreadPoolExecutorを交換するのと同じくらい簡単である。

あなたはタイムアウトにプロセスを終了させたい場合は、

私はペブルのに探してお勧めします。

は、PyPIで見つけstopitパッケージは、タイムアウトうまく扱えるようです。

私は何を期待し飾ら関数に@stopit.threading_timeoutableパラメータを追加しtimeoutデコレータを、好き、それが機能を停止します。

は、PyPI上でそれをチェックアウト: https://pypi.python.org/pypi/stopit

素晴らしく、使いやすく、信頼性が高い ピピ プロジェクト タイムアウトデコレータ (https://pypi.org/project/timeout-decorator/)

インストール:

pip install timeout-decorator

使用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

Windowsがうまくtimeout-decoratorをサポートしていませんでした、とsignalは、Windowsシステム上では動作しません。

Windowsシステムでタイムアウトデコレータを使用する場合は、あなたが得る次の

AttributeError: module 'signal' has no attribute 'SIGALRM'

いくつかはuse_signals=Falseを使用することを示唆したが、私のために働いていませんでした。

次のパッケージを作成し@bitranox著者:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

コードサンプルます:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

次の例外を与えます:

TimeoutError: Function mytest timed out after 5 seconds

私はwrapt_timeout_decoratorの著者

ここで紹介するソリューションのほとんどは一見にLinuxでwunderfully仕事 - 私たちはfork()と信号を()持っているので、 - が、Windows上で、物事は少し違って見えます。 それはLinux上でサブスレッドに来るときそして、あなたはもう信号を使用するカントます。

Windowsでプロセスを生成するためには、pickableにする必要がある - と多くの装飾の関数やクラスメソッドではありません。

。 あなたは(限られた機能のみや)ProcessPoolExecutorを使用傾ける理由thatsの。

-

だから、あなたは(酸洗い及びマルチプロセッシングない)ディルとマルチのような、より良いPicklerさんを使用する必要があります。

タイムアウト自身のために - Windows上で、それがプロセスを生成するために、かなり(と算定できない)時間がかかりますので、 - あなたは何を意味するか、タイムアウトを定義する必要があります。これは、短いタイムアウトに注意が必要です。仮定します、プロセスを生成することは(簡単!!!)約0.5秒かかります。あなたは0.2秒のタイムアウトを与えると何が起こるのでしょうか? 0.5 + 0.2秒後に機能タイムアウトは、(その方法は、0.2秒間実行してみましょう)すべきか? または0.2秒後と呼ばれるプロセスのタイムアウトは、(その場合には、装飾された関数は常にタイムアウト、その時にそれも生み出されていないため)すべきか?

また、ネストされたデコレーターは厄介なことができ、あなたはサブスレッドにシグナルを使用傾けます。あなたが本当に普遍的な、クロスプラットフォームのデコレータを作成する場合は、このすべてを考慮して(とテスト)する必要があります。

(装飾された関数の中で使用されている場合 - 別のプロセス内のファイルへのロギングがサポートされていません)

他の問題は、呼び出し元に例外を渡しているだけでなく、伐採の問題

私は、あなたは、パッケージwrapt_timeout_decoratorに見て、あるいは少なくともそこ使用ユニットテストに触発されたあなた自身のソリューションをテストする可能性がある、すべてのエッジケースをカバーしようとします。

@Alexis Eggermont - 残念ながら、私がコメントするのに十分なポイントを持っていけない - 多分他の誰かがあなたに通知することができます - 。私はあなたの輸入問題を解決すると思います。

私たちは、同じのための信号を使用することができます。私は、以下の例では、あなたのために有用であろうと思います。これは、スレッドに比べて非常に簡単です。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

必要がありました ネスト可能 time.sleep によってブロックされない時間指定割り込み (SIGALARM では実行できません) (スレッドベースのアプローチでは実行できません)。結局、ここからコードをコピーして軽く変更しました。 http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

コード自体:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

そして使用例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

ここで指定されたスレッドベースのソリューションに対してわずかな改善である。

サポートの例外の下のコード

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

5秒のタイムアウトでそれを呼び出す

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top