Source code for range_streams.codecs.png.stream

from __future__ import annotations

import struct
import zlib

from ranges import Range

from ...stream import RangeStream
from .data import PngChunkInfo, PngData
from .reconstruct import reconstruct_idat

__all__ = ["PngStream"]


[docs]class PngStream(RangeStream): """ As for RangeStream, but if `scan_ihdr` is True, then immediately call :meth:`~range_streams.codecs.png.PngStream.scan_ihdr` on initialisation (which will perform the necessary range request to read PNG metadata from its IHDR chunk), setting various attributes on the :attr:`~range_streams.codecs.png.PngStream.data.IHDR` object. Populating these attributes can be postponed [until manually calling :meth:`~range_streams.codecs.png.PngStream.scan_ihdr` and :meth:`~range_streams.codecs.png.PngStream.enumerate_chunks`] to avoid sending any range requests at initialisation. """
[docs] def __init__( self, url: str, client=None, byte_range: Range | tuple[int, int] = Range("[0, 0)"), pruning_level: int = 0, single_request: bool = True, force_async: bool = False, chunk_size: int | None = None, raise_response: bool = True, scan_ihdr: bool = True, enumerate_chunks: bool = True, ): """ Set up a stream for the PNG file at ``url``, with either an initial range to be requested (HTTP partial content request), or if left as the empty range (default: ``Range(0,0)``) a HEAD request will be sent instead, so as to set the total size of the target file on the :attr:`~range_streams.stream.RangeStream.total_bytes` property. By default (if ``client`` is left as ``None``) a fresh :class:`httpx.Client` will be created for each stream. The ``byte_range`` can be specified as either a :class:`~ranges.Range` object, or 2-tuple of integers (``(start, end)``), interpreted either way as a half-closed interval ``[start, end)``, as given by Python's built-in :class:`range`. The ``pruning_level`` controls the policy for overlap handling (``0`` will resize overlapped ranges, ``1`` will delete overlapped ranges, and ``2`` will raise an error when a new range is added which overlaps a pre-existing range). If ``single_request`` is ``True`` (default: ``True``), then the behaviour when an empty ``byte_range`` is passed instead becomes to send a standard streaming GET request (not a partial content request at all), and instead the class will then facilitate an interface that 'simulates' these calls, i.e. as if each time :meth:`~range_streams.stream.RangeStream.add` was used the range requests were being returned instantly (as everything needed was already obtained on the first request at initialisation). More performant when reading a stream linearly, and defaults to ``True`` in the PNG codec as chunks are read linearly. - See docs for the :meth:`~range_streams.stream.RangeStream.handle_overlap` method for further details. Args: url : (:class:`str`) The URL of the file to be streamed client : (:class:`httpx.Client` | ``None``) The HTTPX client to use for HTTP requests byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range of positions on the file to be requested pruning_level : (:class:`int`) Either ``0`` ('replant'), ``1`` ('burn'), or ``2`` ('strict') single_request : (:class:`bool`) Whether to use a single GET request and just add 'windows' onto this rather than create multiple partial content requests. force_async : (:class:`bool` | ``None``) Whether to require the client to be ``httpx.AsyncClient``, and if no client is given, to create one on initialisation. (Experimental/WIP) scan_ihdr : (:class:`bool`) Whether to scan the IHDR chunk on initialisation enumerate_chunks : (:class:`bool`) Whether to step through each chunk (read its metadata, and proceed until all chunks have been identified) upon initialisation chunk_size : (:class:`int` | ``None``) The chunk size used for the ``httpx.Response.iter_raw`` response byte iterators raise_response : (:class:`bool`) Whether to raise HTTP status code exceptions """ if force_async: # Mutually exclusive scan_ihdr = enumerate_chunks = False super().__init__( url=url, client=client, byte_range=byte_range, pruning_level=pruning_level, single_request=single_request, force_async=force_async, chunk_size=chunk_size, raise_response=raise_response, ) if force_async: self.data = PngData() else: if enumerate_chunks: self.populate_chunks() self.data = PngData() if scan_ihdr: self.scan_ihdr()
[docs] def populate_chunks(self): """ Call :meth:`~range_streams.codecs.png.PngStream.enumerate_chunks` and store in the internal :attr:`~range_streams.codecs.png.PngStream._chunks` attribute, accessible through the :attr:`~range_streams.codecs.png.PngStream.chunks` property. If the :attr:`~range_streams.codecs.png.PngStream.chunks` property is called 'prematurely', to avoid an access error it will 'proactively' call this method before returning the gated internal attribute. """ self._chunks: dict[str, list[PngChunkInfo]] = self.enumerate_chunks()
@property def chunks(self): """ 'Gate' to the internal :attr:`~range_streams.codecs.png.PngStream._chunks` attribute. If this property is called before the internal attribute is set, ('prematurely'), to avoid an access error it will 'proactively' call :meth:`~range_streams.codecs.png.PngStream.populate_chunks` before returning the gated internal attribute. """ if not hasattr(self, "_chunks"): self.populate_chunks() return self._chunks
[docs] def scan_ihdr(self): """ Request a range on the stream corresponding to the IHDR chunk, and populate the :attr:`~range_streams.codecs.png.PngStream.data.IHDR` object (an instance of :class:`IHDRChunk` from the :mod:`range_streams.codecs.png.data` module) according to the spec. """ ihdr_rng = Range(self.data.IHDR.start_pos, self.data.IHDR.end_pos) if self.client_is_async: self.add_async(ihdr_rng) else: self.add(ihdr_rng) ihdr_bytes = self.active_range_response.read() ihdr_u = struct.unpack(self.data.IHDR.struct, ihdr_bytes) if None in ihdr_u: raise ValueError(f"Got a null from unpacking IHDR bytes {ihdr_u}") self.data.IHDR.width = ihdr_u[self.data.IHDR.parts._IHDR_WIDTH] self.data.IHDR.height = ihdr_u[self.data.IHDR.parts._IHDR_HEIGHT] self.data.IHDR.bit_depth = ihdr_u[self.data.IHDR.parts._IHDR_BIT_DEPTH] self.data.IHDR.colour_type = ihdr_u[self.data.IHDR.parts._IHDR_COLOUR_TYPE] self.data.IHDR.compression = ihdr_u[self.data.IHDR.parts._IHDR_COMPRESSION] self.data.IHDR.filter_method = ihdr_u[self.data.IHDR.parts._IHDR_FILTER_METHOD] self.data.IHDR.interlacing = ihdr_u[self.data.IHDR.parts._IHDR_INTERLACING]
[docs] def verify_sync(self, msg=""): if self.client_is_async: raise ValueError(f"Synchronous client check failed{msg}")
[docs] def verify_async(self, msg=""): if not self.client_is_async: raise ValueError(f"Asynchronous client check failed{msg}")
[docs] def enumerate_chunks(self) -> dict[str, list[PngChunkInfo]]: """ Parse the length and type chunks, then skip past the chunk data and CRC chunk, so as to enumerate all chunks in the PNG (but request and read as little as possible). Build a dictionary of all chunks with keys of the chunk type (four letter strings) and values of lists (since some chunks e.g. IDAT can appear multiple times in the PNG). See `the official specification <http://www.libpng.org/pub/png/spec/1.2/PNG-Chunks.html>`_ for full details (or `Wikipedia <https://en.wikipedia.org/wiki/ Portable_Network_Graphics#%22Chunks%22_within_the_file>`_, or `the W3C <https://www.w3.org/TR/PNG/#5Chunk-layout>`_). """ self.verify_sync(msg=": call `enumerate_chunks_async` on an async PngStream") png_signature = 8 # PNG files start with an 8-byte signature chunk_preamble_size = 8 # 4-byte length chunk + 4-byte type chunk chunks: dict[str, list[PngChunkInfo]] = {} chunk_start = png_signature # Skip PNG file signature to reach first chunk chunk_type: str | None = None # initialise for while loop condition while chunk_type != "IEND": if chunks: # Increment chunk_start from last iteration # (last chunk's end is this chunk's start) chunk_start = chunk_info.end # type: ignore chunk_length_rng = Range(chunk_start, chunk_start + chunk_preamble_size) self.add(chunk_length_rng) b = self.active_range_response.read() chunk_len = struct.unpack(">I", b[:4])[0] chunk_type = b[4:].decode("ascii") assert chunk_type is not None # appease mypy chunks.setdefault(chunk_type, []) chunk_info = PngChunkInfo( start=chunk_start, type=chunk_type, length=chunk_len ) chunks[chunk_type].append(chunk_info) return chunks
[docs] async def enumerate_chunks_async(self) -> dict[str, list[PngChunkInfo]]: """ Parse the length and type chunks, then skip past the chunk data and CRC chunk, so as to enumerate all chunks in the PNG (but request and read as little as possible). Build a dictionary of all chunks with keys of the chunk type (four letter strings) and values of lists (since some chunks e.g. IDAT can appear multiple times in the PNG). See `the official specification <http://www.libpng.org/pub/png/spec/1.2/PNG-Chunks.html>`_ for full details (or `Wikipedia <https://en.wikipedia.org/wiki/ Portable_Network_Graphics#%22Chunks%22_within_the_file>`_, or `the W3C <https://www.w3.org/TR/PNG/#5Chunk-layout>`_). """ self.verify_async(msg=": call `enumerate_chunks` on a synchronous PngStream") png_signature = 8 # PNG files start with an 8-byte signature chunk_preamble_size = 8 # 4-byte length chunk + 4-byte type chunk chunks: dict[str, list[PngChunkInfo]] = {} chunk_start = png_signature # Skip PNG file signature to reach first chunk chunk_type: str | None = None # initialise for while loop condition while chunk_type != "IEND": if chunks: # Increment chunk_start from last iteration # (last chunk's end is this chunk's start) chunk_start = chunk_info.end # type: ignore chunk_length_rng = Range(chunk_start, chunk_start + chunk_preamble_size) await self.add_async(chunk_length_rng) b = await self.active_range_response.aread() chunk_len = struct.unpack(">I", b[:4])[0] chunk_type = b[4:].decode("ascii") assert chunk_type is not None # appease mypy chunks.setdefault(chunk_type, []) chunk_info = PngChunkInfo( start=chunk_start, type=chunk_type, length=chunk_len ) chunks[chunk_type].append(chunk_info) return chunks
[docs] def get_chunk_data(self, chunk_info: PngChunkInfo) -> bytes: if self.client_is_async: self.add_async(chunk_info.data_range) else: self.add(chunk_info.data_range) b = self.active_range_response.read() return b
[docs] def get_idat_data(self) -> list[int]: """ Decompress the IDAT chunk(s) and concatenate, then confirm the length is exactly equal to ``height * (1 + width * bit_depth)``, and filter it (removing the filter byte at the start of each scanline) using :func:`reconstruct_idat`. """ if self.data.IHDR.colour_type is None: self.scan_ihdr() height = self.data.IHDR.height width = self.data.IHDR.width channels = self.data.IHDR.channel_count assert height is not None and width is not None and channels is not None expected_length = height * (1 + width * channels) b = zlib.decompress( b"".join( self.get_chunk_data(chunk_info) for chunk_info in self.chunks["IDAT"] ) ) if len(b) != expected_length: raise ValueError(f"Expected {expected_length} but got {len(b)}") return reconstruct_idat( idat_bytes=b, channels=channels, height=height, width=width )
[docs] def has_chunk(self, chunk_type: str) -> bool: """ Determine whether the given chunk type is one of the chunks defined in the PNG. If the chunks have not yet been parsed, they will first be enumerated. """ return chunk_type in self.chunks
@property def alpha_as_direct(self): """ To avoid distinguishing 'direct' image transparency (in IDAT) from 'indirect' (or computed, from tRNS) palette transparency, check for a colour map and then check for a tRNS chunk to determine overall whether this image has an alpha channel in whichever way. """ if not hasattr(self.data.IHDR, "_has_alpha_channel"): self.scan_ihdr() # parse the IHDR chunk if not already done _ = self.data.IHDR.channel_count # Ensure colour type is processed # To avoid handling palettes as done in PyPNG, give alpha "directly" # https://github.com/drj11/pypng/blob/main/code/png.py#L1948-L1953 has_alpha = self.data.IHDR._has_alpha_channel # based on colour type if not has_alpha and self.data.IHDR._has_colourmap: # Allow alpha to switch on if tRNS chunk present has_alpha |= self.has_chunk(chunk_type="tRNS") return has_alpha
[docs] def any_semitransparent_idat(self, nonzero: bool = True): """ Whether there are any non-255 values in the alpha channel of the PNG, determined from IDAT chunk alone. If not, the alpha channel serves no purpose in practice, and the image may be considered non-transparent. If ``nonzero`` is True (the default), check for semitransparent, rather than nontransparent values (i.e. ``0 < A < 255`` rather than ``0 <= A < 255``). Note: presumes :meth:`~range_streams.codecs.png.PngStream.alpha_as_direct` has already been called, so the image is known to have 4 channels. Args: nonzero : Whether to return ``True`` only if the image has 'intermediate' (between 0 and 255) values, otherwise whether they're below 255. """ A = self.get_idat_data()[3::4] # alpha channel values return any(0 < v < 255 for v in A) if nonzero else any(v < 255 for v in A)
@property def channel_count_as_direct(self): """ If the image is indexed on a palette, then the channel count in the IHDR will be 1 even though the underlying sample contains 3 channels (R,G,B). To avoid distinguishing 'direct' image channels (in IDAT) from 'indirect' (or computed, from tRNS) palette channels, check for a colour map and then check for a tRNS chunk to determine overall whether this image has an extra channel for transparency. """ if self.data.IHDR.channel_count is None: self.scan_ihdr() # parse the IHDR chunk if not already done # To avoid handling palettes as done in PyPNG, give channel count "directly" # https://github.com/drj11/pypng/blob/main/code/png.py#L1948-L1953 channel_count = self.data.IHDR.channel_count # based on colour type if self.data.IHDR._has_colourmap: # Allow alpha to switch on if tRNS chunk present channel_count = 3 + int(self.alpha_as_direct) return channel_count @property def bit_depth_as_direct(self): """ Indexed images may report an IHDR bit depth other than 8, however the PLTE uses 8 bits per sample regardless of image bit depth, so override it to avoid distinguishing 'direct' bit depth from 'indirect' palette bit depth. """ if self.data.IHDR.bit_depth is None: self.scan_ihdr() # parse the IHDR chunk if not already done # To avoid handling palettes as done in PyPNG, give bit depth "directly" # https://github.com/drj11/pypng/blob/main/code/png.py#L1948-L1953 return 8 if self.data.IHDR._has_colourmap else self.data.IHDR.bit_depth