Ungetc in Python
Frage
Einige fungierende Dateien (Readlines ()) Funktionen in Python
Kopieren Sie den Dateiinhalt in den Speicher (als Liste)
Ich muss eine Datei verarbeiten, die zu groß ist
im Speicher kopiert werden und als solche verwenden müssen
Ein Dateizeiger (um auf die Datei ein Byte zugreifen zu können
zu einer Zeit) - wie in C getc ().
Die zusätzliche Anforderung, die ich habe, ist das
Ich möchte den Dateizeiger auf vorherige zurückspulen
Bytes wie in c UngetC ().
Gibt es eine Möglichkeit, dies in Python zu tun?
Auch in Python kann ich eine Zeile bei a lesen
Zeit mit Readline ()
Gibt es eine Möglichkeit, die vorherige Zeile zu lesen?
Rückwärts gehen?
Lösung
Wenn Sie einen Dateizeiger direkt verwenden möchten (ich denke, Mike Grahams Vorschlag ist jedoch besser), können Sie das Dateiobjekt verwenden suchen() Methode, mit der Sie den internen Zeiger in Kombination mit dem festlegen können lesen() Methode, die ein Optionsargument unterstützen, in dem angegeben ist, wie viele Bytes Sie lesen möchten.
Andere Tipps
Sie benötigen keine Dateizeiger, die Python nicht hat oder will.
Um eine Dateizeile per Linie durchzugehen, ohne das Ganze in den Speicher zu lesen, iterieren Sie einfach das Dateiobjekt selbst, dh
with open(filename, "r") as f: for line in f: ...
Verwendung
readlines
ist im Allgemeinen vermieden zu werden.Wenn Sie eine Linie zurückgehen, können Sie supertonisch tun. Wenn Sie nie mehr als eine Zeile zurückgehen müssen, schauen Sie sich das an die
pairwise
Rezept in deritertools
Dokumentation.
Ok, hier ist, was ich mir ausgedacht habe. Vielen Dank an Brenda für die Idee, eine Klasse zu bauen.
Vielen Dank an Josh für die Idee, C -ähnliche Funktionen zu verwenden. Seek () und Read ()
#!/bin/python
# Usage: BufRead.py inputfile
import sys, os, string
from inspect import currentframe
# Debug function usage
#
# if DEBUG:
# debugLogMsg(currentframe().f_lineno,currentframe().f_code.co_filename)
# print ...
def debugLogMsg(line,file,msg=""):
print "%s:%s %s" % (file,line,msg)
# Set DEBUG off.
DEBUG = 0
class BufRead:
def __init__(self,filename):
self.__filename = filename
self.__file = open(self.__filename,'rb')
self.__fileposition = self.__file.tell()
self.__file.seek(0, os.SEEK_END)
self.__filesize = self.__file.tell()
self.__file.seek(self.__fileposition, os.SEEK_SET)
def close(self):
if self.__file is not None:
self.__file.close()
self.__file = None
def seekstart(self):
if self.__file == None:
self.__file.seek(0, os.SEEK_SET)
self.__fileposition = self.__file.tell()
def seekend(self):
if self.__file == None:
self.__file.seek(-1, os.SEEK_END)
self.__fileposition = self.__file.tell()
def getc(self):
if self.__file == None:
return None
self.__fileposition = self.__file.tell()
if self.__fileposition < self.__filesize:
byte = self.__file.read(1)
self.__fileposition = self.__file.tell()
return byte
else:
return None
def ungetc(self):
if self.__file == None:
return None
self.__fileposition = self.__file.tell()
if self.__fileposition > 0:
self.__fileposition = self.__fileposition - 1
self.__file.seek(self.__fileposition, os.SEEK_SET)
byte = self.__file.read(1)
self.__file.seek(self.__fileposition, os.SEEK_SET)
return byte
else:
return None
# uses getc() and ungetc()
def getline(self):
if self.__file == None:
return None
self.__fileposition = self.__file.tell()
if self.__fileposition < self.__filesize:
startOfLine = False
line = ""
while True:
if self.__fileposition == 0:
startOfLine = True
break
else:
c = self.ungetc()
if c == '\n':
c = self.getc()
startOfLine = True
break
if startOfLine:
c = self.getc()
if c == '\n':
return '\n'
else:
self.ungetc()
while True:
c = self.getc()
if c == '\n':
line += c
c = self.getc()
if c == None:
return line
if c == '\n':
self.ungetc()
return line
elif c == None:
return line
else:
line += c
else:
return None
# uses getc() and ungetc()
def ungetline(self):
if self.__file == None:
return None
self.__fileposition = self.__file.tell()
if self.__fileposition > 0:
endOfLine = False
line = ""
while True:
if self.__fileposition == self.__filesize:
endOfLine = True
break
else:
c = self.getc()
if c == '\n':
c = self.ungetc()
endOfLine = True
break
if endOfLine:
c = self.ungetc()
if c == '\n':
return '\n'
else:
self.getc()
while True:
c = self.ungetc()
if c == None:
return line
if c == '\n':
line += c
c = self.ungetc()
if c == None:
return line
if c == '\n':
self.getc()
return line
elif c == None:
return line
else:
line = c + line
else:
return None
def main():
if len(sys.argv) == 2:
print sys.argv[1]
b = BufRead(sys.argv[1])
sys.stdout.write(
'----------------------------------\n' \
'- TESTING GETC \n' \
'----------------------------------\n')
while True:
c = b.getc()
if c == None:
sys.stdout.write('\n')
break
else:
sys.stdout.write(c)
sys.stdout.write(
'----------------------------------\n' \
'- TESTING UNGETC \n' \
'----------------------------------\n')
while True:
c = b.ungetc()
if c == None:
sys.stdout.write('\n')
break
else:
sys.stdout.write(c)
sys.stdout.write(
'----------------------------------\n' \
'- TESTING GETLINE \n' \
'----------------------------------\n')
b.seekstart()
while True:
line = b.getline()
if line == None:
sys.stdout.write('\n')
break
else:
sys.stdout.write(line)
sys.stdout.write(
'----------------------------------\n' \
'- TESTING UNGETLINE \n' \
'----------------------------------\n')
b.seekend()
while True:
line = b.ungetline()
if line == None:
sys.stdout.write('\n')
break
else:
sys.stdout.write(line)
b.close()
if __name__=="__main__": main()
Schreiben Sie eine Klasse, in der die Lese- und Puffereingaben für Sie eingegeben werden, und implementieren Sie UngetC darauf - vielleicht so etwas (Warnung: ungetestet, beim Kompilieren geschrieben):
class BufRead:
def __init__(self,filename):
self.filename = filename
self.fn = open(filename,'rb')
self.buffer = []
self.bufsize = 256
self.ready = True
def close(self):
if self.fn is not None:
self.fn.close()
self.fn = None
self.ready = False
def read(self,size=1):
l = len(self.buffer)
if not self.ready: return None
if l <= size:
s = self.buffer[:size]
self.buffer = self.buffer[size:]
return s
s = self.buffer
size = size - l
self.buffer = self.fn.read(min(self.bufsize,size))
if self.buffer is None or len(self.buffer) == 0:
self.ready = False
return s
return s + self.read(size)
def ungetc(self,ch):
if self.buffer is None:
self.buffer = [ch]
else:
self.buffer.append(ch)
self.ready = True
Ich möchte keine Milliarden von ungepufferten Einzelchar -Datei -Lesungen machen und ich wollte einen Weg
Um die Position des Dateizeigers zu debuggen. Daher habe ich beschlossen, die Dateiposition zurückzugeben
Zusätzlich zu einem Zeichen oder einer Zeile und zur Verwendung von MMAP, um die Datei dem Speicher zuzuordnen. (Und lassen Sie MMAP
Paging handeln) Ich denke, das wird ein Problem mit einem Problem, wenn die Datei wirklich, sehr groß ist. (Wie in größerer als die Menge des physischen Gedächtnisses) Wenn MMAP in das virtuelle Gedächtnis eingeht und die Dinge sehr langsam werden könnten. Im Moment verarbeitet es eine 50 -MB -Datei in ca. 4 min.
import sys, os, string, re, time
from mmap import mmap
class StreamReaderDb:
def __init__(self,stream):
self.__stream = mmap(stream.fileno(), os.path.getsize(stream.name))
self.__streamPosition = self.__stream.tell()
self.__stream.seek(0 , os.SEEK_END)
self.__streamSize = self.__stream.tell()
self.__stream.seek(self.__streamPosition, os.SEEK_SET)
def setStreamPositionDb(self,streamPosition):
if self.__stream == None:
return None
self.__streamPosition = streamPosition
self.__stream.seek(self.__streamPosition, os.SEEK_SET)
def streamPositionDb(self):
if self.__stream == None:
return None
return self.__streamPosition
def streamSize(self):
if self.__stream == None:
return None
return self.__streamSize
def close(self):
if self.__stream is not None:
self.__stream.close()
self.__stream = None
def seekStart(self):
if self.__stream == None:
return None
self.setStreamPositionDb(0)
def seekEnd(self):
if self.__stream == None:
return None
self.__stream.seek(-1, os.SEEK_END)
self.setStreamPositionDb(self.__stream.tell())
def getcDb(self):
if self.__stream == None:
return None,None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() < self.streamSize():
byte = self.__stream.read(1)
self.setStreamPositionDb(self.__stream.tell())
return byte,self.streamPositionDb()
else:
return None,self.streamPositionDb()
def unGetcDb(self):
if self.__stream == None:
return None,None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() > 0:
self.setStreamPositionDb(self.streamPositionDb() - 1)
byte = self.__stream.read(1)
self.__stream.seek(self.streamPositionDb(), os.SEEK_SET)
return byte,self.streamPositionDb()
else:
return None,self.streamPositionDb()
def seekLineStartDb(self):
if self.__stream == None:
return None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() < self.streamSize():
# Back up to the start of the line
while True:
if self.streamPositionDb() == 0:
return self.streamPositionDb()
else:
c,fp = self.unGetcDb()
if c == '\n':
c,fp = self.getcDb()
return fp
else:
return None
def seekPrevLineEndDb(self):
if self.__stream == None:
return None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() < self.streamSize():
# Back up to the start of the line
while True:
if self.streamPositionDb() == 0:
return self.streamPositionDb()
else:
c,fp = self.unGetcDb()
if c == '\n':
return fp
else:
return None
def seekPrevLineStartDb(self):
if self.__stream == None:
return None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() < self.streamSize():
# Back up to the start of the line
while True:
if self.streamPositionDb() == 0:
return self.streamPositionDb()
else:
c,fp = self.unGetcDb()
if c == '\n':
return self.seekLineStartDb()
else:
return None
def seekLineEndDb(self):
if self.__stream == None:
return None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() > 0:
while True:
if self.streamPositionDb() == self.streamSize():
return self.streamPositionDb()
else:
c,fp = self.getcDb()
if c == '\n':
c,fp = self.unGetcDb()
return fp
else:
return None
def seekNextLineEndDb(self):
if self.__stream == None:
return None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() > 0:
while True:
if self.streamPositionDb() == self.streamSize():
return self.streamPositionDb()
else:
c,fp = self.getcDb()
if c == '\n':
return fp
else:
return None
def seekNextLineStartDb(self):
if self.__stream == None:
return None
self.setStreamPositionDb(self.__stream.tell())
if self.streamPositionDb() > 0:
while True:
if self.streamPositionDb() == self.streamSize():
return self.streamPositionDb()
else:
c,fp = self.getcDb()
if c == '\n':
return self.seekLineStartDb()
else:
return None
# uses getc() and ungetc()
def getLineDb(self):
if self.__stream == None:
return None,None
self.setStreamPositionDb(self.__stream.tell())
line = ""
if self.seekLineStartDb() != None:
c,fp = self.getcDb()
if c == '\n':
return c,self.streamPositionDb()
else:
self.unGetcDb()
while True:
c,fp = self.getcDb()
if c == '\n':
line += c
c,fp = self.getcDb()
if c == None:
return line,self.streamPositionDb()
self.unGetcDb()
return line,self.streamPositionDb()
elif c == None:
return line,self.streamPositionDb()
else:
line += c
else:
return None,self.streamPositionDb()
# uses getc() and ungetc()
def unGetLineDb(self):
if self.__stream == None:
return None,None
self.setStreamPositionDb(self.__stream.tell())
line = ""
if self.seekLineEndDb() != None:
c,fp = self.unGetcDb()
if c == '\n':
return c,self.streamPositionDb()
else:
self.getcDb()
while True:
c,fp = self.unGetcDb()
if c == None:
return line,self.streamPositionDb()
if c == '\n':
line += c
c,fp = self.unGetcDb()
if c == None:
return line,self.streamPositionDb()
self.getcDb()
return line,self.streamPositionDb()
elif c == None:
return line,self.streamPositionDb()
else:
line = c + line
else:
return None,self.streamPositionDb()
Die Frage wurde zunächst durch meine Notwendigkeit, einen lexikalischen Analysator aufzubauen, ausgewiesen.
Getc () und UngetC () sind zunächst nützlich (um die Lesefehler aus dem Weg zu räumen und
Um die Zustandsmaschine zu bauen) nach Abschluss der Zustandsmaschine,
GetC () und UngetC () werden zu einer Haftung, da sie zu lange dauern, um direkt aus dem Speicher zu lesen.
Als die Staatsmaschine abgeschlossen war (debuggierte alle IO -Probleme,
Finalisierte die Zustände), habe ich den lexikalischen Analysator optimiert.
Lesen Sie die Quelldatei in Stücken (oder Seiten) in den Speicher und in Laufen
Die Statusmaschine auf jeder Seite liefert das beste zeitliche Ergebnis.
Ich fand heraus, dass eine beträchtliche Zeit gespeichert wird, wenn GetC () und UNGETC () nicht verwendet werden
direkt aus der Datei lesen.