Pythonのジェネレーターからzipファイルを作成しますか?

StackOverflow https://stackoverflow.com/questions/297345

  •  08-07-2019
  •  | 
  •  

質問

大量のデータ(いくつかのギグ)があります。Pythonでzipファイルに書き込む必要があります。 ZipFileの.writestrメソッドに渡すために一度にすべてをメモリに読み込むことはできません。一時ファイルを使用してすべてをディスクに送り込んでから読み直したくはありません。

ジェネレータまたはファイルのようなオブジェクトをZipFileライブラリにフィードする方法はありますか?または、この機能がサポートされていないように見える理由がありますか?

zipファイルとは、zipファイルを意味します。 Python zipfileパッケージでサポートされています。

役に立ちましたか?

解決

唯一の解決策は、ファイルを圧縮してバッファから読み取るために使用するメソッドを書き換えることです。これを標準ライブラリに追加するのは簡単です。私はそれがまだ行われていないことに驚く。インターフェース全体をオーバーホールする必要があるという多くの合意があり、それが漸進的な改善を妨げているようです。

import zipfile, zlib, binascii, struct
class BufferedZipFile(zipfile.ZipFile):
    def writebuffered(self, zipinfo, buffer):
        zinfo = zipinfo

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(1024 * 8)
            if not buf:
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            self.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        position = self.fp.tell()
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo

他のヒント

クリスB.回答して完全なソリューションを作成しました。ここに他の誰かが興味を持っている場合があります:

import os
import threading
from zipfile import *
import zlib, binascii, struct

class ZipEntryWriter(threading.Thread):
    def __init__(self, zf, zinfo, fileobj):
        self.zf = zf
        self.zinfo = zinfo
        self.fileobj = fileobj

        zinfo.file_size = 0
        zinfo.flag_bits = 0x00
        zinfo.header_offset = zf.fp.tell()

        zf._writecheck(zinfo)
        zf._didModify = True

        zinfo.CRC = 0
        zinfo.compress_size = compress_size = 0
        zf.fp.write(zinfo.FileHeader())

        super(ZipEntryWriter, self).__init__()

    def run(self):
        zinfo = self.zinfo
        zf = self.zf
        file_size = 0
        CRC = 0

        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None
        while True:
            buf = self.fileobj.read(1024 * 8)
            if not buf:
                self.fileobj.close()
                break

            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)

            zf.fp.write(buf)

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            zf.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        position = zf.fp.tell()
        zf.fp.seek(zinfo.header_offset + 14, 0)
        zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        zf.fp.seek(position, 0)
        zf.filelist.append(zinfo)
        zf.NameToInfo[zinfo.filename] = zinfo

class EnhZipFile(ZipFile, object):

    def _current_writer(self):
        return hasattr(self, 'cur_writer') and self.cur_writer or None

    def assert_no_current_writer(self):
        cur_writer = self._current_writer()
        if cur_writer and cur_writer.isAlive():
            raise ValueError('An entry is already started for name: %s' % cur_write.zinfo.filename)

    def write(self, filename, arcname=None, compress_type=None):
        self.assert_no_current_writer()
        super(EnhZipFile, self).write(filename, arcname, compress_type)

    def writestr(self, zinfo_or_arcname, bytes):
        self.assert_no_current_writer()
        super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)

    def close(self):
        self.finish_entry()
        super(EnhZipFile, self).close()

    def start_entry(self, zipinfo):
        """
        Start writing a new entry with the specified ZipInfo and return a
        file like object. Any data written to the file like object is
        read by a background thread and written directly to the zip file.
        Make sure to close the returned file object, before closing the
        zipfile, or the close() would end up hanging indefinitely.

        Only one entry can be open at any time. If multiple entries need to
        be written, make sure to call finish_entry() before calling any of
        these methods:
        - start_entry
        - write
        - writestr
        It is not necessary to explicitly call finish_entry() before closing
        zipfile.

        Example:
            zf = EnhZipFile('tmp.zip', 'w')
            w = zf.start_entry(ZipInfo('t.txt'))
            w.write("some text")
            w.close()
            zf.close()
        """
        self.assert_no_current_writer()
        r, w = os.pipe()
        self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
        self.cur_writer.start()
        return os.fdopen(w, 'w')

    def finish_entry(self, timeout=None):
        """
        Ensure that the ZipEntry that is currently being written is finished.
        Joins on any background thread to exit. It is safe to call this method
        multiple times.
        """
        cur_writer = self._current_writer()
        if not cur_writer or not cur_writer.isAlive():
            return
        cur_writer.join(timeout)

if __name__ == "__main__":
    zf = EnhZipFile('c:/tmp/t.zip', 'w')
    import time
    w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
    w.write("Line1\n")
    w.write("Line2\n")
    w.close()
    zf.finish_entry()
    w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
    w.write("Some text\n")
    w.close()
    zf.close()

gzip.GzipFileは、データをgzip圧縮されたチャンクに書き込みます。これは、ファイルから読み取られた行数に従ってチャンクのサイズを設定できます。

例:

file = gzip.GzipFile('blah.gz', 'wb')
sourcefile = open('source', 'rb')
chunks = []
for line in sourcefile:
  chunks.append(line)
  if len(chunks) >= X: 
      file.write("".join(chunks))
      file.flush()
      chunks = []

基本的な圧縮はzlib.compressobjによって行われます。 ZipFile(MacOSX上のPython 2.5の下ではコンパイルされているようです)。 Python 2.3バージョンは次のとおりです。

圧縮ファイルが8kチャンクでビルドされていることがわかります。多くのソースファイル属性(非圧縮サイズなど)がzipファイルヘッダーに記録されるため、ソースファイル情報の取り出しは複雑です。

def write(self, filename, arcname=None, compress_type=None):
    """Put the bytes from filename into the archive under the name
    arcname."""

    st = os.stat(filename)
    mtime = time.localtime(st.st_mtime)
    date_time = mtime[0:6]
    # Create ZipInfo instance to store file information
    if arcname is None:
        zinfo = ZipInfo(filename, date_time)
    else:
        zinfo = ZipInfo(arcname, date_time)
    zinfo.external_attr = st[0] << 16L      # Unix attributes
    if compress_type is None:
        zinfo.compress_type = self.compression
    else:
        zinfo.compress_type = compress_type
    self._writecheck(zinfo)
    fp = open(filename, "rb")

    zinfo.flag_bits = 0x00
    zinfo.header_offset = self.fp.tell()    # Start of header bytes
    # Must overwrite CRC and sizes with correct data later
    zinfo.CRC = CRC = 0
    zinfo.compress_size = compress_size = 0
    zinfo.file_size = file_size = 0
    self.fp.write(zinfo.FileHeader())
    zinfo.file_offset = self.fp.tell()      # Start of file bytes
    if zinfo.compress_type == ZIP_DEFLATED:
        cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
             zlib.DEFLATED, -15)
    else:
        cmpr = None
    while 1:
        buf = fp.read(1024 * 8)
        if not buf:
            break
        file_size = file_size + len(buf)
        CRC = binascii.crc32(buf, CRC)
        if cmpr:
            buf = cmpr.compress(buf)
            compress_size = compress_size + len(buf)
        self.fp.write(buf)
    fp.close()
    if cmpr:
        buf = cmpr.flush()
        compress_size = compress_size + len(buf)
        self.fp.write(buf)
        zinfo.compress_size = compress_size
    else:
        zinfo.compress_size = file_size
    zinfo.CRC = CRC
    zinfo.file_size = file_size
    # Seek backwards and write CRC and file sizes
    position = self.fp.tell()       # Preserve current position in file
    self.fp.seek(zinfo.header_offset + 14, 0)
    self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
          zinfo.file_size))
    self.fp.seek(position, 0)
    self.filelist.append(zinfo)
    self.NameToInfo[zinfo.filename] = zinfo

いくつかの(多くの?ほとんどの?)圧縮アルゴリズムは、全体ファイル全体の冗長性に基づいています。

圧縮ライブラリの中には、ファイルに最適な圧縮アルゴリズムに基づいて選択するものがあります。

ZipFileモジュールはこれを行うと考えているため、一度に断片だけでなくファイル全体を表示したいと考えています。

したがって、メモリにロードするためのジェネレーターや大きなファイルでは機能しません。それはZipfileライブラリの制限を説明するでしょう。

Python 3.5で変更(公式ドキュメントより): シークできないストリームへの書き込みのサポート

これは、 zipfile.ZipFile で、ファイル全体をメモリに保存しないストリームを使用できることを意味します。このようなストリームは、移動をサポートしていませんデータボリューム全体。

これはシンプルなジェネレーターです:

from zipfile import ZipFile, ZipInfo

def zipfile_generator(path, stream):
    with ZipFile(stream, mode='w') as zf:
        z_info = ZipInfo.from_file(path)
        with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
            for chunk in iter(lambda: entry.read(16384), b''):
                dest.write(chunk)
                # Yield chunk of the zip file stream in bytes.
                yield stream.get()
    # ZipFile was closed.
    yield stream.get()

path は、大きなファイルまたはディレクトリ、または pathlike オブジェクトの文字列パスです。

stream は、このようなクラスの unseekable ストリームインスタンスです(公式ドキュメント):

from io import RawIOBase

class UnseekableStream(RawIOBase):
    def __init__(self):
        self._buffer = b''

    def writable(self):
        return True

    def write(self, b):
        if self.closed:
            raise ValueError('Stream was closed!')
        self._buffer += b
        return len(b)

    def get(self):
        chunk = self._buffer
        self._buffer = b''
        return chunk

このコードはオンラインで試すことができます: https://repl.it/@IvanErgunov/zipfilegenerator


ZipInfo なしでジェネレーターを作成し、大きなファイルを手動で読み取りおよび分割する別の方法もあります。 queue.Queue()オブジェクトを UnseekableStream()オブジェクトに渡し、このキューに別のスレッドで書き込むことができます。次に、現在のスレッドで、このキューから繰り返してチャンクを読み取ることができます。 ドキュメント

を参照してください。

P.S。 allanleiによるPython Zipstream は時代遅れで信頼性の低い方法です。公式に行われる前に、シークできないストリームのサポートを追加する試みでした。

gzipライブラリは、圧縮のためにファイルのようなオブジェクトを受け取ります。

class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])

まだzipファイルに含めるための名目上のファイル名を提供する必要がありますが、データソースをfileobjに渡すことができます。

(この回答はDamnsweetの回答とは異なります。焦点は、増分的に書き込まれる圧縮ファイルではなく、増分的に読み取られるデータソースにある必要があります。)

元の質問者はGzipを受け入れないようになりました:-(

Python 2.7を使用すると、ファイルのzipファイルにデータを追加できます:

http://docs.python.org/2/library/ zipfile#zipfile.ZipFile.writestr

この質問に誰かがつまずいた場合、これは2017年でもPython 2.7に関連していますが、ここでは真のストリーミングzipファイルの実用的なソリューションを示します。秘密は、汎用ビットフラグのビット3を設定することです( https:// pkwareを参照してください) .cachefly.net / webdocs / casestudies / APPNOTE.TXT セクション4.3.9.1)。

この実装は常にZIP64スタイルのファイルを作成し、任意の大きなファイルに対してストリーミングが機能することに注意してください。中央ディレクトリレコードのzip64終了を強制するtoいハックが含まれているため、プロセスによって書き込まれたすべてのzipファイルがZIP64スタイルになることに注意してください。

import io
import zipfile
import zlib
import binascii
import struct

class ByteStreamer(io.BytesIO):
    '''
    Variant on BytesIO which lets you write and consume data while
    keeping track of the total filesize written. When data is consumed
    it is removed from memory, keeping the memory requirements low.
    '''
    def __init__(self):
        super(ByteStreamer, self).__init__()
        self._tellall = 0

    def tell(self):
        return self._tellall

    def write(self, b):
        orig_size = super(ByteStreamer, self).tell()
        super(ByteStreamer, self).write(b)
        new_size = super(ByteStreamer, self).tell()
        self._tellall += (new_size - orig_size)

    def consume(self):
        bytes = self.getvalue()
        self.seek(0)
        self.truncate(0)
        return bytes

class BufferedZipFileWriter(zipfile.ZipFile):
    '''
    ZipFile writer with true streaming (input and output).
    Created zip files are always ZIP64-style because it is the only safe way to stream
    potentially large zip files without knowing the full size ahead of time.

    Example usage:
    >>> def stream():
    >>>     bzfw = BufferedZip64FileWriter()
    >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
    >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
    >>>             yield chunk
    >>>     yield bzfw.close()
    '''
    def __init__(self, compression=zipfile.ZIP_DEFLATED):
        self._buffer = ByteStreamer()
        super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)

    def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
        if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
            zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                    date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = self.compression
            zinfo.external_attr = 0o600 << 16     # ?rw-------
        else:
            zinfo = zinfo_or_arcname

        zinfo.file_size = file_size = 0
        zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        self.fp.write(zinfo.FileHeader())
        if zinfo.compress_type == zipfile.ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        else:
            cmpr = None

        while True:
            buf = buffer.read(chunksize)
            if not buf:
                break

            file_size += len(buf)
            CRC = binascii.crc32(buf, CRC) & 0xffffffff
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size += len(buf)

            self.fp.write(buf)
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes

        if cmpr:
            buf = cmpr.flush()
            compress_size += len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
            compressed_bytes = self._buffer.consume()
            if compressed_bytes:
                yield compressed_bytes
        else:
            zinfo.compress_size = file_size

        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # Write CRC and file sizes after the file data
        # Always write as zip64 -- only safe way to stream what might become a large zipfile
        fmt = '<LQQ'
        self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))

        self.fp.flush()
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
        yield self._buffer.consume()

    # The close method needs to be patched to force writing a ZIP64 file
    # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
    def close(self):
        tmp = zipfile.ZIP_FILECOUNT_LIMIT
        zipfile.ZIP_FILECOUNT_LIMIT = 0
        super(BufferedZipFileWriter, self).close()
        zipfile.ZIP_FILECOUNT_LIMIT = tmp
        return self._buffer.consume()
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top