ファイルの変更を監視するにはどうすればよいですか?
質問
変更を監視したい別のプロセスによって書き込まれているログファイルがあります。変更が発生するたびに、新しいデータを読み込んで処理を行います。
これを行う最良の方法は何ですか?私は、PyWin32ライブラリから何らかのフックがあることを望んでいました。 win32file.FindNextChangeNotification
関数を見つけましたが、特定のファイルを見るように依頼する方法がわかりません。
誰かがこのようなことをしてくれたなら、その方法を聞いて本当に感謝します...
[編集] 私は、ポーリングを必要としないソリューションを求めていたと述べたはずです。
[編集] 呪い!これは、マップされたネットワークドライブでは機能しないようです。私は、Windowsがローカルディスク上で行うようにファイルの更新を「聞いていない」と推測しています。
解決
すでに http://timgolden.me.ukで入手可能なドキュメントをご覧になりましたか。 /python/win32_how_do_i/watch_directory_for_changes.html ? Windowsでのみ動作する必要がある場合、2番目の例はまさにあなたが望むもののようです(ディレクトリのパスを監視したいファイルの1つと交換する場合)。
そうでなければ、おそらく唯一の本当にプラットフォームに依存しないオプションがポーリングになります。
注:これらの解決策は試していません。
他のヒント
ポーリングがあなたにとって十分であれば、「変更された時間」がファイルの統計情報の変更。読むには:
os.stat(filename).st_mtime
(Windowsのネイティブ変更イベントソリューションは、ネットワークドライブなど、すべての状況で機能しないことにも注意してください。)
import os
class Monkey(object):
def __init__(self):
self._cached_stamp = 0
self.filename = '/path/to/file'
def ook(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
マルチプラットフォームソリューションが必要な場合は、 QFileSystemWatcher を確認してください。 ここにサンプルコード(サニタイズされていない):
from PyQt4 import QtCore
@QtCore.pyqtSlot(str)
def directory_changed(path):
print('Directory Changed!!!')
@QtCore.pyqtSlot(str)
def file_changed(path):
print('File Changed!!!')
fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)
Windowsでは動作しないはずです(おそらくcygwinで?)が、UNIXユーザーの場合は、「fcntl」を使用する必要があります。システムコール。 Pythonの例を次に示します。 C(同じ関数名)で記述する必要がある場合、ほとんど同じコードです
import time
import fcntl
import os
import signal
FNAME = "/HOME/TOTO/FILETOWATCH"
def handler(signum, frame):
print "File %s modified" % (FNAME,)
signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME, os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)
while True:
time.sleep(10000)
pyinotify をご覧ください。
inotifyは、新しいLinuxのdnotify(以前の回答から)を置き換え、ディレクトリレベルではなくファイルレベルの監視を許可します。
まあ、ティム・ゴールデンのスクリプトを少しハッキングした後、私は次のものを持っていますが、それは非常にうまくいくようです:
import os
import win32file
import win32con
path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt
def ProcessNewData( newData ):
print "Text added: %s"%newData
# Set up the bits we'll need for output
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
path_to_watch,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Open the file we're interested in
a = open(file_to_watch, "r")
# Throw away any exising log data
a.read()
# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it's updating the file we're interested in
for action, file in results:
full_filename = os.path.join (path_to_watch, file)
#print file, ACTIONS.get (action, "Unknown")
if file == file_to_watch:
newText = a.read()
if newText != "":
ProcessNewData( newText )
おそらく、より多くのエラーチェックをロードすることで実行できますが、単にログファイルを監視し、画面に出力する前に何らかの処理を行うため、これはうまく機能します。
ご意見をお寄せいただきありがとうございます-すばらしい!
私にとって最も簡単な解決策は、ウォッチドッグのツールwatchmedoを使用することです
https://pypi.python.org/pypi/watchdog よりディレクトリでsqlファイルを検索し、必要に応じて実行するプロセス。
watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.
チェック 同様の質問に対する私の回答。 Pythonで同じループを試すことができます。 このページの推奨事項:
import time
while 1:
where = file.tell()
line = file.readline()
if not line:
time.sleep(1)
file.seek(where)
else:
print line, # already has newline
質問 tail()a file with Python も参照してください。
>まあ、Pythonを使用しているので、ファイルを開いてそこから行を読み続けることができます。
f = open('file.log')
読み込まれた行が空ではないの場合、処理します。
line = f.readline()
if line:
// Do what you want with the line
EOFで readline
を呼び出し続けても問題ないかもしれません。この場合、空の文字列を返すだけです。また、ログファイルに何かが追加されると、必要に応じて、停止した場所から読み取りが続行されます。
イベントまたは特定のライブラリを使用するソリューションを探している場合は、質問でこれを指定してください。それ以外の場合、このソリューションは問題ないと思います。
これは、同じトリックを実行するように見え、ファイル全体をインポートしないKenderのコードの簡易バージョンです。
# Check file for new data.
import time
f = open(r'c:\temp\test.txt', 'r')
while True:
line = f.readline()
if not line:
time.sleep(1)
print 'Nothing New'
else:
print 'Call Function: ', line
ポーリングを使用して依存関係を最小限に抑えた単一のファイルを見るための Deestan (上記):
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self.filename = watch_file
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
print('File changed')
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print('\nDone')
break
except FileNotFoundError:
# Action on file not found
pass
except:
print('Unhandled error: %s' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
watch_file = 'my_file.txt'
# watcher = Watcher(watch_file) # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed') # also call custom action function
watcher.watch() # start the watch going
Tim Goldenの記事でご覧いただけるように、ホルストガットマン、WIN32は比較的複雑で、単一のファイルではなくディレクトリを監視します。
.NETである IronPython をご覧になることをお勧めします。 python実装。 IronPythonを使用すると、すべての .NET 機能を使用できます-
System.IO.FileSystemWatcher
単純な Event インターフェースで単一ファイルを処理します。
これは、Linux上で実行されるTim Goldanのスクリプトの別の変更であり、dict(file => time)を使用してファイル変更の単純なウォッチャーを追加します。
使用法:whateverName.py path_to_dir_to_watch
#!/usr/bin/env python
import os, sys, time
def files_to_timestamp(path):
files = [os.path.join(path, f) for f in os.listdir(path)]
return dict ([(f, os.path.getmtime(f)) for f in files])
if __name__ == "__main__":
path_to_watch = sys.argv[1]
print "Watching ", path_to_watch
before = files_to_timestamp(path_to_watch)
while 1:
time.sleep (2)
after = files_to_timestamp(path_to_watch)
added = [f for f in after.keys() if not f in before.keys()]
removed = [f for f in before.keys() if not f in after.keys()]
modified = []
for f in before.keys():
if not f in removed:
if os.path.getmtime(f) != before.get(f):
modified.append(f)
if added: print "Added: ", ", ".join(added)
if removed: print "Removed: ", ", ".join(removed)
if modified: print "Modified ", ", ".join(modified)
before = after
これは、ファイルの変更を確認する例です。それを行う最善の方法ではないかもしれませんが、確かに短い方法です。
ソースに変更が加えられたときにアプリケーションを再起動するための便利なツール。 pygameで遊んでいるときにこれを作成したので、ファイルを保存した直後にエフェクトが発生するのがわかります。
pygameで使用する場合は、「while」ループ内の要素がゲームループ(更新など)に配置されていることを確認してください。そうしないと、アプリケーションが無限ループに陥り、ゲームの更新が表示されません。
file_size_stored = os.stat('neuron.py').st_size
while True:
try:
file_size_current = os.stat('neuron.py').st_size
if file_size_stored != file_size_current:
restart_program()
except:
pass
Webで見つけた再起動コードが必要な場合。ここにあります。 (質問には関係ありませんが、役に立つかもしれません)
def restart_program(): #restart application
python = sys.executable
os.execl(python, python, * sys.argv)
電子にあなたがやりたいことをさせるのを楽しんでください。
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
class myThread (threading.Thread):
def __init__(self, threadID, fileName, directory, origin):
threading.Thread.__init__(self)
self.threadID = threadID
self.fileName = fileName
self.daemon = True
self.dir = directory
self.originalFile = origin
def run(self):
startMonitor(self.fileName, self.dir, self.originalFile)
def startMonitor(fileMonitoring,dirPath,originalFile):
hDir = win32file.CreateFile (
dirPath,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Wait for new data and call ProcessNewData for each new chunk that's
# written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it's updating the file we're
# interested in
for action, file_M in results:
full_filename = os.path.join (dirPath, file_M)
#print file, ACTIONS.get (action, "Unknown")
if len(full_filename) == len(fileMonitoring) and action == 3:
#copy to main file
...
これは、1秒あたり1行しか書かないが通常ははるかに少ない入力ファイルを監視するための例です。目標は、指定された出力ファイルに最後の行(最新の書き込み)を追加することです。これを自分のプロジェクトの1つからコピーし、関係のない行をすべて削除しました。不足しているシンボルを入力または変更する必要があります。
from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow # Qt Creator gen'd
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
QMainWindow.__init__(self, parent)
Ui_MainWindow.__init__(self)
self._fileWatcher = QFileSystemWatcher()
self._fileWatcher.fileChanged.connect(self.fileChanged)
def fileChanged(self, filepath):
QThread.msleep(300) # Reqd on some machines, give chance for write to complete
# ^^ About to test this, may need more sophisticated solution
with open(filepath) as file:
lastLine = list(file)[-1]
destPath = self._filemap[filepath]['dest file']
with open(destPath, 'a') as out_file: # a= append
out_file.writelines([lastLine])
もちろん、包含するQMainWindowクラスは厳密には必要ありません。 QFileSystemWatcherを単独で使用できます。
最も簡単な解決策は、pygtailを使用することです。 https://pypi.python.org/pypi/pygtail
from pygtail import Pygtail
while True:
for line in Pygtail("some.log"):
sys.stdout.write(line)
repyt というシンプルなライブラリを使用することもできます。以下に例を示します。
repyt ./app.py
fswatch が投稿されていないようです。クロスプラットフォームのファイルシステムウォッチャーです。インストールして実行し、プロンプトに従います。
Pythonおよびgolangプログラムで使用しましたが、動作します。
関連する@ 4Oh4ソリューション監視するファイルのリストのスムーズな変更。
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self._cached_stamp_files = {}
self.filenames = watch_files
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
for file in self.filenames:
stamp = os.stat(file).st_mtime
if not file in self._cached_stamp_files:
self._cached_stamp_files[file] = 0
if stamp != self._cached_stamp_files[file]:
self._cached_stamp_files[file] = stamp
# File has changed, so do something...
file_to_read = open(file, 'r')
value = file_to_read.read()
print("value from file", value)
file_to_read.seek(0)
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print('\nDone')
break
except FileNotFoundError:
# Action on file not found
pass
except Exception as e:
print(e)
print('Unhandled error: %s' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
# pass
watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']
# watcher = Watcher(watch_file) # simple
if __name__ == "__main__":
watcher = Watcher(watch_files, custom_action, text='yes, changed') # also call custom action function
watcher.watch() # start the watch going
Windows固有の機能は知りません。ファイルのMD5ハッシュを1秒/分/時間ごとに取得して(必要な速度に応じて)、最後のハッシュと比較できます。異なる場合は、ファイルが変更されたことがわかり、最新の行を読み上げます。
このようなものを試してみます。
try:
f = open(filePath)
except IOError:
print "No such file: %s" % filePath
raw_input("Press Enter to close window")
try:
lines = f.readlines()
while True:
line = f.readline()
try:
if not line:
time.sleep(1)
else:
functionThatAnalisesTheLine(line)
except Exception, e:
# handle the exception somehow (for example, log the trace) and raise the same exception again
raw_input("Press Enter to close window")
raise e
finally:
f.close()
ループは、最後にファイルが読み取られてから新しい行があるかどうかをチェックします。ある場合は、読み取られて functionThatAnalisesTheLine
関数に渡されます。そうでない場合、スクリプトは1秒待機してからプロセスを再試行します。