¿Cómo veo un archivo para los cambios?
Pregunta
Tengo un archivo de registro que está siendo escrito por otro proceso que quiero observar para detectar cambios. Cada vez que se produce un cambio, me gustaría leer los nuevos datos para procesarlos.
¿Cuál es la mejor manera de hacer esto? Esperaba que hubiera algún tipo de gancho de la biblioteca PyWin32. He encontrado la función win32file.FindNextChangeNotification
pero no tengo idea de cómo pedirle que vea un archivo específico.
Si alguien ha hecho algo como esto, estaría muy agradecido de escuchar cómo ...
[Editar] Debería haber mencionado que buscaba una solución que no requiera un sondeo.
[Editar] ¡Maldiciones! Parece que esto no funciona en una unidad de red asignada. Supongo que Windows no "escucha" ninguna actualización del archivo como lo hace en un disco local.
Solución
¿Ya has mirado la documentación disponible en http://timgolden.me.uk /python/win32_how_do_i/watch_directory_for_changes.html ? Si solo necesita trabajar con Windows, el segundo ejemplo parece ser exactamente lo que desea (si intercambia la ruta del directorio con la del archivo que desea ver).
De lo contrario, el sondeo probablemente sea la única opción realmente independiente de la plataforma.
Nota: No he probado ninguna de estas soluciones.
Otros consejos
¿Intentaste usar Watchdog ?
Biblioteca de Python API y utilidades de shell para monitorear eventos del sistema de archivos.
La supervisión de directorios simplificada con
- Una API multiplataforma.
- Una herramienta de shell para ejecutar comandos en respuesta a cambios en el directorio.
Comience rápidamente con un ejemplo simple en Inicio rápido ...
Si el sondeo es lo suficientemente bueno para ti, solo observaría si la " hora modificada " cambios de estadísticas del archivo. Para leerlo:
os.stat(filename).st_mtime
(Tenga en cuenta que la solución de eventos de cambio nativo de Windows no funciona en todas las circunstancias, por ejemplo, en unidades de red).
import os
class Monkey(object):
def __init__(self):
self._cached_stamp = 0
self.filename = '/path/to/file'
def ook(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
Si desea una solución multiplataforma, verifique QFileSystemWatcher . Aquí un código de ejemplo (no saneado):
from PyQt4 import QtCore
@QtCore.pyqtSlot(str)
def directory_changed(path):
print('Directory Changed!!!')
@QtCore.pyqtSlot(str)
def file_changed(path):
print('File Changed!!!')
fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)
No debería funcionar en Windows (¿quizás con cygwin?), pero para usuarios de Unix, debe usar el " fcntl " llamada al sistema. Aquí hay un ejemplo en Python. En su mayoría es el mismo código si necesita escribirlo en C (los mismos nombres de funciones)
import time
import fcntl
import os
import signal
FNAME = "/HOME/TOTO/FILETOWATCH"
def handler(signum, frame):
print "File %s modified" % (FNAME,)
signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME, os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)
while True:
time.sleep(10000)
Consulte pyinotify .
inotify reemplaza a dnotify (de una respuesta anterior) en linuxes más recientes y permite la supervisión a nivel de archivo en lugar de a nivel de directorio.
Después de un poco de pirateo del guión de Tim Golden, tengo lo siguiente que parece funcionar bastante bien:
import os
import win32file
import win32con
path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt
def ProcessNewData( newData ):
print "Text added: %s"%newData
# Set up the bits we'll need for output
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
path_to_watch,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Open the file we're interested in
a = open(file_to_watch, "r")
# Throw away any exising log data
a.read()
# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it's updating the file we're interested in
for action, file in results:
full_filename = os.path.join (path_to_watch, file)
#print file, ACTIONS.get (action, "Unknown")
if file == file_to_watch:
newText = a.read()
if newText != "":
ProcessNewData( newText )
Probablemente podría hacerlo con una carga más de comprobación de errores, pero para simplemente ver un archivo de registro y procesarlo antes de escupirlo a la pantalla, esto funciona bien.
Gracias a todos por tu aporte, ¡excelente!
La solución más simple para mí es usar la herramienta watchmed de watchmedo
De https://pypi.python.org/pypi/watchdog Ahora tengo un proceso que busca los archivos sql en un directorio y los ejecuta si es necesario.
watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.
Marque mi respuesta a una pregunta similar . Podrías probar el mismo bucle en Python. Esta página sugiere:
import time
while 1:
where = file.tell()
line = file.readline()
if not line:
time.sleep(1)
file.seek(where)
else:
print line, # already has newline
También vea la pregunta tail () un archivo con Python .
Bueno, ya que estás usando Python, puedes abrir un archivo y seguir leyendo las líneas.
f = open('file.log')
Si la línea leída es no vacía , la procesa.
line = f.readline()
if line:
// Do what you want with the line
Puede que falte en que está bien seguir llamando a readline
en el EOF. Solo seguirá devolviendo una cadena vacía en este caso. Y cuando se agrega algo al archivo de registro, la lectura continuará desde donde se detuvo, según sea necesario.
Si está buscando una solución que use eventos o una biblioteca en particular, especifique esto en su pregunta. De lo contrario, creo que esta solución está bien.
Aquí hay una versión simplificada del código de Kender que parece hacer el mismo truco y no importa el archivo completo:
# Check file for new data.
import time
f = open(r'c:\temp\test.txt', 'r')
while True:
line = f.readline()
if not line:
time.sleep(1)
print 'Nothing New'
else:
print 'Call Function: ', line
Para ver un solo archivo con sondeo y dependencias mínimas, aquí hay un ejemplo completo, basado en la respuesta de Deestan (arriba):
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self.filename = watch_file
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
stamp = os.stat(self.filename).st_mtime
if stamp != self._cached_stamp:
self._cached_stamp = stamp
# File has changed, so do something...
print('File changed')
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print('\nDone')
break
except FileNotFoundError:
# Action on file not found
pass
except:
print('Unhandled error: %s' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
watch_file = 'my_file.txt'
# watcher = Watcher(watch_file) # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed') # also call custom action function
watcher.watch() # start the watch going
Como se puede ver en artículo de Tim Golden , señalado por Horst Gutmann , WIN32 es relativamente complejo y mira directorios, no un solo archivo.
Me gustaría sugerirle que busque IronPython , que es un .NET implementación de python. Con IronPython puede usar toda la funcionalidad .NET , incluida
System.IO.FileSystemWatcher
Que maneja archivos individuales con una sencilla interfaz de Evento .
Esta es otra modificación de la secuencia de comandos de Tim Goldan que se ejecuta en Linux y agrega un observador simple para la modificación de archivos mediante el uso de un dict (archivo = > hora).
uso: lo que seaNombre.py ruta_de_dir_dir_de_tiempo
#!/usr/bin/env python
import os, sys, time
def files_to_timestamp(path):
files = [os.path.join(path, f) for f in os.listdir(path)]
return dict ([(f, os.path.getmtime(f)) for f in files])
if __name__ == "__main__":
path_to_watch = sys.argv[1]
print "Watching ", path_to_watch
before = files_to_timestamp(path_to_watch)
while 1:
time.sleep (2)
after = files_to_timestamp(path_to_watch)
added = [f for f in after.keys() if not f in before.keys()]
removed = [f for f in before.keys() if not f in after.keys()]
modified = []
for f in before.keys():
if not f in removed:
if os.path.getmtime(f) != before.get(f):
modified.append(f)
if added: print "Added: ", ", ".join(added)
if removed: print "Removed: ", ", ".join(removed)
if modified: print "Modified ", ", ".join(modified)
before = after
Este es un ejemplo de verificación de cambios en un archivo. Una que puede no ser la mejor manera de hacerlo, pero seguro que es una forma corta.
Herramienta práctica para reiniciar la aplicación cuando se han realizado cambios en la fuente. Hice esto cuando jugaba con pygame para poder ver los efectos inmediatamente después de guardar el archivo.
Cuando se usa en pygame, asegúrate de que las cosas en el bucle 'while' estén ubicadas en tu ciclo de juego, también conocido como actualización o lo que sea. De lo contrario, su aplicación se atascará en un bucle infinito y no verá la actualización de su juego.
file_size_stored = os.stat('neuron.py').st_size
while True:
try:
file_size_current = os.stat('neuron.py').st_size
if file_size_stored != file_size_current:
restart_program()
except:
pass
En caso de que quisieras el código de reinicio que encontré en la web. Aquí está. (No es relevante para la pregunta, aunque podría ser útil)
def restart_program(): #restart application
python = sys.executable
os.execl(python, python, * sys.argv)
Diviértete haciendo electrones haciendo lo que quieres que hagan.
ACTIONS = {
1 : "Created",
2 : "Deleted",
3 : "Updated",
4 : "Renamed from something",
5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
class myThread (threading.Thread):
def __init__(self, threadID, fileName, directory, origin):
threading.Thread.__init__(self)
self.threadID = threadID
self.fileName = fileName
self.daemon = True
self.dir = directory
self.originalFile = origin
def run(self):
startMonitor(self.fileName, self.dir, self.originalFile)
def startMonitor(fileMonitoring,dirPath,originalFile):
hDir = win32file.CreateFile (
dirPath,
FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None
)
# Wait for new data and call ProcessNewData for each new chunk that's
# written
while 1:
# Wait for a change to occur
results = win32file.ReadDirectoryChangesW (
hDir,
1024,
False,
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
None,
None
)
# For each change, check to see if it's updating the file we're
# interested in
for action, file_M in results:
full_filename = os.path.join (dirPath, file_M)
#print file, ACTIONS.get (action, "Unknown")
if len(full_filename) == len(fileMonitoring) and action == 3:
#copy to main file
...
Este es un ejemplo orientado a ver archivos de entrada que escriben no más de una línea por segundo, pero generalmente mucho menos. El objetivo es agregar la última línea (la escritura más reciente) al archivo de salida especificado. He copiado esto de uno de mis proyectos y acabo de borrar todas las líneas irrelevantes. Tendrás que rellenar o cambiar los símbolos que faltan.
from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow # Qt Creator gen'd
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
QMainWindow.__init__(self, parent)
Ui_MainWindow.__init__(self)
self._fileWatcher = QFileSystemWatcher()
self._fileWatcher.fileChanged.connect(self.fileChanged)
def fileChanged(self, filepath):
QThread.msleep(300) # Reqd on some machines, give chance for write to complete
# ^^ About to test this, may need more sophisticated solution
with open(filepath) as file:
lastLine = list(file)[-1]
destPath = self._filemap[filepath]['dest file']
with open(destPath, 'a') as out_file: # a= append
out_file.writelines([lastLine])
Por supuesto, la clase QMainWindow que abarca no es estrictamente necesaria, es decir. puede utilizar QFileSystemWatcher solo.
La mejor y más simple solución es usar pygtail: https://pypi.python.org/pypi/pygtail
from pygtail import Pygtail
while True:
for line in Pygtail("some.log"):
sys.stdout.write(line)
También puede usar una biblioteca simple llamada repyt , aquí hay un ejemplo:
repyt ./app.py
Parece que nadie ha publicado fswatch . Es un observador de sistema de archivos multiplataforma. Solo instálalo, ejecútalo y sigue las instrucciones.
Lo he usado con los programas python y golang y simplemente funciona.
Solución relacionada con @ 4Oh4 un cambio suave para ver una lista de archivos;
import os
import sys
import time
class Watcher(object):
running = True
refresh_delay_secs = 1
# Constructor
def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
self._cached_stamp = 0
self._cached_stamp_files = {}
self.filenames = watch_files
self.call_func_on_change = call_func_on_change
self.args = args
self.kwargs = kwargs
# Look for changes
def look(self):
for file in self.filenames:
stamp = os.stat(file).st_mtime
if not file in self._cached_stamp_files:
self._cached_stamp_files[file] = 0
if stamp != self._cached_stamp_files[file]:
self._cached_stamp_files[file] = stamp
# File has changed, so do something...
file_to_read = open(file, 'r')
value = file_to_read.read()
print("value from file", value)
file_to_read.seek(0)
if self.call_func_on_change is not None:
self.call_func_on_change(*self.args, **self.kwargs)
# Keep watching in a loop
def watch(self):
while self.running:
try:
# Look for changes
time.sleep(self.refresh_delay_secs)
self.look()
except KeyboardInterrupt:
print('\nDone')
break
except FileNotFoundError:
# Action on file not found
pass
except Exception as e:
print(e)
print('Unhandled error: %s' % sys.exc_info()[0])
# Call this function each time a change happens
def custom_action(text):
print(text)
# pass
watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']
# watcher = Watcher(watch_file) # simple
if __name__ == "__main__":
watcher = Watcher(watch_files, custom_action, text='yes, changed') # also call custom action function
watcher.watch() # start the watch going
No conozco ninguna función específica de Windows. Puede intentar obtener el hash MD5 del archivo cada segundo / minuto / hora (depende de qué tan rápido lo necesite) y compararlo con el último hash. Cuando difiere, sabe que el archivo se ha modificado y ha leído las líneas más recientes.
Probaría algo como esto.
try:
f = open(filePath)
except IOError:
print "No such file: %s" % filePath
raw_input("Press Enter to close window")
try:
lines = f.readlines()
while True:
line = f.readline()
try:
if not line:
time.sleep(1)
else:
functionThatAnalisesTheLine(line)
except Exception, e:
# handle the exception somehow (for example, log the trace) and raise the same exception again
raw_input("Press Enter to close window")
raise e
finally:
f.close()
El bucle comprueba si hay una o más líneas desde la última vez que se leyó el archivo; si lo hay, se lee y se pasa a la función functionThatAnalisesTheLine
. Si no, el script espera 1 segundo y vuelve a intentar el proceso.