Pergunta

Eu estou chamando uma função em Python que eu sei que pode parar e me forçar a reiniciar o script.

Como faço para chamar a função ou o que eu envolvê-la em modo que se leva mais de 5 segundos o script cancela-la e faz outra coisa?

Outras dicas

Você pode usar multiprocessing.Process para fazer exatamente isso.

Código

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()

Como faço para chamar a função ou o que eu envolvê-la em modo que se leva mais de 5 segundos o script cancela-lo?

Eu postei um essência que resolve esta questão / problema com um decorador e uma threading.Timer. Aqui está com uma avaria.

Importações e configurações para compatibilidade

Foi testado com Python 2 e 3. Ele também deve funcionar em Unix / Linux e Windows.

Primeiro as importações. Estes tentativa de manter o código consistente, independentemente da versão Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

código independente Use versão:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Agora temos importados nossa funcionalidade da biblioteca padrão.

exit_after decorador

Em seguida temos uma função para encerrar o main() do segmento infantil:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

E aqui está o decorador-se:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Uso

E aqui está o uso que responde diretamente a sua pergunta sobre sair depois de 5 segundos:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demonstração:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

A segunda chamada de função não vai terminar, em vez do processo deve sair com um traceback!

KeyboardInterrupt não sempre parar um fio de dormir

Note que o sono não será sempre interrompido por uma interrupção de teclado, em Python 2 no Windows, por exemplo:.

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

nem é provável que o código de interrupção correndo em extensões a menos que explicitamente verifica PyErr_CheckSignals(), consulte Cython, Python e KeyboardInterrupt ignorado

Gostaria de evitar dormir um fio mais de um segundo, em qualquer caso -. Isso é uma eternidade no tempo do processador

Como faço para chamar a função ou o que eu envolvê-la em modo que se leva mais de 5 segundos o script cancela e faz outra coisa?

Para pegá-lo e fazer outra coisa, você pode pegar o KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

Eu tenho uma proposta diferente, que é uma função pura (com o mesmo API como a sugestão de threading) e parece funcionar bem (com base em sugestões sobre este tópico)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

Eu corri em toda esta discussão na busca de uma chamada de tempo limite em testes de unidade. Eu não encontrei nada simples nas respostas ou pacotes 3rd party então eu escrevi o decorador abaixo você pode cair para a direita em código:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Então é tão simples como isto para o tempo limite de um teste ou qualquer função que você gosta:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

Há uma série de sugestões, mas nenhum usando concurrent.futures, que acho que é a forma mais legível para lidar com isso.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super simples de ler e manter.

Nós fazemos uma piscina, apresentar um único processo e aguarde até 5 segundos antes de levantar um TimeoutError que você pode pegar e manusear, contudo, você necessário.

Nativo para python 3.2+ e portadas para 2,7 (pip instalar futuros).

A comutação entre threads e processos é tão simples como substituir ProcessPoolExecutor com ThreadPoolExecutor.

Se você quiser terminar o processo no tempo limite gostaria de sugerir olhando para Pebble .

O pacote stopit, encontrado na pypi, parece lidar com tempos de espera também.

Eu gosto do decorador @stopit.threading_timeoutable, que adiciona um parâmetro timeout para a função decorada, que faz o que você espera, ele pára a função.

Confira no pypi: https://pypi.python.org/pypi/stopit

Great, fácil de usar e confiável PyPI projecto Tempo limite-decorador ( https://pypi.org/project/timeout-decorator/ )

instalação :

pip install timeout-decorator

Uso :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

timeout-decorator não funcionam em sistema Windows como, janelas fez bem signal não apoio.

Se você usar o tempo limite-decorador no sistema de janelas que você irá obter o seguinte

AttributeError: module 'signal' has no attribute 'SIGALRM'

Alguns sugeriram a utilização use_signals=False mas não funcionou para mim.

Autor @bitranox criado o seguinte pacote:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Exemplo de código:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Dá a seguinte exceção:

TimeoutError: Function mytest timed out after 5 seconds

Eu sou o autor de wrapt_timeout_decorator

A maioria das soluções apresentadas aqui trabalhar wunderfully sob Linux na primeira vista - porque temos fork () e sinais () -, mas em janelas as coisas parecem um pouco diferente. E quando se trata de subthreads No Linux, você não pode usar mais sinais.

A fim de gerar um processo no Windows, ele precisa ser pickable -. E muitas funções decorados ou métodos de classe não são

Então, você precisa usar um pickler melhor como endro e multiprocess (não pickle e multiprocessamento.) - é por isso que você não pode usar ProcessPoolExecutor (ou apenas com funcionalidade limitada)

Para o próprio tempo limite - Você precisa definir o que tempo limite meios - porque no Windows levará considerável (e não determinável) tempo para gerar o processo. Isto pode ser complicado em tempos de espera curtos. Vamos supor, gerando o processo leva cerca de 0,5 segundos (facilmente !!!). Se você dá um tempo limite de 0,2 segundos o que deve acontecer? Caso o tempo de função após 0,5 + 0.2 segundos (assim que deixe o método de execução para 0,2 segundos)? Ou se o chamado tempo de processo após 0,2 segundos (nesse caso, a função decorado SEMPRE tempo limite, porque nesse tempo que não é sequer gerou)?

Além disso decoradores aninhados pode ser desagradável e você não pode usar sinais em um subthread. Se você quiser criar um verdadeiramente universal, multi-plataforma decorador, Tudo isso precisa ser levado em consideração (e testado).

Outras questões estão passando exceções volta para o chamador, bem como registrar problemas (se usado na função decorados - registro para arquivos em outro processo não é suportado)

Eu tentei cobrir todos os casos de ponta, você pode olhar para o wrapt_timeout_decorator pacote, ou pelo menos testar suas próprias soluções inspirado pelos UnitTests utilizado lá.

@Alexis Eggermont - infelizmente eu não tenho pontos suficientes para comentário - talvez alguém pode notificá-lo -. Eu acho que resolveu o problema de importação

Podemos usar os sinais para o mesmo. Eu acho que o exemplo abaixo será útil para você. É muito simples em comparação com threads.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

Eu tinha uma necessidade de nestable interrupções programadas (que SIGALARM não pode fazer) que não vai ficar bloqueado por time.sleep (que a abordagem baseada em threads não pode fazer). Eu acabei copiando e levemente modificar o código a partir daqui: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

O código em si:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

e um exemplo de uso:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

Aqui está uma ligeira melhoria para a solução baseada em threads dado.

O código abaixo suportes exceções :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invocando-lo com um segundo tempo 5:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top