كيف يمكنني مشاهدة ملف التغييرات ؟
سؤال
لدي ملف سجل كتبت بواسطة عملية أخرى التي أريد لمشاهدة التغييرات.في كل مرة يحدث تغيير أود قراءة البيانات الجديدة في القيام ببعض المعالجة على ذلك.
ما هي أفضل طريقة للقيام بذلك ؟ كنت آمل أن يكون هناك نوع من هوك من PyWin32 المكتبة.لقد وجدت win32file.FindNextChangeNotification
وظيفة ولكن ليس لديهم فكرة عن كيفية نسأل لمشاهدة ملف معين.
إذا كان أي شخص يفعل أي شيء مثل هذا سأكون ممتنة حقا أن نسمع كيف...
[عدل] كان يجب أن أذكر أني بعد حل لا يتطلب الاقتراع.
[عدل] اللعنات!يبدو أن هذا لا يعمل على محرك أقراص شبكة.انا التخمين ويندوز لا 'سماع' أي تحديثات ملفات الطريقة على القرص المحلي.
المحلول
هل بدا بالفعل في الوثائق المتاحة على http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html?إذا كنت فقط تحتاج أن تعمل تحت ويندوز 2nd سبيل المثال يبدو أن ما تريده بالضبط (إذا كنت تبادل مسار الدليل مع واحد من الملف الذي ترغب في مشاهدة).
وإلا الاقتراع ربما يكون إلا حقا منصة مستقلة الخيار.
ملاحظة: أنا لم أجرب أي من هذه الحلول.
نصائح أخرى
هل حاولت استخدام الوكالة الدولية للطاقة?
بيثون مكتبة API و شل المرافق العامة لمراقبة ملف أحداث النظام.
دليل الرصد سهلة مع
- عبر منصة API.
- قذيفة أداة لتشغيل الأوامر ردا على تغييرات الدليل.
تبدأ بسرعة مع مثال بسيط في التشغيل السريع...
إذا الاقتراع هي جيدة بما فيه الكفاية بالنسبة لك ، أنا فقط ووتش إن "تعديل الوقت" الملف stat التغييرات.إلى قراءتها:
os.stat(filename).st_mtime
(لاحظ أيضا أن ويندوز أصلية تغيير الحدث الحل لا يعمل في جميع الظروف ، مثلعلى محركات أقراص شبكة الاتصال.)
import os
class Monkey(object):
def __init__(self):
self._cached_stamp = 0
self.filename = '/path/to/file'
def ook(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
إذا كنت تريد المتعدد الحل ، ثم تحقق QFileSystemWatcher.هنا مثال على رمز (لا مطهرة):
from PyQt4 import QtCore
@QtCore.pyqtSlot(str)
def directory_changed(path):
print('Directory Changed!!!')
@QtCore.pyqtSlot(str)
def file_changed(path):
print('File Changed!!!')
fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)
يجب أن لا تعمل على ويندوز (ربما مع cygwin ?), ولكن unix المستخدم ، يجب عليك استخدام "fcntl" استدعاء النظام.هنا مثال في بيثون.انها في معظمها نفس التعليمات البرمجية إذا كنت بحاجة إلى كتابة ذلك في ج (نفس وظيفة الأسماء)
import time
import fcntl
import os
import signal
FNAME = "/HOME/TOTO/FILETOWATCH"
def handler(signum, frame):
print "File %s modified" % (FNAME,)
signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME, os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)
while True:
time.sleep(10000)
تحقق من pyinotify.
inotify محل dnotify (من رد سابق) في أحدث linuxes ويسمح مستوى الملف بدلا من الدليل على مستوى الرصد.
حسنا بعد قليل من القرصنة تيم الذهبي سيناريو لدي التالية والتي يبدو أن تعمل بشكل جيد جدا:
import os
import win32file
import win32con
path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt
def ProcessNewData( newData ):
print "Text added: %s"%newData
# Set up the bits we'll need for output
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
path_to_watch,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Open the file we're interested in
a = open(file_to_watch, "r")
# Throw away any exising log data
a.read()
# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it's updating the file we're interested in
for action, file in results:
full_filename = os.path.join (path_to_watch, file)
#print file, ACTIONS.get (action, "Unknown")
if file == file_to_watch:
newText = a.read()
if newText != "":
ProcessNewData( newText )
وربما يمكن أن تفعل مع حمولة أكثر التحقق من الخطأ ، ولكن لمجرد مشاهدة ملف سجل والقيام ببعض المعالجة على ذلك قبل البصق عليه إلى الشاشة ، هذا يعمل بشكل جيد.
شكرا للجميع على المدخلات الخاصة بك - الاشياء العظيمة!
أبسط حل بالنسبة لي هو استخدام الوكالة أداة watchmedo
من https://pypi.python.org/pypi/watchdog لدي الآن العملية التي تبدو حتى sql الملفات في دليل وينفذ لهم إذا لزم الأمر.
watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.
تحقق جوابي إلى مثل هذا السؤال.يمكنك أن تجرب نفس الحلقة في بيثون. هذه الصفحة يشير إلى:
import time
while 1:
where = file.tell()
line = file.readline()
if not line:
time.sleep(1)
file.seek(where)
else:
print line, # already has newline
انظر أيضا السؤال ذيل() ملف بايثون.
حسنا, بما أنك تستخدم بايثون ، يمكنك فقط فتح ملف و الحفاظ على قراءة خطوط من ذلك.
f = open('file.log')
إذا كان الخط هو قراءة لا فارغة, أنت العملية.
line = f.readline()
if line:
// Do what you want with the line
قد يكون في عداد المفقودين من أنه موافق للحفاظ على الدعوة readline
في EOF.وسوف تبقى مجرد إرجاع سلسلة فارغة في هذه الحالة.وعندما يكون هناك شيء هو إلحاق ملف سجل القراءة سوف تستمر من حيث توقفت ، كما تحتاج.
إذا كنت تبحث عن الحل الذي يستخدم الأحداث أو المكتبة معينة ، يرجى تحديد هذا في سؤالك.خلاف ذلك, أعتقد أن هذا الحل هو على ما يرام.
هنا هو نسخة مبسطة من Kender رمز الذي يظهر أن تفعل نفس الخدعة و لا استيراد ملف كامل:
# Check file for new data.
import time
f = open(r'c:\temp\test.txt', 'r')
while True:
line = f.readline()
if not line:
time.sleep(1)
print 'Nothing New'
else:
print 'Call Function: ', line
لمشاهدة ملف واحد مع الاقتراع ، والحد الأدنى من تبعيات هنا هو تماما بلورتها خرج على جواب من Deestan (فوق):
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self.filename = watch_file
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
print('File changed')
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print('\nDone')
break
except FileNotFoundError:
# Action on file not found
pass
except:
print('Unhandled error: %s' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
watch_file = 'my_file.txt'
# watcher = Watcher(watch_file) # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed') # also call custom action function
watcher.watch() # start the watch going
كما يمكنك أن ترى في تيم الذهبي المادة, وأشار خلال هورست غوتمان, WIN32 معقدة نسبيا و الساعات الدلائل ، وليس ملف واحد.
أود أن أقترح عليك أن ننظر إلى IronPython, وهو .صافي بيثون التنفيذ.مع IronPython يمكنك استخدام جميع .صافي وظائف - بما في ذلك
System.IO.FileSystemWatcher
الذي يتعامل مع ملفات واحدة بسيطة الحدث واجهة.
هذا هو آخر تعديل تيم Goldan هو السيناريو الذي يعمل على لينكس و يضيف بسيطة مراقب تعديل الملف باستخدام ديكت (ملف=>الوقت).
الاستخدام:whateverName.py path_to_dir_to_watch
#!/usr/bin/env python
import os, sys, time
def files_to_timestamp(path):
files = [os.path.join(path, f) for f in os.listdir(path)]
return dict ([(f, os.path.getmtime(f)) for f in files])
if __name__ == "__main__":
path_to_watch = sys.argv[1]
print "Watching ", path_to_watch
before = files_to_timestamp(path_to_watch)
while 1:
time.sleep (2)
after = files_to_timestamp(path_to_watch)
added = [f for f in after.keys() if not f in before.keys()]
removed = [f for f in before.keys() if not f in after.keys()]
modified = []
for f in before.keys():
if not f in removed:
if os.path.getmtime(f) != before.get(f):
modified.append(f)
if added: print "Added: ", ", ".join(added)
if removed: print "Removed: ", ", ".join(removed)
if modified: print "Modified ", ", ".join(modified)
before = after
هذا هو مثال على فحص ملف التغييرات.واحد التي قد لا تكون أفضل طريقة للقيام بذلك, ولكن بالتأكيد هو وسيلة قصيرة.
أداة قوية من أجل إعادة تشغيل التطبيق عندما تم إجراء تغييرات إلى المصدر.أنا جعلت هذا عندما لعب مع pygame حتى أستطيع أن أرى آثار تجري مباشرة بعد حفظ الملف.
عندما تستخدم في pygame تأكد من الاشياء في 'الوقت' حلقة توضع في حلقة اللعبة الملقب تحديث أو أيا كان.وإلا التطبيق الخاص بك سوف تتعثر في حلقة لا نهائية و لن ترى اللعبة تحديث.
file_size_stored = os.stat('neuron.py').st_size
while True:
try:
file_size_current = os.stat('neuron.py').st_size
if file_size_stored != file_size_current:
restart_program()
except:
pass
في حال أردت إعادة تشغيل التعليمات البرمجية التي وجدت على شبكة الإنترنت.ومن هنا.(لا ذات الصلة ، على الرغم من أنها يمكن أن تأتي في متناول اليدين)
def restart_program(): #restart application
python = sys.executable
os.execl(python, python, * sys.argv)
المتعة جعل الإلكترونات تفعل ما تريد منها أن تفعل.
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
class myThread (threading.Thread):
def __init__(self, threadID, fileName, directory, origin):
threading.Thread.__init__(self)
self.threadID = threadID
self.fileName = fileName
self.daemon = True
self.dir = directory
self.originalFile = origin
def run(self):
startMonitor(self.fileName, self.dir, self.originalFile)
def startMonitor(fileMonitoring,dirPath,originalFile):
hDir = win32file.CreateFile (
dirPath,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Wait for new data and call ProcessNewData for each new chunk that's
# written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it's updating the file we're
# interested in
for action, file_M in results:
full_filename = os.path.join (dirPath, file_M)
#print file, ACTIONS.get (action, "Unknown")
if len(full_filename) == len(fileMonitoring) and action == 3:
#copy to main file
...
هنا مثال موجهة نحو مشاهدة ملفات الإدخال أن تكتب أكثر من سطر واحد في الثانية ولكن عادة الكثير أقل.الهدف هو إلحاق السطر الأخير (أحدث الكتابة) إلى تحديد ملف الإخراج.لقد نسخت هذا من واحدة من بلدي مشاريع فقط حذف جميع الخطوط غير ذات صلة.سيكون لديك لملء أو تغيير الرموز في عداد المفقودين.
from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow # Qt Creator gen'd
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
QMainWindow.__init__(self, parent)
Ui_MainWindow.__init__(self)
self._fileWatcher = QFileSystemWatcher()
self._fileWatcher.fileChanged.connect(self.fileChanged)
def fileChanged(self, filepath):
QThread.msleep(300) # Reqd on some machines, give chance for write to complete
# ^^ About to test this, may need more sophisticated solution
with open(filepath) as file:
lastLine = list(file)[-1]
destPath = self._filemap[filepath]['dest file']
with open(destPath, 'a') as out_file: # a= append
out_file.writelines([lastLine])
بالطبع يشمل QMainWindow فئة ليست المطلوبة بدقة ، أي.يمكنك استخدام QFileSystemWatcher وحده.
أفضل وأبسط حل هو استخدام pygtail: https://pypi.python.org/pypi/pygtail
from pygtail import Pygtail
while True:
for line in Pygtail("some.log"):
sys.stdout.write(line)
يمكنك أيضا استخدام بسيط مكتبة تسمى repyt, هنا مثال على ذلك:
repyt ./app.py
يبدو أن لا أحد قد نشرت fswatch.هو عبر منصة نظام الملفات مراقب.فقط تثبيته, تشغيله ، ثم اتبع المطالبات.
كنت استخدمه مع الثعبان golang البرامج انها تعمل فقط.
ذات الصلة @4Oh4 الحل تغيير سلس للحصول على قائمة من الملفات يشاهد ؛
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self._cached_stamp_files = {}
self.filenames = watch_files
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
for file in self.filenames:
stamp = os.stat(file).st_mtime
if not file in self._cached_stamp_files:
self._cached_stamp_files[file] = 0
if stamp != self._cached_stamp_files[file]:
self._cached_stamp_files[file] = stamp
# File has changed, so do something...
file_to_read = open(file, 'r')
value = file_to_read.read()
print("value from file", value)
file_to_read.seek(0)
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print('\nDone')
break
except FileNotFoundError:
# Action on file not found
pass
except Exception as e:
print(e)
print('Unhandled error: %s' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
# pass
watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']
# watcher = Watcher(watch_file) # simple
if __name__ == "__main__":
watcher = Watcher(watch_files, custom_action, text='yes, changed') # also call custom action function
watcher.watch() # start the watch going
أنا لا أعرف أي ويندوز وظيفة محددة.هل يمكن أن تحاول الحصول على تجزئة MD5 الملف كل ثانية/دقيقة/ساعة (يعتمد على مدى سرعة كنت في حاجة إليها) ومقارنتها آخر التجزئة.عندما يختلف تعرف الملف تم تغيير تقرأ بها أحدث خطوط.
حاولت شيئا من هذا القبيل.
try:
f = open(filePath)
except IOError:
print "No such file: %s" % filePath
raw_input("Press Enter to close window")
try:
lines = f.readlines()
while True:
line = f.readline()
try:
if not line:
time.sleep(1)
else:
functionThatAnalisesTheLine(line)
except Exception, e:
# handle the exception somehow (for example, log the trace) and raise the same exception again
raw_input("Press Enter to close window")
raise e
finally:
f.close()
حلقة يتحقق إذا كان هناك خط جديد(s) منذ آخر مرة كان الملف للقراءة إذا كان هناك, إنها قراءة و تمريرها إلى functionThatAnalisesTheLine
وظيفة.إن لم يكن, النصي ينتظر 1 ثانية ثم إعادة المحاولة العملية.