Pregunta

¿El Python tiene una función similar a setInterval() de JavaScript?

Gracias

No hay solución correcta

Otros consejos

Este podría ser el fragmento correcto que estás buscando:

import threading

def set_interval(func, sec):
    def func_wrapper():
        set_interval(func, sec)
        func()
    t = threading.Timer(sec, func_wrapper)
    t.start()
    return t

Sólo mantenerlo bonito y sencillo.

import threading

def setInterval(func,time):
    e = threading.Event()
    while not e.wait(time):
        func()

def foo():
    print "hello"

# using
setInterval(foo,5)

# output:
hello
hello
.
.
.

EDIT: Este código es no bloqueante

import threading

class ThreadJob(threading.Thread):
    def __init__(self,callback,event,interval):
        '''runs the callback function after interval seconds

        :param callback:  callback function to invoke
        :param event: external event for controlling the update operation
        :param interval: time in seconds after which are required to fire the callback
        :type callback: function
        :type interval: int
        '''
        self.callback = callback
        self.event = event
        self.interval = interval
        super(ThreadJob,self).__init__()

    def run(self):
        while not self.event.wait(self.interval):
            self.callback()



event = threading.Event()

def foo():
    print "hello"

k = ThreadJob(foo,event,2)
k.start()

print "It is non-blocking"

Esta es una versión donde se puede iniciar y parar. No está bloqueando. Tampoco hay problema técnico como error de tiempo de ejecución no se añade (importante para la ejecución de largo tiempo con intervalo muy corto como de audio, por ejemplo)

import time, threading

StartTime=time.time()

def action() :
    print('action ! -> time : {:.1f}s'.format(time.time()-StartTime))


class setInterval :
    def __init__(self,interval,action) :
        self.interval=interval
        self.action=action
        self.stopEvent=threading.Event()
        thread=threading.Thread(target=self.__setInterval)
        thread.start()

    def __setInterval(self) :
        nextTime=time.time()+self.interval
        while not self.stopEvent.wait(nextTime-time.time()) :
            nextTime+=self.interval
            self.action()

    def cancel(self) :
        self.stopEvent.set()

# start action every 0.6s
inter=setInterval(0.6,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-StartTime))

# will stop interval in 5s
t=threading.Timer(5,inter.cancel)
t.start()

La salida es:

just after setInterval -> time : 0.0s
action ! -> time : 0.6s
action ! -> time : 1.2s
action ! -> time : 1.8s
action ! -> time : 2.4s
action ! -> time : 3.0s
action ! -> time : 3.6s
action ! -> time : 4.2s
action ! -> time : 4.8s

Cambiar Nailxx 's respuesta un poco y tienes la respuesta!

from threading import Timer

def hello():
    print "hello, world"
    Timer(30.0, hello).start()

Timer(30.0, hello).start() # after 30 seconds, "hello, world" will be printed

El módulo sched ofrece estas capacidades de código general de Python. Sin embargo, como su documentación sugiere, si es multiproceso su código podría tener más sentido utilizar la clase threading.Timer lugar.

Creo que esto es lo que está buscando:

#timertest.py
import sched, time
def dostuff():
  print "stuff is being done!"
  s.enter(3, 1, dostuff, ())

s = sched.scheduler(time.time, time.sleep)
s.enter(3, 1, dostuff, ())
s.run()

Si se agrega otra entrada al programador al final del método de la repetición, sólo va a seguir adelante.

utils setInterval simples

from threading import Timer

def setInterval(timer, task):
    isStop = task()
    if not isStop:
        Timer(timer, setInterval, [timer, task]).start()

def hello():
    print "do something"
    return False # return True if you want to stop

if __name__ == "__main__":
    setInterval(2.0, hello) # every 2 seconds, "do something" will be printed

Recientemente, tengo el mismo problema que usted. Y me encuentro con estos soluation:

1. se puede usar la biblioteca: threading.Time (esto que la introducción arriba)

2. se puede usar la biblioteca: sched (esto tenga introducción anterior también)

3. se puede usar la biblioteca: Programador Python avanzada (Recomendado)

El método anterior no bastante lo haga por mí, ya que tenía que ser capaz de cancelar el intervalo. Di vuelta a la función en una clase y ocurrió la siguiente:

class setInterval():
    def __init__(self, func, sec):
        def func_wrapper():
            self.t = threading.Timer(sec, func_wrapper)
            self.t.start()
            func()
        self.t = threading.Timer(sec, func_wrapper)
        self.t.start()

    def cancel(self):
        self.t.cancel()

Algunas de las respuestas anteriores que los usos y func_wrapper threading.Timer hecho el trabajo, salvo que se genera un nuevo hilo cada vez que un intervalo se llama, que está causando problemas de memoria.

El ejemplo básico abajo más o menos implementado un mecanismo similar al poner intervalo en un hilo separado. Tiene capacidad en el intervalo dado. Antes de saltar en el código, aquí están algunas de las limitaciones que es necesario tener en cuenta:

  1. JavaScript es de un solo subproceso, por lo que cuando la función dentro setInterval se dispara, nada más va a trabajar al mismo tiempo (con exclusión de subproceso de trabajo, pero vamos a hablar de casos de uso general de setInterval. Por lo tanto, roscado es seguro. Pero aquí en esta aplicación, puede encontrarse con condiciones de carrera a menos que use un threading.rLock.

  2. La ejecución cuáles usos time.sleep a intervalos de simular, pero añadiendo el tiempo de ejecución de func, el tiempo total para este intervalo de puede ser mayor que lo que espera. Así que dependiendo de los casos de uso, es posible que desee a "dormir menos" (tiempo tomado para llamar menos la func)

  3. Sólo más o menos probado esto, y que definitivamente no debe utilizar variables globales como lo hice, siento libre de modificarlo para que se ajuste en su sistema.


Basta de hablar, aquí está el código:

# Python 2.7
import threading
import time


class Interval(object):
    def __init__(self):
        self.daemon_alive = True
        self.thread = None # keep a reference to the thread so that we can "join"

    def ticktock(self, interval, func):
        while self.daemon_alive:
            time.sleep(interval)
            func()

num = 0
def print_num():
    global num
    num += 1
    print 'num + 1 = ', num

def print_negative_num():
    global num
    print '-num = ', num * -1

intervals = {} # keep track of intervals
g_id_counter = 0 # roughly generate ids for intervals

def set_interval(interval, func):
    global g_id_counter

    interval_obj = Interval()
    # Put this interval on a new thread
    t = threading.Thread(target=interval_obj.ticktock, args=(interval, func))
    t.setDaemon(True)
    interval_obj.thread = t
    t.start()

    # Register this interval so that we can clear it later
    # using roughly generated id
    interval_id = g_id_counter
    g_id_counter += 1
    intervals[interval_id] = interval_obj

    # return interval id like it does in JavaScript
    return interval_id

def clear_interval(interval_id):
    # terminate this interval's while loop
    intervals[interval_id].daemon_alive = False
    # kill the thread
    intervals[interval_id].thread.join()
    # pop out the interval from registry for reusing
    intervals.pop(interval_id)

if __name__ == '__main__':
    num_interval = set_interval(1, print_num)
    neg_interval = set_interval(3, print_negative_num)

    time.sleep(10) # Sleep 10 seconds on main thread to let interval run
    clear_interval(num_interval)
    clear_interval(neg_interval)
    print "- Are intervals all cleared?"
    time.sleep(3) # check if both intervals are stopped (not printing)
    print "- Yup, time to get beers"

Resultado esperado:

num + 1 =  1
num + 1 =  2
-num =  -2
 num + 1 =  3
num + 1 =  4
num + 1 =  5
-num =  -5
num + 1 =  6
num + 1 =  7
num + 1 =  8
-num =  -8
num + 1 =  9
num + 1 =  10
-num =  -10
Are intervals all cleared?
Yup, time to get beers

Mi Python 3 jsinterval.py módulo será útil! Aquí está:

"""
Threaded intervals and timeouts from JavaScript
"""

import threading, sys

__all__ =  ['TIMEOUTS', 'INTERVALS', 'setInterval', 'clearInterval', 'setTimeout', 'clearTimeout']

TIMEOUTS  = {}
INTERVALS = {}

last_timeout_id  = 0
last_interval_id = 0

class Timeout:
    """Class for all timeouts."""
    def __init__(self, func, timeout):
        global last_timeout_id
        last_timeout_id += 1
        self.timeout_id = last_timeout_id
        TIMEOUTS[str(self.timeout_id)] = self
        self.func = func
        self.timeout = timeout
        self.threadname = 'Timeout #%s' %self.timeout_id

    def run(self):
        func = self.func
        delx = self.__del__
        def func_wrapper():
            func()
            delx()
        self.t = threading.Timer(self.timeout/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.start()

    def __repr__(self):
        return '<JS Timeout set for %s seconds, launching function %s on timeout reached>' %(self.timeout, repr(self.func))

    def __del__(self):
        self.t.cancel()

class Interval:
    """Class for all intervals."""
    def __init__(self, func, interval):
        global last_interval_id
        self.interval_id = last_interval_id
        INTERVALS[str(self.interval_id)] = self
        last_interval_id += 1
        self.func = func
        self.interval = interval
        self.threadname = 'Interval #%s' %self.interval_id

    def run(self):
        func = self.func
        interval = self.interval
        def func_wrapper():
            timeout = Timeout(func_wrapper, interval)
            self.timeout = timeout
            timeout.run()
            func()
        self.t = threading.Timer(self.interval/1000, func_wrapper)
        self.t.name = self.threadname
        self.t.run()

    def __repr__(self):
        return '<JS Interval, repeating function %s with interval %s>' %(repr(self.func), self.interval)

    def __del__(self):
        self.timeout.__del__()

def setInterval(func, interval):
    """
    Create a JS Interval: func is the function to repeat, interval is the interval (in ms)
    of executing the function.
    """
    temp = Interval(func, interval)
    temp.run()
    idx = int(temp.interval_id)
    del temp
    return idx


def clearInterval(interval_id):
    try:
        INTERVALS[str(interval_id)].__del__()
        del INTERVALS[str(interval_id)]
    except KeyError:
        sys.stderr.write('No such interval "Interval #%s"\n' %interval_id)

def setTimeout(func, timeout):
    """
    Create a JS Timeout: func is the function to timeout, timeout is the timeout (in ms)
    of executing the function.
    """
    temp = Timeout(func, timeout)
    temp.run()
    idx = int(temp.timeout_id)
    del temp
    return idx


def clearTimeout(timeout_id):
    try:
        TIMEOUTS[str(timeout_id)].__del__()
        del TIMEOUTS[str(timeout_id)]
    except KeyError:
        sys.stderr.write('No such timeout "Timeout #%s"\n' %timeout_id)

editar código: Fija la pérdida de memoria (descubierto por @benjaminz). Ahora todas las discusiones se limpian al final. ¿Por qué ocurre esta filtración? Ocurre debido a las referencias implícitas (o incluso explícitas). En mi caso, TIMEOUTS y INTERVALS. Tiempos de espera de la limpieza automática (después de este parche), ya que utilizan envoltorio función que llama a la función y luego auto-mata. Pero, ¿cómo sucede esto? Los objetos no se pueden borrar de la memoria a menos que todas las referencias también se borran o se utiliza el módulo gc. Explicando: no hay manera de crear (en mi código) referencias no deseados en los tiempos de espera / intervalos. Sólo tienen un remitente: los dicts TIMEOUTS / INTERVALS. Y, cuando interrumpidos o terminados (sólo los tiempos de espera pueden terminar ininterrumpida) que elimine la referencia solamente existente a sí mismos: su correspondiente elemento de dict. Las clases son perfectamente encapsulados utilizando __all__, así que no hay espacio para pérdidas de memoria.

Aquí es una solución de la deriva de tiempo bajo que utiliza un hilo a la señal periódicamente un objeto de evento. carrera de la rosca () no hace casi nada a la espera de un tiempo de espera; de ahí la baja deriva del tiempo.

# Example of low drift (time) periodic execution of a function.
import threading
import time

# Thread that sets 'flag' after 'timeout'
class timerThread (threading.Thread):

    def __init__(self , timeout , flag):
        threading.Thread.__init__(self)
        self.timeout = timeout
        self.stopFlag = False
        self.event = threading.Event()
        self.flag = flag

    # Low drift run(); there is only the 'if'
    # and 'set' methods between waits.
    def run(self):
        while not self.event.wait(self.timeout):
            if self.stopFlag:
                break
            self.flag.set()

    def stop(self):
        stopFlag = True
        self.event.set()

# Data.
printCnt = 0

# Flag to print.
printFlag = threading.Event()

# Create and start the timer thread.
printThread = timerThread(3 , printFlag)
printThread.start()

# Loop to wait for flag and print time.
while True:

    global printCnt

    # Wait for flag.
    printFlag.wait()
    # Flag must be manually cleared.
    printFlag.clear()
    print(time.time())
    printCnt += 1
    if printCnt == 3:
        break;

# Stop the thread and exit.
printThread.stop()
printThread.join()
print('Done')

La mayoría de las respuestas anteriores no apague el hilo correctamente. Durante el uso de Jupyter portátil he notado que cuando se envió una interrupción explícita, los hilos se sigue ejecutando y peor aún, que mantendrían la multiplicación a partir de 1 hilo conductor, 2, 4, etc. Mi método continuación se basa en la respuesta por @doom pero limpiamente manijas interrupciones mediante la ejecución de un bucle infinito en el hilo principal para escuchar de SIGINT y eventos SIGTERM

  • No se deriva
  • cancelable
  • Los mangos SIGINT y SIGTERM muy bien
  • Doesnt hacer un nuevo hilo para cada ejecución

No dude en sugerir mejoras

import time
import threading
import signal

# Record the time for the purposes of demonstration 
start_time=time.time()

class ProgramKilled(Exception):
    """
    An instance of this custom exception class will be thrown everytime we get an SIGTERM or SIGINT
    """
    pass

# Raise the custom exception whenever SIGINT or SIGTERM is triggered
def signal_handler(signum, frame):
    raise ProgramKilled

# This function serves as the callback triggered on every run of our IntervalThread
def action() :
    print('action ! -> time : {:.1f}s'.format(time.time()-start_time))

# https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval
class IntervalThread(threading.Thread) :
    def __init__(self,interval,action, *args, **kwargs) :
        super(IntervalThread, self).__init__()
        self.interval=interval
        self.action=action
        self.stopEvent=threading.Event()
        self.start()

    def run(self) :
        nextTime=time.time()+self.interval
        while not self.stopEvent.wait(nextTime-time.time()) :
            nextTime+=self.interval
            self.action()

    def cancel(self) :
        self.stopEvent.set()

def main():

    # Handle SIGINT and SIFTERM with the help of the callback function
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # start action every 1s
    inter=IntervalThread(1,action)
    print('just after setInterval -> time : {:.1f}s'.format(time.time()-start_time))

    # will stop interval in 500s
    t=threading.Timer(500,inter.cancel)
    t.start()

    # https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    while True:
        try:
            time.sleep(1)
        except ProgramKilled:
            print("Program killed: running cleanup code")
            inter.cancel()
            break

if __name__ == "__main__":
    main()

Las cosas funcionan de manera diferente en Python: es necesario que sea sleep() (si desea bloquear el flujo actual) o iniciar un nuevo hilo. Ver http://docs.python.org/library/threading.html

Python Documentación :

from threading import Timer

def hello():
    print "hello, world"

t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top