문제

변경 사항을보고 싶은 다른 프로세스에서 로그 파일이 작성되어 있습니다. 변경이 발생할 때마다 새로운 데이터를 읽고 일부 처리를 수행하고 싶습니다.

이것을하는 가장 좋은 방법은 무엇입니까? Pywin32 라이브러리에서 일종의 갈고리가 있기를 바랐습니다. 나는 찾았다 win32file.FindNextChangeNotification 기능하지만 특정 파일을 보도록 요청하는 방법을 모르겠습니다.

누군가 이런 일을한다면 어떻게되어 정말 감사 할 것입니다 ...

편집하다 나는 폴링이 필요하지 않은 해결책을 따랐다 고 언급 했어야했다.

편집하다 저주! 이것은 매핑 된 네트워크 드라이브에서 작동하지 않는 것 같습니다. Windows가 로컬 디스크에서 수행하는 방식으로 파일의 업데이트를 '듣지 못한다'고 생각합니다.

도움이 되었습니까?

해결책

이미 사용 가능한 문서를 살펴 보셨습니까? http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html? Windows 아래에서만 작동하는 경우 두 번째 예제는 정확히 원하는 것 같습니다 (디렉토리의 경로를보고 싶은 파일 중 하나로 교환하는 경우).

그렇지 않으면, 폴링은 아마도 유일한 플랫폼 독립적 옵션 일 것입니다.

메모: 나는이 솔루션을 시도하지 않았습니다.

다른 팁

사용해 보셨습니까? 지키는 개?

파일 시스템 이벤트를 모니터링하는 Python API 라이브러리 및 쉘 유틸리티.

디렉토리 모니터링이 쉽게 만들어졌습니다

  • 크로스 플랫폼 API.
  • 디렉토리 변경에 대한 응답으로 명령을 실행하는 쉘 도구.

간단한 예를 들어 신속하게 시작하십시오 빠른 시작...

폴링이 충분하다면 "수정 된 시간"파일 스탯이 변경되는지 시청할 것입니다. 그것을 읽으려면 :

os.stat(filename).st_mtime

(또한 Windows Native Change 이벤트 솔루션은 모든 상황에서 (예 : 네트워크 드라이브에서) 작동하지는 않습니다.)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...

Multiplatform 솔루션을 원한다면 확인하십시오 QFILESYSTEMWATCHER. 여기서 예제 코드 (소독되지 않음) :

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

Windows에서 작동하지 않아야하지만 (Cygwin과 함께?) UNIX 사용자의 경우 "FCNTL"시스템 호출을 사용해야합니다. 파이썬의 예는 다음과 같습니다. C (동일한 함수 이름)로 작성 해야하는 경우 대부분 동일한 코드입니다.

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)

체크 아웃 pyinotify.

Inotify는 최신 Linux에서 Dnotify (이전 답변에서)를 대체하고 디렉토리 수준 모니터링 대신 파일 레벨을 허용합니다.

Tim Golden의 대본을 약간 해킹 한 후에는 다음과 같은 것 같습니다.

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

더 많은 오류 검사로로드 할 수 있지만 로그 파일을보고 화면에 뱉기 전에 처리를 수행하기 위해서는 잘 작동합니다.

당신의 의견에 감사드립니다 - 훌륭한 물건!

저를위한 가장 간단한 솔루션은 Watchdog의 도구 WatchMedo를 사용하는 것입니다.

에서 https://pypi.python.org/pypi/watchdog 이제 디렉토리에서 SQL 파일을 찾아서 필요한 경우 실행하는 프로세스가 있습니다.

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.

확인하다 내 대답 a 비슷한 질문. 파이썬에서 동일한 루프를 시도 할 수 있습니다. 이 페이지 제안:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

또한 질문을 참조하십시오 꼬리 () 파이썬이있는 파일.

글쎄, 당신은 python을 사용하고 있기 때문에 파일을 열고 그것을 읽는 줄을 계속 읽을 수 있습니다.

f = open('file.log')

라인이 읽는 경우 비어 있지 않습니다, 당신은 그것을 처리합니다.

line = f.readline()
if line:
    // Do what you want with the line

당신은 계속 전화해도 괜찮다는 것을 놓치고있을 수 있습니다. readline EOF에서. 이 경우 빈 문자열을 계속 반환합니다. 그리고 로그 파일에 무언가가 추가되면, 필요에 따라 중지 된 곳에서 읽기가 계속됩니다.

이벤트 또는 특정 라이브러리를 사용하는 솔루션을 찾고 있다면 질문에이를 지정하십시오. 그렇지 않으면이 솔루션은 괜찮다고 생각합니다.

다음은 동일한 트릭을 수행하고 전체 파일을 가져 오지 않는 Kender 코드의 단순화 된 버전입니다.

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line

폴링 및 최소한의 종속성이있는 단일 파일을 보려면 다음은 데스탄 (위에):

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going

볼 수 있듯이 팀 골든의 기사, Horst Gutmann, Win32는 비교적 복잡하며 단일 파일이 아닌 디렉토리를 시청합니다.

나는 당신이 조사를 제안하고 싶습니다 Ironpython,, 그것은 a .그물 파이썬 구현. Ironpython을 사용하면 모든 것을 사용할 수 있습니다 .그물 기능 - 포함

System.IO.FileSystemWatcher

단순한 파일을 간단하게 처리합니다 이벤트 상호 작용.

이것은 Linux에서 실행되는 Tim Goldan의 스크립트의 또 다른 수정이며 Dict (file => time)를 사용하여 파일 수정을 간단한 감시자를 추가합니다.

사용법 : 뭐든지 name.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print "Watching ", path_to_watch

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print "Added: ", ", ".join(added)
        if removed: print "Removed: ", ", ".join(removed)
        if modified: print "Modified ", ", ".join(modified)

        before = after

이것은 변경 사항을 확인하는 예입니다. 가장 좋은 방법이 아닐 수도 있지만 확실히 짧은 방법입니다.

소스를 변경했을 때 응용 프로그램을 다시 시작하기위한 편리한 도구. 파일 저장 직후에 효과가 발생하는 것을 볼 수 있도록 Pygame을 사용하여 이것을 만들었습니다.

Pygame에 사용될 때 'Whend'루프의 물건이 게임 루프 일명 업데이트에 배치되어 있는지 확인하십시오. 그렇지 않으면 응용 프로그램이 무한 루프에 갇히게되며 게임 업데이트가 표시되지 않습니다.

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

웹에서 찾은 재시작 코드를 원한다면 여기있어. (질문과 관련이 없지만 유용 할 수 있습니다)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

전자가 원하는 일을하는 재미를 즐기십시오.

ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...

다음은 초당 한 줄을 작성하지 않고 일반적으로 훨씬 적은 입력 파일을 시청하는 예제입니다. 목표는 마지막 줄 (가장 최근의 쓰기)을 지정된 출력 파일에 추가하는 것입니다. 나는 이것을 내 프로젝트 중 하나에서 복사하고 모든 관련없는 줄을 삭제했습니다. 누락 된 기호를 작성하거나 변경해야합니다.

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

물론, Qmainwindow 클래스를 포괄하는 것은 엄격하게 필요하지 않습니다. qfilesystemwatcher 만 사용할 수 있습니다.

가장 가장 간단한 솔루션은 Pygtail을 사용하는 것입니다. https://pypi.python.org/pypi/pygtail

from pygtail import Pygtail

while True:
    for line in Pygtail("some.log"):
        sys.stdout.write(line)

간단한 라이브러리를 사용할 수도 있습니다 repyt, 예는 다음과 같습니다.

repyt ./app.py

아무도 게시하지 않은 것 같습니다 fswatch. 크로스 플랫폼 파일 시스템 감시자입니다. 설치하고 실행하고 프롬프트를 따르십시오.

나는 그것을 Python 및 Golang 프로그램과 함께 사용했고 그것은 단지 작동합니다.

관련 @4oh4 솔루션 파일 목록이 볼 수있는 원활한 변경;

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going

Windows 특정 기능을 모릅니다. 1 초/시간마다 파일의 MD5 해시를 가져 와서 (필요한 지에 따라 다름) 마지막 해시와 비교할 수 있습니다. 다른 경우 파일이 변경되었고 최신 줄을 읽었습니다.

나는 이런 식을 시도 할 것이다.

    try:
            f = open(filePath)
    except IOError:
            print "No such file: %s" % filePath
            raw_input("Press Enter to close window")
    try:
            lines = f.readlines()
            while True:
                    line = f.readline()
                    try:
                            if not line:
                                    time.sleep(1)
                            else:
                                    functionThatAnalisesTheLine(line)
                    except Exception, e:
                            # handle the exception somehow (for example, log the trace) and raise the same exception again
                            raw_input("Press Enter to close window")
                            raise e
    finally:
            f.close()

루프는 지난번 파일을 읽은 이후 새 라인이 있는지 확인합니다. functionThatAnalisesTheLine 기능. 그렇지 않은 경우 스크립트는 1 초를 기다리고 프로세스를 다시 시작합니다.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top