Pregunta

Antecedentes :

Estoy limpiando archivos delimitados por tabulaciones grandes (no se pueden guardar en la memoria). A medida que limpio el archivo de entrada, construyo una lista en la memoria; cuando llega a 1,000,000 de entradas (aproximadamente 1GB en memoria) lo clasifico (usando la clave predeterminada a continuación) y escribo la lista en un archivo. Esta clase es para volver a armar los archivos ordenados. Funciona en los archivos que he encontrado hasta ahora. Mi mayor caso, hasta ahora, es fusionar 66 archivos ordenados.

Preguntas:

  1. ¿Hay agujeros en mi lógica (dónde es frágil)?
  2. ¿He implementado el tipo de fusión algoritmo correctamente?
  3. ¿Hay alguna mejora obvia? que se puede hacer?

Datos de ejemplo:

Esta es una abstracción de una línea en uno de estos archivos:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

La conclusión es que uso 'SomeStringId'.lower().replace(' ', '') como mi clave de clasificación.

Código original:

class SortedFileMerger():
    """ A one-time use object that merges any number of smaller sorted 
        files into one large sorted file.

        ARGS:
            paths - list of paths to sorted files
            output_path - string path to desired output file
            dedup - (boolean) remove lines with duplicate keys, default = True
            key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
                  will be prepended by "lambda line: ".  This should be the same 
                  key that was used to sort the files being merged!
    """
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
        self.key = eval("lambda line: %s" % key)
        self.dedup = dedup
        self.handles = [open(path, 'r') for path in paths]
        # holds one line from each file
        self.lines = [file_handle.readline() for file_handle in self.handles]
        self.output_file = open(output_path, 'w')
        self.lines_written = 0
        self._mergeSortedFiles() #call the main method

    def __del__(self):
        """ Clean-up file handles.
        """
        for handle in self.handles:
            if not handle.closed:
                handle.close()
        if self.output_file and (not self.output_file.closed):
            self.output_file.close()

    def _mergeSortedFiles(self):
        """ Merge the small sorted files to 'self.output_file'. This can 
            and should only be called once.
            Called from __init__().
        """
        previous_comparable = ''
        min_line = self._getNextMin()
        while min_line:
            index = self.lines.index(min_line)
            comparable = self.key(min_line)
            if not self.dedup:                      
                #not removing duplicates
                self._writeLine(index)
            elif comparable != previous_comparable: 
                #removing duplicates and this isn't one
                self._writeLine(index)
            else:                                   
                #removing duplicates and this is one
                self._readNextLine(index)
            previous_comparable = comparable
            min_line = self._getNextMin()
        #finished merging
        self.output_file.close()

    def _getNextMin(self):
        """ Returns the next "smallest" line in sorted order.
            Returns None when there are no more values to get.
        """
        while '' in self.lines:
            index = self.lines.index('')
            if self._isLastLine(index):
                # file.readline() is returning '' because 
                # it has reached the end of a file.
                self._closeFile(index)
            else:
                # an empty line got mixed in
                self._readNextLine(index)
        if len(self.lines) == 0:
            return None
        return min(self.lines, key=self.key)

    def _writeLine(self, index):
        """ Write line to output file and update self.lines
        """
        self.output_file.write(self.lines[index])
        self.lines_written += 1
        self._readNextLine(index)

    def _readNextLine(self, index):
        """ Read the next line from handles[index] into lines[index]
        """
        self.lines[index] = self.handles[index].readline()

    def _closeFile(self, index):
        """ If there are no more lines to get in a file, it 
            needs to be closed and removed from 'self.handles'.
            It's entry in 'self.lines' also need to be removed.
        """
        handle = self.handles.pop(index)
        if not handle.closed:
            handle.close()
        # remove entry from self.lines to preserve order
        _ = self.lines.pop(index)

    def _isLastLine(self, index):
        """ Check that handles[index] is at the eof.
        """
        handle = self.handles[index]            
        if handle.tell() == os.path.getsize(handle.name):
            return True
        return False

Editar: Implementando las sugerencias de Brian se me ocurrió la siguiente solución:

Segunda edición: actualizó el código según sugerencia de John Machin :

def decorated_file(f, key):
    """ Yields an easily sortable tuple. 
    """
    for line in f:
        yield (key(line), line)

def standard_keyfunc(line):
    """ The standard key function in my application.
    """
    return line.split('\t', 2)[1].replace(' ', '').lower()

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
    """ Does the same thing SortedFileMerger class does. 
    """
    files = map(open, paths) #open defaults to mode='r'
    output_file = open(output_path, 'w')
    lines_written = 0
    previous_comparable = ''
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
        comparable = line[0]
        if previous_comparable != comparable:
            output_file.write(line[1])
            lines_written += 1
        previous_comparable = comparable
    return lines_written

Áspero Prueba

Uso de los mismos archivos de entrada (2,2 GB de datos):

  • La clase SortedFileMerger tomó 51 minutos (3068.4 segundos)
  • La solución de
  • Brian tomó 40 minutos (2408.5 segundos)
  • Después de agregar sugerencias de John Machin , el código de la solución tomó 36 minutos (2214.0 segundos)
¿Fue útil?

Solución

Tenga en cuenta que en python2.6, heapq tiene una nueva merge función que hará esto por usted.

Para manejar la función de tecla personalizada, puede envolver el iterador de archivo con algo que lo decore para que se compare en función de la clave, y luego eliminarlo:

def decorated_file(f, key):
    for line in f: 
        yield (key(line), line)

filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt')

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
    outfile.write(line[1])

[Editar] Incluso en versiones anteriores de python, probablemente valga la pena simplemente implementar la fusión desde el módulo heapq posterior. Es Python puro, y se ejecuta sin modificaciones en Python2.5, y dado que utiliza un montón para obtener el siguiente mínimo, debería ser muy eficiente al fusionar grandes cantidades de archivos.

Debería poder simplemente copiar heapq.py desde una instalación de python2.6, copiarlo a su fuente como " heapq26.py " y use " from heapq26 import merge " - no hay 2.6 características específicas utilizadas en él. Alternativamente, puede copiar la función de fusión (reescribir las llamadas de heappop, etc. para hacer referencia al módulo de python2.5 heapq).

Otros consejos

< < Esta & Quot; respuesta & Quot; es un comentario sobre el código resultante del interrogador original > >

Sugerencia: el uso de eval () es ummmm y lo que está haciendo restringe a la persona que llama a usar lambda: la extracción de claves puede requerir más de una línea y, en cualquier caso, no necesita la misma función para el preliminar paso de clasificación?

Así que reemplace esto:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
    keyfunc = eval("lambda line: %s" % key)

con esto:

def my_keyfunc(line):
    return line.split('\t', 2)[1].replace(' ', '').lower()
    # minor tweaks may speed it up a little

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):    
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top