Question

Arrière-plan:

Je nettoie de gros fichiers délimités par des tabulations (vous ne pouvez pas les conserver en mémoire). En nettoyant le fichier d'entrée, je crée une liste en mémoire; quand il arrive à 1.000.000 entrées (environ 1 Go en mémoire) je le trie (en utilisant la clé par défaut ci-dessous) et écris la liste dans un fichier. Cette classe est destinée à rassembler les fichiers triés. Cela fonctionne sur les fichiers que j'ai rencontrés jusqu'à présent. Mon cas le plus important, à ce jour, est la fusion de 66 fichiers triés.

Questions:

  1. Y a-t-il des trous dans ma logique (où est-il fragile)?
  2. Ai-je implémenté le fusionnement-tri algorithme correctement?
  3. Y a-t-il des améliorations évidentes cela pourrait être fait?

Exemple de données:

Il s’agit d’une abstraction d’une ligne dans l’un de ces fichiers:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

Ce que je retiens, c’est que j’utilise 'SomeStringId'.lower().replace(' ', '') comme clé de tri.

Code d'origine:

class SortedFileMerger():
    """ A one-time use object that merges any number of smaller sorted 
        files into one large sorted file.

        ARGS:
            paths - list of paths to sorted files
            output_path - string path to desired output file
            dedup - (boolean) remove lines with duplicate keys, default = True
            key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
                  will be prepended by "lambda line: ".  This should be the same 
                  key that was used to sort the files being merged!
    """
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
        self.key = eval("lambda line: %s" % key)
        self.dedup = dedup
        self.handles = [open(path, 'r') for path in paths]
        # holds one line from each file
        self.lines = [file_handle.readline() for file_handle in self.handles]
        self.output_file = open(output_path, 'w')
        self.lines_written = 0
        self._mergeSortedFiles() #call the main method

    def __del__(self):
        """ Clean-up file handles.
        """
        for handle in self.handles:
            if not handle.closed:
                handle.close()
        if self.output_file and (not self.output_file.closed):
            self.output_file.close()

    def _mergeSortedFiles(self):
        """ Merge the small sorted files to 'self.output_file'. This can 
            and should only be called once.
            Called from __init__().
        """
        previous_comparable = ''
        min_line = self._getNextMin()
        while min_line:
            index = self.lines.index(min_line)
            comparable = self.key(min_line)
            if not self.dedup:                      
                #not removing duplicates
                self._writeLine(index)
            elif comparable != previous_comparable: 
                #removing duplicates and this isn't one
                self._writeLine(index)
            else:                                   
                #removing duplicates and this is one
                self._readNextLine(index)
            previous_comparable = comparable
            min_line = self._getNextMin()
        #finished merging
        self.output_file.close()

    def _getNextMin(self):
        """ Returns the next "smallest" line in sorted order.
            Returns None when there are no more values to get.
        """
        while '' in self.lines:
            index = self.lines.index('')
            if self._isLastLine(index):
                # file.readline() is returning '' because 
                # it has reached the end of a file.
                self._closeFile(index)
            else:
                # an empty line got mixed in
                self._readNextLine(index)
        if len(self.lines) == 0:
            return None
        return min(self.lines, key=self.key)

    def _writeLine(self, index):
        """ Write line to output file and update self.lines
        """
        self.output_file.write(self.lines[index])
        self.lines_written += 1
        self._readNextLine(index)

    def _readNextLine(self, index):
        """ Read the next line from handles[index] into lines[index]
        """
        self.lines[index] = self.handles[index].readline()

    def _closeFile(self, index):
        """ If there are no more lines to get in a file, it 
            needs to be closed and removed from 'self.handles'.
            It's entry in 'self.lines' also need to be removed.
        """
        handle = self.handles.pop(index)
        if not handle.closed:
            handle.close()
        # remove entry from self.lines to preserve order
        _ = self.lines.pop(index)

    def _isLastLine(self, index):
        """ Check that handles[index] is at the eof.
        """
        handle = self.handles[index]            
        if handle.tell() == os.path.getsize(handle.name):
            return True
        return False

Modifier: Implémentation des suggestions de Brian J'ai proposé la solution suivante:

Deuxième édition: le code de suggestion de John Machin a été mis à jour:

def decorated_file(f, key):
    """ Yields an easily sortable tuple. 
    """
    for line in f:
        yield (key(line), line)

def standard_keyfunc(line):
    """ The standard key function in my application.
    """
    return line.split('\t', 2)[1].replace(' ', '').lower()

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
    """ Does the same thing SortedFileMerger class does. 
    """
    files = map(open, paths) #open defaults to mode='r'
    output_file = open(output_path, 'w')
    lines_written = 0
    previous_comparable = ''
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
        comparable = line[0]
        if previous_comparable != comparable:
            output_file.write(line[1])
            lines_written += 1
        previous_comparable = comparable
    return lines_written

Test brutal

Utilisation des mêmes fichiers d'entrée (2,2 Go de données):

  • La classe SortedFileMerger a pris 51 minutes (3068,4 secondes)
  • La solution de Brian a pris 40 minutes (2408,5 secondes)
  • Après avoir ajouté les suggestions de John Machin , le code de solution a pris 36 minutes (2214.0 secondes)
Était-ce utile?

La solution

Notez que dans python2.6, heapq a un nouveau fusion. fonction qui le fera pour vous.

Pour gérer la fonction de clé personnalisée, vous pouvez simplement envelopper l'itérateur de fichier avec quelque chose qui la décore de manière à ce qu'il se compare en fonction de la clé, puis supprimez-le:

def decorated_file(f, key):
    for line in f: 
        yield (key(line), line)

filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt')

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
    outfile.write(line[1])

[Éditer] Même dans les versions antérieures de Python, il serait probablement intéressant de prendre simplement l'implémentation de la fusion dans le module heapq plus récent. C'est du pur python, qui tourne sans modification dans python2.5, et puisqu'il utilise un tas pour obtenir le prochain minimum, il devrait être très efficace lors de la fusion d'un grand nombre de fichiers.

Vous devriez pouvoir simplement copier le fichier heapq.py d’une installation de python2.6 et le copier sur votre source en tant que & "; heapq26.py &"; et utilisez " from heapq26 import merge " - il n'y a pas de fonctionnalités spécifiques à la version 2.6 utilisées. Alternativement, vous pouvez simplement copier la fonction de fusion (réécrire les appels heappop, etc. pour faire référence au module heapq python2.5).

Autres conseils

< < Cette & Quot; répondre & Quot; est un commentaire sur le code résultant de l'interrogateur initial > >

Suggestion: utiliser eval (), c’est ummmm, et ce que vous faites empêche l’appelant d’utiliser lambda - l’extraction de clé peut nécessiter plus d’une ligne, et dans tous les cas n’avez pas besoin de la même fonction pour la vérification préliminaire. étape de tri?

Donc remplacez ceci:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
    keyfunc = eval("lambda line: %s" % key)

avec ceci:

def my_keyfunc(line):
    return line.split('\t', 2)[1].replace(' ', '').lower()
    # minor tweaks may speed it up a little

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):    
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top