Source code for range_streams.range_utils

from __future__ import annotations

# Names exported by ``from range_streams.range_utils import *``.
__all__ = [
    "ranges_in_reg_order",
    "response_ranges_in_reg_order",
    "most_recent_range",
    "range_termini",
    "range_len",
    "range_min",
    "range_max",
    "validate_range",
    "range_span",
    "ALWAYS_SET_TOLD",
]

from typing import TYPE_CHECKING

if TYPE_CHECKING:  # pragma: no cover
    from ranges import RangeDict
    import range_streams  # for RangeStream, RangeResponse

from ranges import Range

ALWAYS_SET_TOLD = True  # if False, only windowed range responses set `.told`


def ranges_in_reg_order(ranges: RangeDict) -> list[Range]:
    """List the key ranges of a :class:`~ranges.RangeDict` in order of
    registration.

    For every item yielded by ``ranges.items()`` the first element of the key
    is taken, and the first entry of that element's ``ranges()`` list is
    collected. Presumes integrity is already checked: nothing is validated
    here, so a key without a first range raises :class:`IndexError`.

    Args:
      ranges : Either the internal or external ranges of a
               :class:`~range_streams.stream.RangeStream`.

    Returns:
      One :class:`~ranges.Range` per item, in the order ``items()`` yields
      them.
    """
    ordered = []
    for key, _ in ranges.items():
        first_member = key[0]
        ordered.append(first_member.ranges()[0])
    return ordered
def response_ranges_in_reg_order(ranges: RangeDict) -> list[Range]:
    """For all of the :class:`~range_streams.response.RangeResponse` values in
    the :class:`~ranges.RangeDict`, list the ranges from their original
    :attr:`~range_streams.response.RangeResponse.request` in order of
    registration.

    Unlike the keys of the dictionary, these are the ranges stored on each
    value's ``request`` (read as ``value.request.range``).

    Args:
      ranges : Either the internal or external ranges of a
               :class:`~range_streams.stream.RangeStream`.

    Returns:
      One requested :class:`~ranges.Range` per item, in the order
      ``items()`` yields them.
    """
    requested = []
    for _, response in ranges.items():
        requested.append(response.request.range)
    return requested
def most_recent_range(
    stream: range_streams.stream.RangeStream, internal: bool = True
) -> Range | None:
    """Return the most recently registered range of a stream, if there is one.

    If ``internal`` is ``True``, use
    :attr:`~range_streams.stream.RangeStream._ranges` as the
    :class:`~ranges.RangeDict`, else use the 'external' (computed) property
    :attr:`~range_streams.stream.RangeStream.ranges`. The external ones take
    into account the position the file has been read/seeked to.

    Args:
      stream   : The :class:`~range_streams.stream.RangeStream` whose ranges
                 are inspected.
      internal : Whether to use the internal or external ranges.

    Returns:
      The last :class:`~ranges.Range` in registration order of the selected
      :class:`~ranges.RangeDict`, or ``None`` if there is no such range.
    """
    # Nothing registered at all: answer without computing the external view.
    if stream._ranges.isempty():
        return None
    ranges = stream._ranges if internal else stream.ranges
    # The external view is computed separately, so it is not guaranteed here to
    # be non-empty whenever the internal one is; guard before indexing ``[-1]``.
    if ranges.isempty():
        return None
    return ranges_in_reg_order(ranges)[-1]
def range_termini(rng: Range) -> tuple[int, int]:
    """Get the inclusive start and end positions ``[start,end]`` from a
    :class:`ranges.Range`. These are referred to as the 'termini'. Ranges are
    always ascending.

    Args:
      rng : A :class:`~ranges.Range` (which by default will be half-closed,
            i.e. not inclusive of the end position).

    Returns:
      A ``(start, end)`` tuple in which both positions belong to the range.

    Raises:
      ValueError: If ``rng`` is empty.
    """
    if rng.isempty():
        raise ValueError("Empty range has no termini")
    # A boundary that is excluded from the range is moved one step inwards so
    # that the returned position is the nearest one actually inside the range.
    first = rng.start
    if not rng.include_start:
        first += 1
    last = rng.end
    if not rng.include_end:
        last -= 1
    return first, last
def range_len(rng: Range) -> int:
    """Get the length of a :class:`~ranges.Range`.

    The value is the distance between the inclusive termini given by
    :func:`range_termini`, i.e. ``end - start`` of those termini. Because both
    termini are inclusive, this is one less than the number of positions in
    the range (termini ``(0, 4)`` give ``4``).

    Args:
      rng : A :class:`~ranges.Range` (which by default will be half-closed,
            i.e. not inclusive of the end position).

    Raises:
      ValueError: If ``rng`` is empty (raised by :func:`range_termini`).
    """
    termini = range_termini(rng)
    return termini[1] - termini[0]
def range_min(rng: Range) -> int:
    """Get the minimum (or start terminus) of a :class:`~ranges.Range`.

    Args:
      rng : A :class:`~ranges.Range` (which by default will be half-closed,
            i.e. not inclusive of the end position).

    Returns:
      The first (inclusive) position of the range.

    Raises:
      ValueError: If ``rng`` is empty.
    """
    # Checked here, before delegating, so that the error message names the
    # minimum specifically.
    if rng.isempty():
        raise ValueError("Empty range has no minimum")
    start_terminus, _ = range_termini(rng)
    return start_terminus
def range_max(rng: Range) -> int:
    """Get the maximum (or end terminus) of a :class:`~ranges.Range`.

    Args:
      rng : A :class:`~ranges.Range` (which by default will be half-closed,
            i.e. not inclusive of the end position).

    Returns:
      The last (inclusive) position of the range.

    Raises:
      ValueError: If ``rng`` is empty.
    """
    # Checked here, before delegating, so that the error message names the
    # maximum specifically.
    if rng.isempty():
        raise ValueError("Empty range has no maximum")
    _, end_terminus = range_termini(rng)
    return end_terminus
def validate_range(
    byte_range: Range | tuple[int, int], allow_empty: bool = True
) -> Range:
    """Validate ``byte_range`` and convert to a half-closed (i.e. not inclusive
    of the end position) ``[start,end)`` :class:`~ranges.Range` if given as
    integer tuple.

    Args:
      byte_range  : Either a :class:`tuple` of two :class:`int` positions with
                    which to create a :class:`~ranges.Range` (which by default
                    will be half-closed, i.e. not inclusive of the end
                    position); or simply a :class:`~ranges.Range`.
      allow_empty : If ``False``, an empty range is rejected with a
                    :class:`ValueError`.

    Returns:
      The validated :class:`~ranges.Range` (newly built when a tuple was
      given, otherwise the object passed in).

    Raises:
      TypeError: If ``byte_range`` is neither a 2-tuple of integers nor a
        :class:`~ranges.Range`, or is a :class:`~ranges.Range` whose start or
        end is not an integer.
      ValueError: If the range is empty and ``allow_empty`` is ``False``.
    """
    # Built up front so the message always shows the value as it was passed in,
    # before any conversion to a Range below.
    complain_about_types = (
        f"{byte_range=} must be a Range from the python-ranges"
        " package or an integer 2-tuple"
    )
    if isinstance(byte_range, tuple):
        is_int_pair = len(byte_range) == 2 and all(
            isinstance(position, int) for position in byte_range
        )
        if not is_int_pair:
            raise TypeError(complain_about_types)
        byte_range = Range(*byte_range)
    else:
        if not isinstance(byte_range, Range):
            raise TypeError(complain_about_types)
        if not (isinstance(byte_range.start, int) and isinstance(byte_range.end, int)):
            raise TypeError("Ranges must be discrete: use integers for start and end")
    if byte_range.isempty() and not allow_empty:
        raise ValueError("Range is empty")
    return byte_range
def range_span(ranges: list[Range]) -> Range:
    """Given a list of :class:`~ranges.Range`, calculate their 'span' (i.e. the
    range spanned from their minimum to maximum). This span may of course not
    be completely 'covered' by the ranges in the list.

    Only the first and last entries of the list are inspected: the list is
    assumed to be in ascending order, and the two are switched if the first
    compares greater than the last.

    Args:
      ranges : A list of ranges whose span is to be given.

    Returns:
      A half-closed :class:`~ranges.Range` from the lower start terminus up to
      and including the higher end terminus.

    Raises:
      ValueError: If the first or last range is empty (raised by
        :func:`range_termini`).
      IndexError: If ``ranges`` is an empty list.
    """
    head = range_termini(ranges[0])
    tail = range_termini(ranges[-1])
    # Termini are (start, end) tuples, so min/max order them by start, then end.
    lower = min(head, tail)
    upper = max(head, tail)
    # The end terminus is inclusive; add one to express it as an exclusive end.
    return Range(lower[0], upper[1] + 1)