質問
大量のデータ(いくつかのギグ)があります。Pythonでzipファイルに書き込む必要があります。 ZipFileの.writestrメソッドに渡すために一度にすべてをメモリに読み込むことはできません。一時ファイルを使用してすべてをディスクに送り込んでから読み直したくはありません。
ジェネレータまたはファイルのようなオブジェクトをZipFileライブラリにフィードする方法はありますか?または、この機能がサポートされていないように見える理由がありますか?
zipファイルとは、zipファイルを意味します。 Python zipfileパッケージでサポートされています。
解決
唯一の解決策は、ファイルを圧縮してバッファから読み取るために使用するメソッドを書き換えることです。これを標準ライブラリに追加するのは簡単です。私はそれがまだ行われていないことに驚く。インターフェース全体をオーバーホールする必要があるという多くの合意があり、それが漸進的な改善を妨げているようです。
import zipfile, zlib, binascii, struct
class BufferedZipFile(zipfile.ZipFile):
def writebuffered(self, zipinfo, buffer):
zinfo = zipinfo
zinfo.file_size = file_size = 0
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell()
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
self.fp.write(zinfo.FileHeader())
if zinfo.compress_type == zipfile.ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = buffer.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
position = self.fp.tell()
self.fp.seek(zinfo.header_offset + 14, 0)
self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
他のヒント
クリスB.回答して完全なソリューションを作成しました。ここに他の誰かが興味を持っている場合があります:
import os
import threading
from zipfile import *
import zlib, binascii, struct
class ZipEntryWriter(threading.Thread):
def __init__(self, zf, zinfo, fileobj):
self.zf = zf
self.zinfo = zinfo
self.fileobj = fileobj
zinfo.file_size = 0
zinfo.flag_bits = 0x00
zinfo.header_offset = zf.fp.tell()
zf._writecheck(zinfo)
zf._didModify = True
zinfo.CRC = 0
zinfo.compress_size = compress_size = 0
zf.fp.write(zinfo.FileHeader())
super(ZipEntryWriter, self).__init__()
def run(self):
zinfo = self.zinfo
zf = self.zf
file_size = 0
CRC = 0
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = self.fileobj.read(1024 * 8)
if not buf:
self.fileobj.close()
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC)
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
zf.fp.write(buf)
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
zf.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
position = zf.fp.tell()
zf.fp.seek(zinfo.header_offset + 14, 0)
zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
zf.fp.seek(position, 0)
zf.filelist.append(zinfo)
zf.NameToInfo[zinfo.filename] = zinfo
class EnhZipFile(ZipFile, object):
def _current_writer(self):
return hasattr(self, 'cur_writer') and self.cur_writer or None
def assert_no_current_writer(self):
cur_writer = self._current_writer()
if cur_writer and cur_writer.isAlive():
raise ValueError('An entry is already started for name: %s' % cur_write.zinfo.filename)
def write(self, filename, arcname=None, compress_type=None):
self.assert_no_current_writer()
super(EnhZipFile, self).write(filename, arcname, compress_type)
def writestr(self, zinfo_or_arcname, bytes):
self.assert_no_current_writer()
super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)
def close(self):
self.finish_entry()
super(EnhZipFile, self).close()
def start_entry(self, zipinfo):
"""
Start writing a new entry with the specified ZipInfo and return a
file like object. Any data written to the file like object is
read by a background thread and written directly to the zip file.
Make sure to close the returned file object, before closing the
zipfile, or the close() would end up hanging indefinitely.
Only one entry can be open at any time. If multiple entries need to
be written, make sure to call finish_entry() before calling any of
these methods:
- start_entry
- write
- writestr
It is not necessary to explicitly call finish_entry() before closing
zipfile.
Example:
zf = EnhZipFile('tmp.zip', 'w')
w = zf.start_entry(ZipInfo('t.txt'))
w.write("some text")
w.close()
zf.close()
"""
self.assert_no_current_writer()
r, w = os.pipe()
self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
self.cur_writer.start()
return os.fdopen(w, 'w')
def finish_entry(self, timeout=None):
"""
Ensure that the ZipEntry that is currently being written is finished.
Joins on any background thread to exit. It is safe to call this method
multiple times.
"""
cur_writer = self._current_writer()
if not cur_writer or not cur_writer.isAlive():
return
cur_writer.join(timeout)
if __name__ == "__main__":
zf = EnhZipFile('c:/tmp/t.zip', 'w')
import time
w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
w.write("Line1\n")
w.write("Line2\n")
w.close()
zf.finish_entry()
w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
w.write("Some text\n")
w.close()
zf.close()
gzip.GzipFileは、データをgzip圧縮されたチャンクに書き込みます。これは、ファイルから読み取られた行数に従ってチャンクのサイズを設定できます。
例:
file = gzip.GzipFile('blah.gz', 'wb')
sourcefile = open('source', 'rb')
chunks = []
for line in sourcefile:
chunks.append(line)
if len(chunks) >= X:
file.write("".join(chunks))
file.flush()
chunks = []
基本的な圧縮はzlib.compressobjによって行われます。 ZipFile(MacOSX上のPython 2.5の下ではコンパイルされているようです)。 Python 2.3バージョンは次のとおりです。
圧縮ファイルが8kチャンクでビルドされていることがわかります。多くのソースファイル属性(非圧縮サイズなど)がzipファイルヘッダーに記録されるため、ソースファイル情報の取り出しは複雑です。
def write(self, filename, arcname=None, compress_type=None):
"""Put the bytes from filename into the archive under the name
arcname."""
st = os.stat(filename)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
# Create ZipInfo instance to store file information
if arcname is None:
zinfo = ZipInfo(filename, date_time)
else:
zinfo = ZipInfo(arcname, date_time)
zinfo.external_attr = st[0] << 16L # Unix attributes
if compress_type is None:
zinfo.compress_type = self.compression
else:
zinfo.compress_type = compress_type
self._writecheck(zinfo)
fp = open(filename, "rb")
zinfo.flag_bits = 0x00
zinfo.header_offset = self.fp.tell() # Start of header bytes
# Must overwrite CRC and sizes with correct data later
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
zinfo.file_size = file_size = 0
self.fp.write(zinfo.FileHeader())
zinfo.file_offset = self.fp.tell() # Start of file bytes
if zinfo.compress_type == ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
zlib.DEFLATED, -15)
else:
cmpr = None
while 1:
buf = fp.read(1024 * 8)
if not buf:
break
file_size = file_size + len(buf)
CRC = binascii.crc32(buf, CRC)
if cmpr:
buf = cmpr.compress(buf)
compress_size = compress_size + len(buf)
self.fp.write(buf)
fp.close()
if cmpr:
buf = cmpr.flush()
compress_size = compress_size + len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
# Seek backwards and write CRC and file sizes
position = self.fp.tell() # Preserve current position in file
self.fp.seek(zinfo.header_offset + 14, 0)
self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
zinfo.file_size))
self.fp.seek(position, 0)
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
いくつかの(多くの?ほとんどの?)圧縮アルゴリズムは、全体ファイル全体の冗長性に基づいています。
圧縮ライブラリの中には、ファイルに最適な圧縮アルゴリズムに基づいて選択するものがあります。
ZipFileモジュールはこれを行うと考えているため、一度に断片だけでなくファイル全体を表示したいと考えています。
したがって、メモリにロードするためのジェネレーターや大きなファイルでは機能しません。それはZipfileライブラリの制限を説明するでしょう。
Python 3.5で変更(公式ドキュメントより): シークできないストリームへの書き込みのサポート。
これは、 zipfile.ZipFile
で、ファイル全体をメモリに保存しないストリームを使用できることを意味します。このようなストリームは、移動をサポートしていませんデータボリューム全体。
これはシンプルなジェネレーターです:
from zipfile import ZipFile, ZipInfo
def zipfile_generator(path, stream):
with ZipFile(stream, mode='w') as zf:
z_info = ZipInfo.from_file(path)
with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
for chunk in iter(lambda: entry.read(16384), b''):
dest.write(chunk)
# Yield chunk of the zip file stream in bytes.
yield stream.get()
# ZipFile was closed.
yield stream.get()
path
は、大きなファイルまたはディレクトリ、または pathlike
オブジェクトの文字列パスです。
stream
は、このようなクラスの unseekable ストリームインスタンスです(公式ドキュメント):
from io import RawIOBase
class UnseekableStream(RawIOBase):
def __init__(self):
self._buffer = b''
def writable(self):
return True
def write(self, b):
if self.closed:
raise ValueError('Stream was closed!')
self._buffer += b
return len(b)
def get(self):
chunk = self._buffer
self._buffer = b''
return chunk
このコードはオンラインで試すことができます: https://repl.it/@IvanErgunov/zipfilegenerator
ZipInfo
なしでジェネレーターを作成し、大きなファイルを手動で読み取りおよび分割する別の方法もあります。 queue.Queue()
オブジェクトを UnseekableStream()
オブジェクトに渡し、このキューに別のスレッドで書き込むことができます。次に、現在のスレッドで、このキューから繰り返してチャンクを読み取ることができます。 ドキュメント
P.S。 allanleiによるPython Zipstream は時代遅れで信頼性の低い方法です。公式に行われる前に、シークできないストリームのサポートを追加する試みでした。
gzipライブラリは、圧縮のためにファイルのようなオブジェクトを受け取ります。
class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])
まだzipファイルに含めるための名目上のファイル名を提供する必要がありますが、データソースをfileobjに渡すことができます。
(この回答はDamnsweetの回答とは異なります。焦点は、増分的に書き込まれる圧縮ファイルではなく、増分的に読み取られるデータソースにある必要があります。)
元の質問者はGzipを受け入れないようになりました:-(
Python 2.7を使用すると、ファイルのzipファイルにデータを追加できます:
http://docs.python.org/2/library/ zipfile#zipfile.ZipFile.writestr
これは2017年です。まだこれをエレガントに実行したい場合は、 allanleiによるPython Zipstream を使用してください。 これまでのところ、それはおそらくそれを達成するための唯一のよく書かれたライブラリです。
この質問に誰かがつまずいた場合、これは2017年でもPython 2.7に関連していますが、ここでは真のストリーミングzipファイルの実用的なソリューションを示します。秘密は、汎用ビットフラグのビット3を設定することです( https:// pkwareを参照してください) .cachefly.net / webdocs / casestudies / APPNOTE.TXT セクション4.3.9.1)。
この実装は常にZIP64スタイルのファイルを作成し、任意の大きなファイルに対してストリーミングが機能することに注意してください。中央ディレクトリレコードのzip64終了を強制するtoいハックが含まれているため、プロセスによって書き込まれたすべてのzipファイルがZIP64スタイルになることに注意してください。
import io
import zipfile
import zlib
import binascii
import struct
class ByteStreamer(io.BytesIO):
'''
Variant on BytesIO which lets you write and consume data while
keeping track of the total filesize written. When data is consumed
it is removed from memory, keeping the memory requirements low.
'''
def __init__(self):
super(ByteStreamer, self).__init__()
self._tellall = 0
def tell(self):
return self._tellall
def write(self, b):
orig_size = super(ByteStreamer, self).tell()
super(ByteStreamer, self).write(b)
new_size = super(ByteStreamer, self).tell()
self._tellall += (new_size - orig_size)
def consume(self):
bytes = self.getvalue()
self.seek(0)
self.truncate(0)
return bytes
class BufferedZipFileWriter(zipfile.ZipFile):
'''
ZipFile writer with true streaming (input and output).
Created zip files are always ZIP64-style because it is the only safe way to stream
potentially large zip files without knowing the full size ahead of time.
Example usage:
>>> def stream():
>>> bzfw = BufferedZip64FileWriter()
>>> for arc_path, buffer in inputs: # buffer is a file-like object which supports read(size)
>>> for chunk in bzfw.streambuffer(arc_path, buffer):
>>> yield chunk
>>> yield bzfw.close()
'''
def __init__(self, compression=zipfile.ZIP_DEFLATED):
self._buffer = ByteStreamer()
super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)
def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
date_time=time.localtime(time.time())[:6])
zinfo.compress_type = self.compression
zinfo.external_attr = 0o600 << 16 # ?rw-------
else:
zinfo = zinfo_or_arcname
zinfo.file_size = file_size = 0
zinfo.flag_bits = 0x08 # Streaming mode: crc and size come after the data
zinfo.header_offset = self.fp.tell()
self._writecheck(zinfo)
self._didModify = True
zinfo.CRC = CRC = 0
zinfo.compress_size = compress_size = 0
self.fp.write(zinfo.FileHeader())
if zinfo.compress_type == zipfile.ZIP_DEFLATED:
cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
else:
cmpr = None
while True:
buf = buffer.read(chunksize)
if not buf:
break
file_size += len(buf)
CRC = binascii.crc32(buf, CRC) & 0xffffffff
if cmpr:
buf = cmpr.compress(buf)
compress_size += len(buf)
self.fp.write(buf)
compressed_bytes = self._buffer.consume()
if compressed_bytes:
yield compressed_bytes
if cmpr:
buf = cmpr.flush()
compress_size += len(buf)
self.fp.write(buf)
zinfo.compress_size = compress_size
compressed_bytes = self._buffer.consume()
if compressed_bytes:
yield compressed_bytes
else:
zinfo.compress_size = file_size
zinfo.CRC = CRC
zinfo.file_size = file_size
# Write CRC and file sizes after the file data
# Always write as zip64 -- only safe way to stream what might become a large zipfile
fmt = '<LQQ'
self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))
self.fp.flush()
self.filelist.append(zinfo)
self.NameToInfo[zinfo.filename] = zinfo
yield self._buffer.consume()
# The close method needs to be patched to force writing a ZIP64 file
# We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
def close(self):
tmp = zipfile.ZIP_FILECOUNT_LIMIT
zipfile.ZIP_FILECOUNT_LIMIT = 0
super(BufferedZipFileWriter, self).close()
zipfile.ZIP_FILECOUNT_LIMIT = tmp
return self._buffer.consume()