Python setInterval() に相当しますか?
-
01-10-2019 - |
質問
Python には JavaScript に似た機能がありますか? setInterval()
?
ありがとう
正しい解決策はありません
他のヒント
これはあなたが探していた正しいスニペットかもしれません:
import threading
def set_interval(func, sec):
def func_wrapper():
set_interval(func, sec)
func()
t = threading.Timer(sec, func_wrapper)
t.start()
return t
素晴らしくシンプルにしてください。
import threading
def setInterval(func,time):
e = threading.Event()
while not e.wait(time):
func()
def foo():
print "hello"
# using
setInterval(foo,5)
# output:
hello
hello
.
.
.
編集:このコードは非ブロッキングです
import threading
class ThreadJob(threading.Thread):
def __init__(self,callback,event,interval):
'''runs the callback function after interval seconds
:param callback: callback function to invoke
:param event: external event for controlling the update operation
:param interval: time in seconds after which are required to fire the callback
:type callback: function
:type interval: int
'''
self.callback = callback
self.event = event
self.interval = interval
super(ThreadJob,self).__init__()
def run(self):
while not self.event.wait(self.interval):
self.callback()
event = threading.Event()
def foo():
print "hello"
k = ThreadJob(foo,event,2)
k.start()
print "It is non-blocking"
これは、開始して停止できるバージョンです。ブロックしていません。また、実行時間エラーが追加されていないため、グリッチもありません(たとえば、オーディオなどの非常に短い間隔で長期にわたる実行に重要です)
import time, threading
StartTime=time.time()
def action() :
print('action ! -> time : {:.1f}s'.format(time.time()-StartTime))
class setInterval :
def __init__(self,interval,action) :
self.interval=interval
self.action=action
self.stopEvent=threading.Event()
thread=threading.Thread(target=self.__setInterval)
thread.start()
def __setInterval(self) :
nextTime=time.time()+self.interval
while not self.stopEvent.wait(nextTime-time.time()) :
nextTime+=self.interval
self.action()
def cancel(self) :
self.stopEvent.set()
# start action every 0.6s
inter=setInterval(0.6,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-StartTime))
# will stop interval in 5s
t=threading.Timer(5,inter.cancel)
t.start()
出力は次のとおりです。
just after setInterval -> time : 0.0s
action ! -> time : 0.6s
action ! -> time : 1.2s
action ! -> time : 1.8s
action ! -> time : 2.4s
action ! -> time : 3.0s
action ! -> time : 3.6s
action ! -> time : 4.2s
action ! -> time : 4.8s
変化する Nailxx少し答えて、あなたは答えを得ました!
from threading import Timer
def hello():
print "hello, world"
Timer(30.0, hello).start()
Timer(30.0, hello).start() # after 30 seconds, "hello, world" will be printed
sched
モジュールは、一般的なPythonコードのこれらの能力を提供します。ただし、そのドキュメントが示唆するように、コードがマルチスレッドされている場合、使用する方が理にかなっている可能性があります。 threading.Timer
代わりにクラス。
私はこれがあなたが望んでいることだと思います:
#timertest.py
import sched, time
def dostuff():
print "stuff is being done!"
s.enter(3, 1, dostuff, ())
s = sched.scheduler(time.time, time.sleep)
s.enter(3, 1, dostuff, ())
s.run()
繰り返し方式の最後にスケジューラに別のエントリを追加すると、それはただ続けます。
シンプルなsetinterval Utils
from threading import Timer
def setInterval(timer, task):
isStop = task()
if not isStop:
Timer(timer, setInterval, [timer, task]).start()
def hello():
print "do something"
return False # return True if you want to stop
if __name__ == "__main__":
setInterval(2.0, hello) # every 2 seconds, "do something" will be printed
最近、私はあなたと同じ問題を抱えています。そして、私はこれらの溶液を見つけます:
1.ライブラリを使用できます。 Threading.time(これは上記の紹介があります)
2.ライブラリを使用できます。 スケジュール(これには上記の紹介もあります)
3.ライブラリを使用できます。 高度なPythonスケジューラ(お勧め)
上記の方法は、間隔をキャンセルできるようにする必要があるため、私にとっては完全にはできませんでした。この機能をクラスに変えて、次のことを思いつきました。
class setInterval():
def __init__(self, func, sec):
def func_wrapper():
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
func()
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
def cancel(self):
self.t.cancel()
を使用する上記のいくつかの回答 func_wrapper
そして threading.Timer
確かに機能しますが、間隔が呼び出されるたびに新しいスレッドが生成され、メモリの問題が発生する点が異なります。
以下の基本的な例 だいたい 別のスレッドに間隔を置くことで同様のメカニズムを実装しました。指定された間隔でスリープします。コードに入る前に、次の制限事項に注意する必要があります。
JavaScript はシングルスレッドなので、内部の関数が
setInterval
が起動されると、同時に他に何も動作しません (ワーカー スレッドを除きますが、一般的な使用例について話しましょう)setInterval
. 。したがって、スレッドは安全です。ただし、この実装では、threading.rLock
.以下の実装では、
time.sleep
間隔をシミュレートしますが、実行時間を追加します。func
, 、この合計時間 間隔 あなたが期待しているものよりも大きいかもしれません。したがって、ユースケースによっては、「睡眠時間を短く」する必要があるかもしれません(通話にかかる時間を差し引いたもの)。func
)私だけ だいたい これをテストしましたが、私のようにグローバル変数を使用するべきではありません。システムに適合するように自由に調整してください。
話はこれくらいにして、コードを次に示します。
# Python 2.7
import threading
import time
class Interval(object):
def __init__(self):
self.daemon_alive = True
self.thread = None # keep a reference to the thread so that we can "join"
def ticktock(self, interval, func):
while self.daemon_alive:
time.sleep(interval)
func()
num = 0
def print_num():
global num
num += 1
print 'num + 1 = ', num
def print_negative_num():
global num
print '-num = ', num * -1
intervals = {} # keep track of intervals
g_id_counter = 0 # roughly generate ids for intervals
def set_interval(interval, func):
global g_id_counter
interval_obj = Interval()
# Put this interval on a new thread
t = threading.Thread(target=interval_obj.ticktock, args=(interval, func))
t.setDaemon(True)
interval_obj.thread = t
t.start()
# Register this interval so that we can clear it later
# using roughly generated id
interval_id = g_id_counter
g_id_counter += 1
intervals[interval_id] = interval_obj
# return interval id like it does in JavaScript
return interval_id
def clear_interval(interval_id):
# terminate this interval's while loop
intervals[interval_id].daemon_alive = False
# kill the thread
intervals[interval_id].thread.join()
# pop out the interval from registry for reusing
intervals.pop(interval_id)
if __name__ == '__main__':
num_interval = set_interval(1, print_num)
neg_interval = set_interval(3, print_negative_num)
time.sleep(10) # Sleep 10 seconds on main thread to let interval run
clear_interval(num_interval)
clear_interval(neg_interval)
print "- Are intervals all cleared?"
time.sleep(3) # check if both intervals are stopped (not printing)
print "- Yup, time to get beers"
期待される出力:
num + 1 = 1
num + 1 = 2
-num = -2
num + 1 = 3
num + 1 = 4
num + 1 = 5
-num = -5
num + 1 = 6
num + 1 = 7
num + 1 = 8
-num = -8
num + 1 = 9
num + 1 = 10
-num = -10
Are intervals all cleared?
Yup, time to get beers
私のPython 3モジュール jsinterval.py
役に立ちます!ここにあります:
"""
Threaded intervals and timeouts from JavaScript
"""
import threading, sys
__all__ = ['TIMEOUTS', 'INTERVALS', 'setInterval', 'clearInterval', 'setTimeout', 'clearTimeout']
TIMEOUTS = {}
INTERVALS = {}
last_timeout_id = 0
last_interval_id = 0
class Timeout:
"""Class for all timeouts."""
def __init__(self, func, timeout):
global last_timeout_id
last_timeout_id += 1
self.timeout_id = last_timeout_id
TIMEOUTS[str(self.timeout_id)] = self
self.func = func
self.timeout = timeout
self.threadname = 'Timeout #%s' %self.timeout_id
def run(self):
func = self.func
delx = self.__del__
def func_wrapper():
func()
delx()
self.t = threading.Timer(self.timeout/1000, func_wrapper)
self.t.name = self.threadname
self.t.start()
def __repr__(self):
return '<JS Timeout set for %s seconds, launching function %s on timeout reached>' %(self.timeout, repr(self.func))
def __del__(self):
self.t.cancel()
class Interval:
"""Class for all intervals."""
def __init__(self, func, interval):
global last_interval_id
self.interval_id = last_interval_id
INTERVALS[str(self.interval_id)] = self
last_interval_id += 1
self.func = func
self.interval = interval
self.threadname = 'Interval #%s' %self.interval_id
def run(self):
func = self.func
interval = self.interval
def func_wrapper():
timeout = Timeout(func_wrapper, interval)
self.timeout = timeout
timeout.run()
func()
self.t = threading.Timer(self.interval/1000, func_wrapper)
self.t.name = self.threadname
self.t.run()
def __repr__(self):
return '<JS Interval, repeating function %s with interval %s>' %(repr(self.func), self.interval)
def __del__(self):
self.timeout.__del__()
def setInterval(func, interval):
"""
Create a JS Interval: func is the function to repeat, interval is the interval (in ms)
of executing the function.
"""
temp = Interval(func, interval)
temp.run()
idx = int(temp.interval_id)
del temp
return idx
def clearInterval(interval_id):
try:
INTERVALS[str(interval_id)].__del__()
del INTERVALS[str(interval_id)]
except KeyError:
sys.stderr.write('No such interval "Interval #%s"\n' %interval_id)
def setTimeout(func, timeout):
"""
Create a JS Timeout: func is the function to timeout, timeout is the timeout (in ms)
of executing the function.
"""
temp = Timeout(func, timeout)
temp.run()
idx = int(temp.timeout_id)
del temp
return idx
def clearTimeout(timeout_id):
try:
TIMEOUTS[str(timeout_id)].__del__()
del TIMEOUTS[str(timeout_id)]
except KeyError:
sys.stderr.write('No such timeout "Timeout #%s"\n' %timeout_id)
コード編集: メモリリークを修正しました(@benjaminzによって発見されました)。これで、すべてのスレッドが端にクリーンアップされます。なぜこのリークが起こるのですか?それは、暗黙の(または明示的な)参照のために起こります。私の場合、 TIMEOUTS
と INTERVALS
. 。タイムアウト自動的に(このパッチの後)、関数を呼び出してからセルフキルを呼び出す関数ラッパーを使用するため、自動的に(このパッチの後)。しかし、これはどのように起こりますか?すべての参照が削除されない場合を除き、オブジェクトはメモリから削除することはできません gc
モジュールが使用されます。説明:(私のコードで)タイムアウト/間隔への不要な参照を作成する方法はありません。リファラーは1つだけです TIMEOUTS
/INTERVALS
dicts。また、中断または終了すると(タイムアウトのみが途切れることなく終了できます)、それ自体への唯一の既存の参照:対応するDICT要素を削除します。クラスは完全にカプセル化されています __all__
, 、したがって、メモリリーク用のスペースはありません。
ここでは、スレッドを使用してイベントオブジェクトを定期的に信号する低い時間ドリフトソリューションです。スレッドの実行()は、タイムアウトを待っている間、ほとんど何もしません。したがって、低い時間ドリフト。
# Example of low drift (time) periodic execution of a function.
import threading
import time
# Thread that sets 'flag' after 'timeout'
class timerThread (threading.Thread):
def __init__(self , timeout , flag):
threading.Thread.__init__(self)
self.timeout = timeout
self.stopFlag = False
self.event = threading.Event()
self.flag = flag
# Low drift run(); there is only the 'if'
# and 'set' methods between waits.
def run(self):
while not self.event.wait(self.timeout):
if self.stopFlag:
break
self.flag.set()
def stop(self):
stopFlag = True
self.event.set()
# Data.
printCnt = 0
# Flag to print.
printFlag = threading.Event()
# Create and start the timer thread.
printThread = timerThread(3 , printFlag)
printThread.start()
# Loop to wait for flag and print time.
while True:
global printCnt
# Wait for flag.
printFlag.wait()
# Flag must be manually cleared.
printFlag.clear()
print(time.time())
printCnt += 1
if printCnt == 3:
break;
# Stop the thread and exit.
printThread.stop()
printThread.join()
print('Done')
上記の回答のほとんどは、スレッドを適切にシャットダウンしません。 jupyterノートブックの使用中、明示的な割り込みが送信されたとき、スレッドがまだ実行されていてさらに悪いことに気づきました。1つのスレッドランニング、2、4などで開始し続けます。メインスレッドで無限のループを実行して中断を処理して、sigintやsigtermイベントを聴く
- ドリフトはありません
- キャンセル可能
- SigintとSigtermを非常にうまく処理します
- 実行ごとに新しいスレッドを作成しません
お気軽に改善を提案してください
import time
import threading
import signal
# Record the time for the purposes of demonstration
start_time=time.time()
class ProgramKilled(Exception):
"""
An instance of this custom exception class will be thrown everytime we get an SIGTERM or SIGINT
"""
pass
# Raise the custom exception whenever SIGINT or SIGTERM is triggered
def signal_handler(signum, frame):
raise ProgramKilled
# This function serves as the callback triggered on every run of our IntervalThread
def action() :
print('action ! -> time : {:.1f}s'.format(time.time()-start_time))
# https://stackoverflow.com/questions/2697039/python-equivalent-of-setinterval
class IntervalThread(threading.Thread) :
def __init__(self,interval,action, *args, **kwargs) :
super(IntervalThread, self).__init__()
self.interval=interval
self.action=action
self.stopEvent=threading.Event()
self.start()
def run(self) :
nextTime=time.time()+self.interval
while not self.stopEvent.wait(nextTime-time.time()) :
nextTime+=self.interval
self.action()
def cancel(self) :
self.stopEvent.set()
def main():
# Handle SIGINT and SIFTERM with the help of the callback function
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# start action every 1s
inter=IntervalThread(1,action)
print('just after setInterval -> time : {:.1f}s'.format(time.time()-start_time))
# will stop interval in 500s
t=threading.Timer(500,inter.cancel)
t.start()
# https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
while True:
try:
time.sleep(1)
except ProgramKilled:
print("Program killed: running cleanup code")
inter.cancel()
break
if __name__ == "__main__":
main()
Pythonでは物事が異なって機能します:あなたはどちらもする必要があります sleep()
(現在のスレッドをブロックする場合)または新しいスレッドを起動します。見る http://docs.python.org/library/threading.html
から Pythonドキュメント:
from threading import Timer
def hello():
print "hello, world"
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed