Source code for range_streams.codecs.zip.stream

from __future__ import annotations

import io
import struct

from pyzstd import ZstdFile
from ranges import Range

from ...stream import RangeStream
from ..zstd import ZstdTarFile
from .data import COMPRESSIONS, ZipData

__all__ = ["ZipStream", "ZippedFileInfo"]


class ZipStream(RangeStream):
    """
    As for :class:`~range_streams.stream.RangeStream`, but if ``scan_contents``
    is True, then immediately call
    :meth:`~range_streams.codecs.zip.ZipStream.check_central_dir_rec` on
    initialisation (which will perform a series of range requests to identify
    the files in the zip from the End of Central Directory Record and Central
    Directory Record), setting
    :attr:`~range_streams.codecs.zip.ZipStream.zipped_files`, and
    :meth:`~range_streams.stream.RangeStream.add` their file content ranges to
    the stream.

    Setting this can be postponed until first access of the
    :attr:`~range_streams.codecs.zip.ZipStream.filename_list` property (this
    will not :meth:`~range_streams.stream.RangeStream.add` them to the
    :class:`~range_streams.codecs.zip.ZipStream`).

    Once parsed, the file contents are stored as a list of
    :class:`ZippedFileInfo` objects (in the order they appear in the Central
    Directory Record) in the
    :attr:`~range_streams.codecs.zip.ZipStream.zipped_files` attribute. Each of
    these objects has a :meth:`~ZippedFileInfo.file_range` method which gives
    the range of its file content bytes within the
    :class:`~range_streams.codecs.zip.ZipStream`.
    """

    def __init__(
        self,
        url: str,
        client=None,
        byte_range: Range | tuple[int, int] = Range("[0, 0)"),
        pruning_level: int = 0,
        single_request: bool = False,
        force_async: bool = False,
        chunk_size: int | None = None,
        raise_response: bool = True,
        scan_contents: bool = True,
    ):
        """
        Set up a stream for the ZIP archive at ``url``, with either an initial
        range to be requested (HTTP partial content request), or if left as the
        empty range (default: ``Range(0,0)``) a HEAD request will be sent
        instead, so as to set the total size of the target file on the
        :attr:`~range_streams.stream.RangeStream.total_bytes` property.

        By default (if ``client`` is left as ``None``) a fresh
        :class:`httpx.Client` will be created for each stream.

        The ``byte_range`` can be specified as either a :class:`~ranges.Range`
        object, or 2-tuple of integers (``(start, end)``), interpreted either
        way as a half-closed interval ``[start, end)``, as given by Python's
        built-in :class:`range`.

        The ``pruning_level`` controls the policy for overlap handling (``0``
        will resize overlapped ranges, ``1`` will delete overlapped ranges, and
        ``2`` will raise an error when a new range is added which overlaps a
        pre-existing range).

        If ``single_request`` is ``True`` (default: ``False``), then the
        behaviour when an empty ``byte_range`` is passed instead becomes to
        send a standard streaming GET request (not a partial content request at
        all), and instead the class will then facilitate an interface that
        'simulates' these calls, i.e. as if each time
        :meth:`~range_streams.stream.RangeStream.add` was used the range
        requests were being returned instantly (as everything needed was
        already obtained on the first request at initialisation). More
        performant when reading a stream linearly.

        - See docs for the
          :meth:`~range_streams.stream.RangeStream.handle_overlap` method for
          further details.

        Args:
          url            : (:class:`str`) The URL of the file to be streamed
          client         : (:class:`httpx.Client` | ``None``) The HTTPX client
                           to use for HTTP requests
          byte_range     : (:class:`~ranges.Range` | ``tuple[int,int]``) The
                           range of positions on the file to be requested
          pruning_level  : (:class:`int`) Either ``0`` ('replant'), ``1``
                           ('burn'), or ``2`` ('strict')
          single_request : (:class:`bool`) Whether to use a single GET request
                           and just add 'windows' onto this rather than create
                           multiple partial content requests.
          force_async    : (:class:`bool` | ``None``) Whether to require the
                           client to be ``httpx.AsyncClient``, and if no client
                           is given, to create one on initialisation.
                           (Experimental/WIP)
          chunk_size     : (:class:`int` | ``None``) The chunk size used for
                           the ``httpx.Response.iter_raw`` response byte
                           iterators
          raise_response : (:class:`bool`) Whether to raise HTTP status code
                           exceptions
          scan_contents  : (:class:`bool`) Whether to scan the archive contents
                           upon initialisation and add the archive's file
                           ranges
        """
        super().__init__(
            url=url,
            client=client,
            byte_range=byte_range,
            pruning_level=pruning_level,
            single_request=single_request,
            force_async=force_async,
            chunk_size=chunk_size,
            raise_response=raise_response,
        )
        self.data = ZipData()
        if scan_contents:
            self.check_central_dir_rec()
            self.add_file_ranges()

    def check_head_bytes(self):
        """
        Request the first bytes of the stream and verify they match the Local
        File Header start signature, raising :class:`ValueError` otherwise.
        """
        start_sig = self.data.LOC_F_H.start_sig
        head_byte_range = Range(0, len(start_sig))
        if self.client_is_async:
            self.add_async(head_byte_range)
        else:
            self.add(head_byte_range)
        start_bytes = self.active_range_response.read()
        if start_bytes != start_sig:  # pragma: no cover
            # Actually think this will be if zip is empty
            raise ValueError(
                f"Invalid zip header sequence {start_bytes=!r}: expected {start_sig!r}"
            )

    def check_end_of_central_dir_start(self):
        """
        If the zip file lacks a comment, the End Of Central Directory Record
        will be the last thing in it, so taking the range equal to its expected
        size and checking for the expected start signature will find it.
        """
        eocd_rng = self.total_range
        eocd_rng.start = eocd_rng.end - self.data.E_O_CTRL_DIR_REC.get_size()
        if self.client_is_async:
            self.add_async(eocd_rng)
        else:
            self.add(eocd_rng)
        eocd_bytes = self.active_range_response.read()
        start_sig = self.data.E_O_CTRL_DIR_REC.start_sig
        start_found = eocd_bytes[: len(start_sig)] == start_sig
        # A zero comment-length field (last 2 bytes) means no trailing comment
        no_comment = eocd_bytes[-2:] == b"\000\000"
        if start_found and no_comment:
            self.data.E_O_CTRL_DIR_REC.start_pos = eocd_rng.start
        else:
            # self.search_back_to_end_of_central_dir()
            raise NotImplementedError("Brute force search is deprecated")

    def check_end_of_central_dir_rec(self):
        """
        Using the stored start position of the End Of Central Directory Record
        (or calculating and storing it if it is not yet set on the object),
        unpack the record and store the Central Directory Record's total entry
        count, size, and start offset on ``self.data.CTRL_DIR_REC``.
        """
        if self.data.E_O_CTRL_DIR_REC.start_pos is None:
            self.check_end_of_central_dir_start()
        eocd_rng = self.total_range
        eocd_rng.start = self.data.E_O_CTRL_DIR_REC.start_pos
        if self.client_is_async:
            self.add_async(eocd_rng)
        else:
            self.add(eocd_rng)
        b = self.active_range_response.read()[: self.data.E_O_CTRL_DIR_REC.get_size()]
        u = struct.unpack(self.data.E_O_CTRL_DIR_REC.struct, b)
        # Indices into the unpacked EOCD struct (same layout as stdlib zipfile)
        _ECD_ENTRIES_TOTAL = 4
        _ECD_SIZE = 5
        _ECD_OFFSET = 6
        self.data.CTRL_DIR_REC.entry_count = u[_ECD_ENTRIES_TOTAL]
        self.data.CTRL_DIR_REC.size = u[_ECD_SIZE]
        self.data.CTRL_DIR_REC.start_pos = u[_ECD_OFFSET]
        return

    def check_central_dir_rec(self):
        """
        Read the range corresponding to the Central Directory Record (after
        :meth:`check_end_of_central_dir_rec` has been called), populating
        ``self.zipped_files`` with one :class:`ZippedFileInfo` per entry.
        """
        if self.data.CTRL_DIR_REC.size is None:  # pragma: no cover
            self.check_end_of_central_dir_rec()
        size_cd_full = self.data.CTRL_DIR_REC.size  # total size of CDR (all entries)
        cd_read_offset = 0  # byte offset incremented after each entry
        self.zipped_files = []
        entry_range = range(self.data.CTRL_DIR_REC.entry_count)  # type: ignore
        for entry_i in entry_range:
            cd_start = self.data.CTRL_DIR_REC.start_pos + cd_read_offset  # type: ignore
            cd_size = self.data.CTRL_DIR_REC.get_size()
            cd_end = cd_start + cd_size
            cd_rng = Range(cd_start, cd_end)
            if self.client_is_async:
                self.add_async(cd_rng)
            else:
                self.add(cd_rng)
            cd_bytes = self.active_range_response.read()
            u = struct.unpack(self.data.CTRL_DIR_REC.struct, cd_bytes[:cd_size])
            zf_info = ZippedFileInfo.from_central_directory_entry(u)
            target = self.data.CTRL_DIR_REC.start_sig
            sig = zf_info.signature
            if sig != target:  # pragma: no cover
                raise ValueError(f"Bad Central Directory signature at {cd_start}")
            # The file name immediately follows the fixed-size CD entry struct
            fn_len = zf_info.filename_length
            fn_rng = Range(cd_end, cd_end + fn_len)
            if self.client_is_async:
                self.add_async(fn_rng)
            else:
                self.add(fn_rng)
            filename = self.active_range_response.read()
            flags = zf_info.flags
            if flags & 0x800:  # pragma: no cover
                # UTF-8 file names extension
                fn_str = filename.decode("utf-8")
            else:
                # Historical ZIP filename encoding
                fn_str = filename.decode("cp437")
            extra_len = zf_info.extra_field_length
            comment_len = zf_info.comment_length
            # Advance past struct + variable-length name/extra/comment fields
            cd_read_offset += cd_size + fn_len + extra_len + comment_len
            # Rebuild the info object now the decoded file name is known
            zf_info = ZippedFileInfo.from_central_directory_entry(u, filename=fn_str)
            self.zipped_files.append(zf_info)
        return

    def add_file_ranges(self):
        """
        Add a named range to the stream for each scanned file's content bytes.
        """
        for zf_info in self.zipped_files:
            if self.client_is_async:
                self.add_async(zf_info.file_range, name=zf_info.filename)
            else:
                self.add(zf_info.file_range, name=zf_info.filename)

    def get_central_dir_bytes(self, step=20):
        """
        Using the stored start position of the End Of Central Directory Record
        (or calculating and storing it if it is not yet set on the object),
        identify the files in the central directory record by searching
        backwards from the start of the End of Central Directory Record
        signature until finding the start of the Central Directory Record.

        Args:
          step : (:class:`int`) Size in bytes of each backwards search window
        """
        if self.data.E_O_CTRL_DIR_REC.start_pos is None:  # pragma: no cover
            self.check_end_of_central_dir_rec()
        pre_eocd = self.data.E_O_CTRL_DIR_REC.start_pos
        cent_dir_rng = Range(pre_eocd - step, pre_eocd)
        if self.client_is_async:
            self.add_async(cent_dir_rng)
        else:
            self.add(cent_dir_rng)
        target = self.data.CTRL_DIR_REC.start_sig
        byte_cache = b""
        cd_byte_store = b""
        # Keep one signature-length's worth (minus one) of overlap so a target
        # straddling two windows is still detected
        cache_miss_size = len(target) - 1
        while cent_dir_rng.start > 0:
            if self.client_is_async:
                self.add_async(cent_dir_rng)
            else:
                self.add(cent_dir_rng)
            cd_bytes = self.active_range_response.read()
            cd_byte_store = cd_bytes + cd_byte_store
            byte_cache = cd_bytes + byte_cache[:cache_miss_size]
            if target in byte_cache:
                offset = byte_cache.find(target)
                self.data.CTRL_DIR_REC.start_pos = cent_dir_rng.start + offset
                break
            else:
                cent_dir_rng.start -= step
                cent_dir_rng.end -= step
        else:  # pragma: no cover
            raise ValueError("No central directory start signature found")
        # Trim any bytes read before the Central Directory start signature
        cd_byte_store = cd_byte_store[offset:]
        return cd_byte_store

    @property
    def filename_list(self) -> list[str]:
        """
        Return only the file name list from the stored list of
        :class:`ZippedFileInfo` objects (scanning the Central Directory Record
        first if it has not been scanned yet).
        """
        if not hasattr(self, "zipped_files"):  # pragma: no cover
            self.check_central_dir_rec()
        return [f.filename for f in self.zipped_files]

    def decompress_zipped_file(
        self,
        zf_info: ZippedFileInfo,
        method: str | None = None,
        ext: str | None = None,
    ):
        """
        Given a :class:`~range_streams.codecs.zip.stream.ZippedFileInfo` object
        ``zf_info``, and (optionally) its compression method [or else detecting
        that], decompress its bytes from the stream.

        Args:
          zf_info : The compressed bytes
          method  : Compression method (2-3 character abbreviated extension,
                    lower case)
          ext     : File extension to treat the bytes in the ``zf_info`` range
                    as having (an option if ``zf_info`` is not being provided)
        """
        if method is None:
            if ext:
                try:  # pragma: no cover
                    # Fix: unpack the (extension, method) pair — previously the
                    # whole tuple was bound to ``method``, so a tuple (never a
                    # valid method string) was passed on to ``decompress``
                    _ext, method = next(
                        (_e, m)
                        for _e, m in COMPRESSIONS.items()
                        if _e == ext or ext.endswith(_e)
                    )
                except StopIteration:
                    raise ValueError(f"No compression method for extension {ext}")
                fn_tar = zf_info.filename is not None and ".tar" in zf_info.filename
                is_tar = fn_tar or ext.startswith(".t")
                archive = "tar" if is_tar else None
            else:
                if zf_info.filename is None:  # pragma: no cover
                    raise NotImplementedError(
                        "Cannot detect compression method from file extension"
                        " (no file name provided)"
                    )
                try:
                    ext, method = next(
                        (e, m)
                        for e, m in COMPRESSIONS.items()
                        if zf_info.filename.endswith(e)
                    )
                except StopIteration:  # pragma: no cover
                    raise ValueError(f"Could not detect '{zf_info}' compression method")
                # Fix: this post-processing was in a ``finally`` block, so on
                # StopIteration the ``assert ext is not None`` fired while the
                # ValueError was propagating, masking it with AssertionError
                is_tar = ext.startswith(".t") or ".tar" in zf_info.filename
                archive = "tar" if is_tar else None  # pragma: no cover
        elif method not in COMPRESSIONS.values():  # pragma: no cover
            raise ValueError(f"{method} is not a valid option ({COMPRESSIONS=})")
        else:  # pragma: no cover
            archive = None  # Can't detect an archive without extension, ¯\_(ツ)_/¯
        assert method is not None  # because mypy can't follow my logic
        zf_rng = zf_info.file_range
        if zf_rng not in self.ranges:  # pragma: no cover
            if self.client_is_async:
                self.add_async(zf_rng)
            else:
                self.add(zf_rng)
        else:
            self.set_active_range(zf_rng)
        zf_bytes = self.active_range_response.read()
        return decompress(zf_bytes, method=method, archive=archive)
def decompress(b: bytes, method: str, archive: str | None = None):
    """
    Decompress the given bytes under the given method.

    Args:
      b       : The compressed bytes
      method  : The compression method (2-3 character abbreviated extension,
                lower case)
      archive : The archive method to extract (either 'zip', 'tar', or None).
    """
    accepted_archive_types = [None, "zip", "tar"]
    accepted_compression_types = set(COMPRESSIONS.values())
    if archive not in accepted_archive_types:  # pragma: no cover
        raise TypeError(f"{archive=} is not one of {accepted_archive_types=}")
    if method == "gz":  # pragma: no cover
        raise NotImplementedError("No gzip support yet...")
    elif method == "xz":  # pragma: no cover
        raise NotImplementedError("No xz support yet...")
    elif method == "bz2":  # pragma: no cover
        raise NotImplementedError("No bz2 support yet...")
    elif method == "zst":
        if archive == "tar":
            d = ZstdTarFile(io.BytesIO(b))
        elif archive == "zip":  # pragma: no cover
            raise NotImplementedError("No zip + zst support yet...")
        else:  # pragma: no cover
            d = ZstdFile(io.BytesIO(b))
    else:  # pragma: no cover
        # Fix: the placeholder was written without braces, so the accepted
        # types were never interpolated into the error message
        raise TypeError(
            f"Decompression not implemented for {method} "
            f"({accepted_compression_types=})"
        )
    return d


class CentralDirectoryInfo:
    """
    Indices of the supported fields within an unpacked Central Directory entry
    struct (same layout as used by the stdlib ``zipfile`` module).
    """

    _CD_SIGNATURE = 0
    _CD_FLAG_BITS = 5
    _CD_COMPRESS_TYPE = 6
    _CD_COMPRESSED_SIZE = 10
    _CD_UNCOMPRESSED_SIZE = 11
    _CD_FILENAME_LENGTH = 12
    _CD_EXTRA_FIELD_LENGTH = 13
    _CD_COMMENT_LENGTH = 14
    _CD_LOCAL_HEADER_OFFSET = 18


class ZippedFileInfo(CentralDirectoryInfo):
    """
    A class describing a zipped file according to the struct defining its
    metadata. Only a subset of all the fields are supported here (those useful
    for identifying and extracting the file contents from a stream).
    """

    def __init__(
        self,
        signature: bytes | int,
        flags: bytes | int,
        compress_type: bytes | int,
        compressed_size: bytes | int,
        uncompressed_size: bytes | int,
        filename_length: bytes | int,
        extra_field_length: bytes | int,
        comment_length: bytes | int,
        local_header_offset: bytes | int,
        filename: str | None,
    ):
        self.signature = signature
        self.flags = flags
        self.compress_type = compress_type
        self.compressed_size = compressed_size
        self.uncompressed_size = uncompressed_size
        self.filename_length = filename_length
        self.extra_field_length = extra_field_length
        self.comment_length = comment_length
        self.local_header_offset = local_header_offset
        self.filename = filename

    def __repr__(self):
        return (
            f"{self.__class__.__name__}"
            f" '{self.filename if self.filename is not None else ''}'"
            f" @ {self.local_header_offset!r}: {self.compressed_size!r}B"
        )

    @classmethod
    def from_central_directory_entry(
        cls,
        cd_entry: tuple,
        filename: str | None = None,
    ):
        """
        Instantiate directly from an unpacked central directory struct
        (describing the zipped file entry in a standardised entry order).

        Args:
          cd_entry : The unpacked Central Directory entry struct fields
          filename : The decoded file name, if known (default: ``None``)
        """
        signature = cd_entry[cls._CD_SIGNATURE]
        flags = cd_entry[cls._CD_FLAG_BITS]
        compress_type = cd_entry[cls._CD_COMPRESS_TYPE]
        compressed_size = cd_entry[cls._CD_COMPRESSED_SIZE]
        uncompressed_size = cd_entry[cls._CD_UNCOMPRESSED_SIZE]
        filename_length = cd_entry[cls._CD_FILENAME_LENGTH]
        extra_field_length = cd_entry[cls._CD_EXTRA_FIELD_LENGTH]
        comment_length = cd_entry[cls._CD_COMMENT_LENGTH]
        local_header_offset = cd_entry[cls._CD_LOCAL_HEADER_OFFSET]
        return cls(
            signature=signature,
            flags=flags,
            compress_type=compress_type,
            compressed_size=compressed_size,
            uncompressed_size=uncompressed_size,
            filename_length=filename_length,
            extra_field_length=extra_field_length,
            comment_length=comment_length,
            local_header_offset=local_header_offset,
            filename=filename,
        )

    @property
    def file_range(self):
        """
        The range of this file's content bytes within the stream: content
        starts after the Local File Header struct plus the file name.
        """
        sig_start = self.local_header_offset
        start = sig_start + ZipData().LOC_F_H.get_size() + self.filename_length
        end = start + self.compressed_size
        return Range(start, end)