ソートされたファイルをマージするPythonクラス、これをどのように改善できますか?
-
05-07-2019 - |
質問
背景:
タブで区切られた大きなファイル(メモリに保持できません)をクリーニングしています。入力ファイルをクリーンアップするときに、メモリ内にリストを作成します。 1,000,000エントリ(メモリ内で約1GB)に達すると、それをソートし(以下のデフォルトキーを使用)、リストをファイルに書き込みます。このクラスは、ソートされたファイルを元に戻すためのものです。これまでに出会ったファイルで動作します。これまでの私の最大のケースは、66個のソート済みファイルをマージすることです。
質問:
- 私のロジックに穴はありますか(壊れやすい場所)?
- マージソートを実装しました アルゴリズムは正しく?
- 明らかな改善はありますか 作れますか?
サンプルデータ:
これは、次のファイルのいずれかの行を抽象化したものです。
'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'
要点は、ソートキーとして'SomeStringId'.lower().replace(' ', '')
を使用することです。
オリジナルコード:
class SortedFileMerger():
""" A one-time use object that merges any number of smaller sorted
files into one large sorted file.
ARGS:
paths - list of paths to sorted files
output_path - string path to desired output file
dedup - (boolean) remove lines with duplicate keys, default = True
key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
will be prepended by "lambda line: ". This should be the same
key that was used to sort the files being merged!
"""
def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
self.key = eval("lambda line: %s" % key)
self.dedup = dedup
self.handles = [open(path, 'r') for path in paths]
# holds one line from each file
self.lines = [file_handle.readline() for file_handle in self.handles]
self.output_file = open(output_path, 'w')
self.lines_written = 0
self._mergeSortedFiles() #call the main method
def __del__(self):
""" Clean-up file handles.
"""
for handle in self.handles:
if not handle.closed:
handle.close()
if self.output_file and (not self.output_file.closed):
self.output_file.close()
def _mergeSortedFiles(self):
""" Merge the small sorted files to 'self.output_file'. This can
and should only be called once.
Called from __init__().
"""
previous_comparable = ''
min_line = self._getNextMin()
while min_line:
index = self.lines.index(min_line)
comparable = self.key(min_line)
if not self.dedup:
#not removing duplicates
self._writeLine(index)
elif comparable != previous_comparable:
#removing duplicates and this isn't one
self._writeLine(index)
else:
#removing duplicates and this is one
self._readNextLine(index)
previous_comparable = comparable
min_line = self._getNextMin()
#finished merging
self.output_file.close()
def _getNextMin(self):
""" Returns the next "smallest" line in sorted order.
Returns None when there are no more values to get.
"""
while '' in self.lines:
index = self.lines.index('')
if self._isLastLine(index):
# file.readline() is returning '' because
# it has reached the end of a file.
self._closeFile(index)
else:
# an empty line got mixed in
self._readNextLine(index)
if len(self.lines) == 0:
return None
return min(self.lines, key=self.key)
def _writeLine(self, index):
""" Write line to output file and update self.lines
"""
self.output_file.write(self.lines[index])
self.lines_written += 1
self._readNextLine(index)
def _readNextLine(self, index):
""" Read the next line from handles[index] into lines[index]
"""
self.lines[index] = self.handles[index].readline()
def _closeFile(self, index):
""" If there are no more lines to get in a file, it
needs to be closed and removed from 'self.handles'.
It's entry in 'self.lines' also need to be removed.
"""
handle = self.handles.pop(index)
if not handle.closed:
handle.close()
# remove entry from self.lines to preserve order
_ = self.lines.pop(index)
def _isLastLine(self, index):
""" Check that handles[index] is at the eof.
"""
handle = self.handles[index]
if handle.tell() == os.path.getsize(handle.name):
return True
return False
編集: Brian からの提案を実装しました。次の解決策を思い付きました:
2番目の編集: John Machin の提案に従ってコードを更新しました:
def decorated_file(f, key):
""" Yields an easily sortable tuple.
"""
for line in f:
yield (key(line), line)
def standard_keyfunc(line):
""" The standard key function in my application.
"""
return line.split('\t', 2)[1].replace(' ', '').lower()
def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
""" Does the same thing SortedFileMerger class does.
"""
files = map(open, paths) #open defaults to mode='r'
output_file = open(output_path, 'w')
lines_written = 0
previous_comparable = ''
for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
comparable = line[0]
if previous_comparable != comparable:
output_file.write(line[1])
lines_written += 1
previous_comparable = comparable
return lines_written
ラフテスト
同じ入力ファイル(2.2 GBのデータ)を使用する:
- SortedFileMergerクラスには51 分(3068.4秒)
- Brian のソリューションには40分(2408.5秒)かかりました
- John Machin の提案を追加した後、 ソリューションコードは36分かかりました (2214.0秒)
解決
python2.6では、heapqに新しい merge があることに注意してください。これを行う関数。
カスタムキー関数を処理するには、ファイルイテレータをそれを装飾するものでラップして、キーに基づいて比較し、後で削除することができます:
def decorated_file(f, key):
for line in f:
yield (key(line), line)
filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt')
for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
outfile.write(line[1])
[編集] 以前のバージョンのpythonでさえ、おそらく後のheapqモジュールからマージの実装を取得するだけの価値があるでしょう。純粋なpythonであり、python2.5では変更なしで実行されます。ヒープを使用して次の最小値を取得するので、多数のファイルをマージする際に非常に効率的です。
python2.6インストールからheapq.pyを単純にコピーし、<!> quot; heapq26.py <!> quotとしてソースにコピーできるはずです。 <!> quot; from heapq26 import merge
<!> quot;を使用します。 -2.6固有の機能は使用されていません。または、マージ関数をコピーすることもできます(heappopなどの呼び出しを書き換えて、python2.5 heapqモジュールを参照します)。
他のヒント
<!> lt; <!> lt;この<!> quot; answer <!> quot;は、元の質問者の結果コード<!> gt; <!> gt;
に対するコメントです提案:eval()の使用はummmmであり、あなたがしていることは呼び出し側がラムダを使用することを制限している-キーの抽出には複数のライナーが必要な場合があります。ソートステップ?
これを置き換える:
def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
keyfunc = eval("lambda line: %s" % key)
これ:
def my_keyfunc(line):
return line.split('\t', 2)[1].replace(' ', '').lower()
# minor tweaks may speed it up a little
def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):