Вопрос

Некоторые файлы чтения (Readleines ()) функции в Python
Скопируйте содержимое файла в память (в виде списка)

Мне нужно обработать файл, который слишком велик, чтобы
быть скопированным в память и как такова необходимость использовать
указатель файла (для доступа к файлу один байт
За один раз) - как в C getc ().

Дополнительное требование, которое у меня есть, это то, что
Я хотел бы перемотать указатель файла на предыдущий
Байты, как в c ungetc ().

Есть ли способ сделать это в Python?

Также в Python я могу прочитать одну строку в
Время с readline ()

Есть ли способ прочитать предыдущую строку
идти назад?

Это было полезно?

Решение

Если вы хотите использовать указатель файла напрямую (я думаю, что предложение Mike Graham лучше), вы можете использовать файл объекта стремиться() Метод, который позволяет устанавливать внутренний указатель, в сочетании с читать() Способ, который поддерживает аргумент опционов, указывающий, сколько байтов вы хотели бы прочитать.

Другие советы

  • Вам не нужны указатели файлов, которые не нужны Python или хотите.

  • Чтобы пройти через файловую строку по строке, не прочитав все в память, просто повторяйте на сам объект файла, т. Е.

     with open(filename, "r") as f:
         for line in f:
             ...
    

    С использованием readlines как правило, следует избегать.

  • Возвращаясь к линейке - это не то, что вы можете сделать супер легко. Если вам никогда не нужно вернуться более одной строки, проверьте pairwise Рецепт в itertools документация.

Хорошо, вот что я придумал. Спасибо Бренда для идеи построения класса.
Спасибо Джош за идею использовать Chike Functions искать () и прочитать ()

#!/bin/python

# Usage: BufRead.py inputfile

import sys, os, string
from inspect import currentframe

# Debug function usage
#
# if DEBUG:
#   debugLogMsg(currentframe().f_lineno,currentframe().f_code.co_filename)
#   print ...
def debugLogMsg(line,file,msg=""):
  print "%s:%s %s" % (file,line,msg)

# Set DEBUG off.
DEBUG = 0

class BufRead: 
  def __init__(self,filename): 
    self.__filename           = filename 
    self.__file               = open(self.__filename,'rb') 
    self.__fileposition       = self.__file.tell()
    self.__file.seek(0, os.SEEK_END)
    self.__filesize           = self.__file.tell() 
    self.__file.seek(self.__fileposition, os.SEEK_SET) 

  def close(self): 
    if self.__file is not None: 
      self.__file.close() 
      self.__file = None 

  def seekstart(self):
    if self.__file == None:
      self.__file.seek(0, os.SEEK_SET)
      self.__fileposition = self.__file.tell()

  def seekend(self):
    if self.__file == None:
      self.__file.seek(-1, os.SEEK_END)
      self.__fileposition = self.__file.tell()

  def getc(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()
    if self.__fileposition < self.__filesize:
      byte = self.__file.read(1)
      self.__fileposition = self.__file.tell()
      return byte
    else:
      return None

  def ungetc(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()
    if self.__fileposition > 0:
      self.__fileposition = self.__fileposition - 1
      self.__file.seek(self.__fileposition, os.SEEK_SET)
      byte = self.__file.read(1)
      self.__file.seek(self.__fileposition, os.SEEK_SET)
      return byte
    else:
      return None

  # uses getc() and ungetc()
  def getline(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()

    if self.__fileposition < self.__filesize:
      startOfLine = False
      line = ""

      while True:
        if self.__fileposition == 0:
          startOfLine = True
          break
        else:
          c = self.ungetc()
          if c == '\n':
            c = self.getc()
            startOfLine = True
            break

      if startOfLine:
        c = self.getc()
        if c == '\n':
          return '\n'
        else:
          self.ungetc()

        while True:
          c = self.getc()
          if c == '\n':
            line += c
            c = self.getc()
            if c == None:
              return line
            if c == '\n':
              self.ungetc()
            return line
          elif c == None:
            return line
          else:
            line += c
    else:
      return None

  # uses getc() and ungetc()
  def ungetline(self):
    if self.__file == None:
      return None 
    self.__fileposition = self.__file.tell()

    if self.__fileposition > 0:
      endOfLine = False
      line = ""

      while True:
        if self.__fileposition == self.__filesize:
          endOfLine = True
          break
        else:
          c = self.getc()
          if c == '\n':
            c = self.ungetc()
            endOfLine = True
            break

      if endOfLine:
        c = self.ungetc()
        if c == '\n':
          return '\n'
        else:
          self.getc()

        while True:
          c = self.ungetc()
          if c == None:
            return line
          if c == '\n':
            line += c
            c = self.ungetc()
            if c == None:
              return line
            if c == '\n':
              self.getc()
            return line
          elif c == None:
            return line
          else:
            line = c + line
    else:
      return None

def main():
  if len(sys.argv) == 2:
    print sys.argv[1]
    b = BufRead(sys.argv[1])

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING GETC                    \n' \
      '----------------------------------\n')

    while True:
      c = b.getc()
      if c == None: 
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(c)

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING UNGETC                  \n' \
      '----------------------------------\n')

    while True:
      c = b.ungetc()
      if c == None: 
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(c)

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING GETLINE                 \n' \
      '----------------------------------\n')

    b.seekstart()

    while True:
      line = b.getline()
      if line == None:
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(line)

    sys.stdout.write(
      '----------------------------------\n' \
      '- TESTING UNGETLINE               \n' \
      '----------------------------------\n')

    b.seekend()

    while True:
      line = b.ungetline()
      if line == None:
        sys.stdout.write('\n')
        break
      else:
        sys.stdout.write(line)

    b.close()

if __name__=="__main__": main()

Напишите класс чтения и ввода буферов для вас, и внедряйте ungetc на нем - что-то вроде это возможно (предупреждение: непроверенные, написанные во время компиляции):

class BufRead:
    def __init__(self,filename):
        self.filename = filename
        self.fn = open(filename,'rb')
        self.buffer = []
        self.bufsize = 256
        self.ready = True
    def close(self):
        if self.fn is not None:
            self.fn.close()
            self.fn = None
        self.ready = False
    def read(self,size=1):
        l = len(self.buffer)
        if not self.ready: return None
        if l <= size:
            s = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return s
        s = self.buffer
        size = size - l
        self.buffer = self.fn.read(min(self.bufsize,size))
        if self.buffer is None or len(self.buffer) == 0:
            self.ready = False
            return s
        return s + self.read(size)
    def ungetc(self,ch):
        if self.buffer is None:
            self.buffer = [ch]
        else:   
            self.buffer.append(ch)
        self.ready = True

Я не хочу делать миллиарды небуферизованного одиночного файла CHAR CHAR READ PLUS, я хотел путь
Для отладки позиции указателя файла. Следовательно, я решил вернуть позицию файла
В дополнение к Char или линии и использовать MMAP, чтобы сопоставить файл в память. (и дайте MMAP
Рукоятка пейджинга) Я думаю, что это будет немного проблемой, если файл действительно, действительно большой. (Как и больше, чем объем физической памяти), вот когда MMAP начнет войти в виртуальную память, и вещи могут быть действительно медленными. На данный момент он обрабатывает файл 50 МБ примерно через 4 мин.

import sys, os, string, re, time
from mmap import mmap

class StreamReaderDb: 
  def __init__(self,stream):
    self.__stream             = mmap(stream.fileno(), os.path.getsize(stream.name))  
    self.__streamPosition     = self.__stream.tell()
    self.__stream.seek(0                    , os.SEEK_END)
    self.__streamSize         = self.__stream.tell() 
    self.__stream.seek(self.__streamPosition, os.SEEK_SET) 

  def setStreamPositionDb(self,streamPosition):
    if self.__stream == None:
      return None 
    self.__streamPosition = streamPosition
    self.__stream.seek(self.__streamPosition, os.SEEK_SET)

  def streamPositionDb(self):
    if self.__stream == None:
      return None 
    return self.__streamPosition

  def streamSize(self):
    if self.__stream == None:
      return None 
    return self.__streamSize

  def close(self): 
    if self.__stream is not None: 
      self.__stream.close() 
      self.__stream = None 

  def seekStart(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(0)

  def seekEnd(self):
    if self.__stream == None:
      return None 
    self.__stream.seek(-1, os.SEEK_END)
    self.setStreamPositionDb(self.__stream.tell())

  def getcDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())
    if self.streamPositionDb() < self.streamSize():
      byte = self.__stream.read(1)
      self.setStreamPositionDb(self.__stream.tell())
      return byte,self.streamPositionDb()
    else:
      return None,self.streamPositionDb()

  def unGetcDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())
    if self.streamPositionDb() > 0:
      self.setStreamPositionDb(self.streamPositionDb() - 1)
      byte = self.__stream.read(1)
      self.__stream.seek(self.streamPositionDb(), os.SEEK_SET)
      return byte,self.streamPositionDb()
    else:
      return None,self.streamPositionDb()

  def seekLineStartDb(self):
    if self.__stream == None:
      return None
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() < self.streamSize():
      # Back up to the start of the line
      while True:
        if self.streamPositionDb() == 0:
          return self.streamPositionDb() 
        else:
          c,fp = self.unGetcDb()
          if c == '\n':
            c,fp = self.getcDb()
            return fp
    else:
      return None

  def seekPrevLineEndDb(self):
    if self.__stream == None:
      return None
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() < self.streamSize():
      # Back up to the start of the line
      while True:
        if self.streamPositionDb() == 0:
          return self.streamPositionDb() 
        else:
          c,fp = self.unGetcDb()
          if c == '\n':
            return fp
    else:
      return None

  def seekPrevLineStartDb(self):
    if self.__stream == None:
      return None
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() < self.streamSize():
      # Back up to the start of the line
      while True:
        if self.streamPositionDb() == 0:
          return self.streamPositionDb() 
        else:
          c,fp = self.unGetcDb()
          if c == '\n':
            return self.seekLineStartDb()
    else:
      return None

  def seekLineEndDb(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() > 0:
      while True:
        if self.streamPositionDb() == self.streamSize():
          return self.streamPositionDb()
        else:
          c,fp = self.getcDb()
          if c == '\n':
            c,fp = self.unGetcDb()
            return fp
    else:
      return None

  def seekNextLineEndDb(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() > 0:
      while True:
        if self.streamPositionDb() == self.streamSize():
          return self.streamPositionDb()
        else:
          c,fp = self.getcDb()
          if c == '\n':
            return fp
    else:
      return None

  def seekNextLineStartDb(self):
    if self.__stream == None:
      return None 
    self.setStreamPositionDb(self.__stream.tell())

    if self.streamPositionDb() > 0:
      while True:
        if self.streamPositionDb() == self.streamSize():
          return self.streamPositionDb()
        else:
          c,fp = self.getcDb()
          if c == '\n':
            return self.seekLineStartDb()
    else:
      return None

  # uses getc() and ungetc()
  def getLineDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())

    line = ""

    if self.seekLineStartDb() != None:
      c,fp = self.getcDb()
      if c == '\n':
        return c,self.streamPositionDb()
      else:
        self.unGetcDb()

      while True:
        c,fp = self.getcDb()
        if c == '\n':
          line += c
          c,fp = self.getcDb()
          if c == None:
            return line,self.streamPositionDb()
          self.unGetcDb()
          return line,self.streamPositionDb()
        elif c == None:
          return line,self.streamPositionDb()
        else:
          line += c
    else:
      return None,self.streamPositionDb()

  # uses getc() and ungetc()
  def unGetLineDb(self):
    if self.__stream == None:
      return None,None 
    self.setStreamPositionDb(self.__stream.tell())

    line = ""

    if self.seekLineEndDb() != None:
      c,fp = self.unGetcDb()
      if c == '\n':
        return c,self.streamPositionDb()
      else:
        self.getcDb()

      while True:
        c,fp = self.unGetcDb()
        if c == None:
          return line,self.streamPositionDb()
        if c == '\n':
          line += c
          c,fp = self.unGetcDb()
          if c == None:
            return line,self.streamPositionDb()
          self.getcDb()
          return line,self.streamPositionDb()
        elif c == None:
          return line,self.streamPositionDb()
        else:
          line = c + line
    else:
      return None,self.streamPositionDb()

Вопрос был первоначально вызван моим потребностью построить лексический анализатор.
getc () и ungetc () сначала полезны (чтобы получить ошибки прочитаны путь и
построить государственный автомат) после выполнения состояния,
getc () и ungetc () стать ответственностью, поскольку они слишком долго читают непосредственно из хранения.

Когда штата машина была завершена (отладка любых проблем IO,
Завершен штаты), я оптимизировал лексический анализатор.

Чтение исходного файла в кусках (или страницах) в память и работает
Государственная машина на каждой странице дает лучший результат времени.

Я обнаружил, что значительное время сохраняется, если getc () и ungetc () не используются
читать из файла напрямую.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top