質問

変更を監視したい別のプロセスによって書き込まれているログファイルがあります。変更が発生するたびに、新しいデータを読み込んで処理を行います。

これを行う最良の方法は何ですか?私は、PyWin32ライブラリから何らかのフックがあることを望んでいました。 win32file.FindNextChangeNotification 関数を見つけましたが、特定のファイルを見るように依頼する方法がわかりません。

誰かがこのようなことをしてくれたなら、その方法を聞いて本当に感謝します...

[編集] 私は、ポーリングを必要としないソリューションを求めていたと述べたはずです。

[編集] 呪い!これは、マップされたネットワークドライブでは機能しないようです。私は、Windowsがローカルディスク上で行うようにファイルの更新を「聞いていない」と推測しています。

役に立ちましたか?

解決

すでに http://timgolden.me.ukで入手可能なドキュメントをご覧になりましたか。 /python/win32_how_do_i/watch_directory_for_changes.html ? Windowsでのみ動作する必要がある場合、2番目の例はまさにあなたが望むもののようです(ディレクトリのパスを監視したいファイルの1つと交換する場合)。

そうでなければ、おそらく唯一の本当にプラットフォームに依存しないオプションがポーリングになります。

注:これらの解決策は試していません。

他のヒント

ウォッチドッグを使用してみましたか?

  

ファイルシステムイベントを監視するPython APIライブラリとシェルユーティリティ。

     

ディレクトリ監視が簡単になりました

     
      
  • クロスプラットフォームAPI。
  •   
  • ディレクトリの変更に応じてコマンドを実行するシェルツール。
  •   
     

クイックスタート ...

の簡単な例ですぐに始めましょう

ポーリングがあなたにとって十分であれば、「変更された時間」がファイルの統計情報の変更。読むには:

os.stat(filename).st_mtime

(Windowsのネイティブ変更イベントソリューションは、ネットワークドライブなど、すべての状況で機能しないことにも注意してください。)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...

マルチプラットフォームソリューションが必要な場合は、 QFileSystemWatcher を確認してください。 ここにサンプルコード(サニタイズされていない):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

Windowsでは動作しないはずです(おそらくcygwinで?)が、UNIXユーザーの場合は、「fcntl」を使用する必要があります。システムコール。 Pythonの例を次に示します。 C(同じ関数名)で記述する必要がある場合、ほとんど同じコードです

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)

pyinotify をご覧ください。

inotifyは、新しいLinuxのdnotify(以前の回答から)を置き換え、ディレクトリレベルではなくファイルレベルの監視を許可します。

まあ、ティム・ゴールデンのスクリプトを少しハッキングした後、私は次のものを持っていますが、それは非常にうまくいくようです:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

おそらく、より多くのエラーチェックをロードすることで実行できますが、単にログファイルを監視し、画面に出力する前に何らかの処理を行うため、これはうまく機能します。

ご意見をお寄せいただきありがとうございます-すばらしい!

私にとって最も簡単な解決策は、ウォッチドッグのツールwatchmedoを使用することです

https://pypi.python.org/pypi/watchdog よりディレクトリでsqlファイルを検索し、必要に応じて実行するプロセス。

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.

チェック 同様の質問に対する私の回答。 Pythonで同じループを試すことができます。 このページの推奨事項:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

質問 tail()a file with Python も参照してください。

>

まあ、Pythonを使用しているので、ファイルを開いてそこから行を読み続けることができます。

f = open('file.log')

読み込まれた行が空ではないの場合、処理します。

line = f.readline()
if line:
    // Do what you want with the line

EOFで readline を呼び出し続けても問題ないかもしれません。この場合、空の文字列を返すだけです。また、ログファイルに何かが追加されると、必要に応じて、停止した場所から読み取りが続行されます。

イベントまたは特定のライブラリを使用するソリューションを探している場合は、質問でこれを指定してください。それ以外の場合、このソリューションは問題ないと思います。

これは、同じトリックを実行するように見え、ファイル全体をインポートしないKenderのコードの簡易バージョンです。

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line

ポーリングを使用して依存関係を最小限に抑えた単一のファイルを見るための Deestan (上記):

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going

Tim Goldenの記事でご覧いただけるように、ホルストガットマン、WIN32は比較的複雑で、単一のファイルではなくディレクトリを監視します。

.NETである IronPython をご覧になることをお勧めします。 python実装。 IronPythonを使用すると、すべての .NET 機能を使用できます-

System.IO.FileSystemWatcher

単純な Event インターフェースで単一ファイルを処理します。

これは、Linux上で実行されるTim Goldanのスクリプトの別の変更であり、dict(file => time)を使用してファイル変更の単純なウォッチャーを追加します。

使用法:whateverName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print "Watching ", path_to_watch

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print "Added: ", ", ".join(added)
        if removed: print "Removed: ", ", ".join(removed)
        if modified: print "Modified ", ", ".join(modified)

        before = after

これは、ファイルの変更を確認する例です。それを行う最善の方法ではないかもしれませんが、確かに短い方法です。

ソースに変更が加えられたときにアプリケーションを再起動するための便利なツール。 pygameで遊んでいるときにこれを作成したので、ファイルを保存した直後にエフェクトが発生するのがわかります。

pygameで使用する場合は、「while」ループ内の要素がゲームループ(更新など)に配置されていることを確認してください。そうしないと、アプリケーションが無限ループに陥り、ゲームの更新が表示されません。

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

Webで見つけた再起動コードが必要な場合。ここにあります。 (質問には関係ありませんが、役に立つかもしれません)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

電子にあなたがやりたいことをさせるのを楽しんでください。

ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...

これは、1秒あたり1行しか書かないが通常ははるかに少ない入力ファイルを監視するための例です。目標は、指定された出力ファイルに最後の行(最新の書き込み)を追加することです。これを自分のプロジェクトの1つからコピーし、関係のない行をすべて削除しました。不足しているシンボルを入力または変更する必要があります。

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

もちろん、包含するQMainWindowクラスは厳密には必要ありません。 QFileSystemWatcherを単独で使用できます。

最も簡単な解決策は、pygtailを使用することです。     https://pypi.python.org/pypi/pygtail

from pygtail import Pygtail

while True:
    for line in Pygtail("some.log"):
        sys.stdout.write(line)

repyt というシンプルなライブラリを使用することもできます。以下に例を示します。

repyt ./app.py

fswatch が投稿されていないようです。クロスプラットフォームのファイルシステムウォッチャーです。インストールして実行し、プロンプトに従います。

Pythonおよびgolangプログラムで使用しましたが、動作します。

関連する@ 4Oh4ソリューション監視するファイルのリストのスムーズな変更。

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going

Windows固有の機能は知りません。ファイルのMD5ハッシュを1秒/分/時間ごとに取得して(必要な速度に応じて)、最後のハッシュと比較できます。異なる場合は、ファイルが変更されたことがわかり、最新の行を読み上げます。

このようなものを試してみます。

    try:
            f = open(filePath)
    except IOError:
            print "No such file: %s" % filePath
            raw_input("Press Enter to close window")
    try:
            lines = f.readlines()
            while True:
                    line = f.readline()
                    try:
                            if not line:
                                    time.sleep(1)
                            else:
                                    functionThatAnalisesTheLine(line)
                    except Exception, e:
                            # handle the exception somehow (for example, log the trace) and raise the same exception again
                            raw_input("Press Enter to close window")
                            raise e
    finally:
            f.close()

ループは、最後にファイルが読み取られてから新しい行があるかどうかをチェックします。ある場合は、読み取られて functionThatAnalisesTheLine 関数に渡されます。そうでない場合、スクリプトは1秒待機してからプロセスを再試行します。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top