# Source code for range_streams.stream

r""":mod:`range_streams.stream` exposes a class
:class:`~range_streams.stream.RangeStream`, whose key property (once initialised) is
:attr:`~range_streams.stream.RangeStream.ranges`,
which provides a :class:`~ranges.RangeDict` comprising the ranges of
the file being streamed.

The method :meth:`~range_streams.stream.RangeStream.add` will request further ranges,
and (like the initialisation method :meth:`~range_streams.stream.RangeStream.__init__`)
will accept either a :class:`~ranges.Range` or a tuple of two integers as its
argument (``byte_range``).
"""

from __future__ import annotations

from copy import deepcopy
from io import SEEK_SET
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Coroutine, Type
from urllib.parse import urlparse

MYPY = False  # when using mypy will be overrided as True
if MYPY or not TYPE_CHECKING:  # pragma: no cover
    import httpx  # avoid importing to Sphinx type checker

from ranges import Range, RangeDict

from .async_utils import AsyncFetcher
from .http_utils import detect_header_value, range_header
from .overlaps import get_range_containing, overlap_whence
from .range_utils import (
    ALWAYS_SET_TOLD,
    most_recent_range,
    range_max,
    range_span,
    range_termini,
    ranges_in_reg_order,
    validate_range,
)
from .request import RangeRequest
from .response import RangeResponse

# Public API of this module: only the stream class is exported by ``import *``.
__all__: list[str] = ["RangeStream"]

# Set to ``True`` to print diagnostics from ``RangeStream.overlap_whence`` and
# ``RangeStream.register_range`` (overlap detection / range registration).
DEBUG_VERBOSE: bool = False


class RangeStream:
    """
    A class representing a file being streamed from a server which supports
    range requests, with the `ranges` property providing a list of those intervals
    requested so far (and not yet exhausted).

    When the class is initialised its length is checked upon the first range
    request, and the client provided is not closed (you must handle this yourself).
    Further ranges may be requested on the :class:`~range_streams.stream.RangeStream`
    by calling :meth:`~range_streams.stream.RangeStream.add`.

    Both the :meth:`~range_streams.stream.RangeStream.__init__` and
    :meth:`~range_streams.stream.RangeStream.add` methods support the specification
    of a range interval as either a tuple of two integers or a :class:`~ranges.Range`
    from the :mod:`python-ranges` package (an external requirement installed
    alongside this package). Either way, the interval created is interpreted to be
    the standard Python convention of a half-open interval ``[start,stop)``.

    Don't forget to close the ``httpx.Response`` yourself! The
    :meth:`~range_streams.stream.RangeStream.close` method is available (or
    :meth:`~range_streams.response.RangeResponse.close`) to help you.
    """

    _length_checked: bool = False

    _active_range: Range | None = None
    """
    Set by :meth:`~range_streams.stream.RangeStream.set_active_range`, through which
    the :attr:`~range_streams.stream.RangeStream.active_range_response` property
    gives access to the currently 'active' range (usually the most recently created).
    """

    _ranges: RangeDict
    """
    `'Internal'` ranges attribute. Start position is not affected by reading in bytes
    from the :class:`RangeResponse` (unlike the 'external'
    :attr:`~range_streams.stream.RangeStream.ranges` property)
    """

    _range_windows: RangeDict
    """
    `'Internal'` ranges attribute for windowed ranges. Start position is not affected
    by reading in bytes from the :class:`RangeResponse` (unlike the 'external'
    :attr:`~range_streams.stream.RangeStream.ranges` property).

    Used in single request mode (when
    :attr:`~range_streams.stream.RangeStream.single_request` is set to ``True`` at
    initialisation).
    """

    def __init__(
        self,
        url: str,
        client=None,  # don't hint httpx.Client (Sphinx gives error)
        byte_range: Range | tuple[int, int] = Range("[0, 0)"),
        pruning_level: int = 0,
        single_request: bool = False,
        force_async: bool = False,
        chunk_size: int | None = None,
        raise_response: bool = True,
    ):
        """
        Set up a stream for the file at ``url``, with either an initial range to be
        requested (HTTP partial content request), or if left as the empty range
        (default: ``Range(0,0)``) a HEAD request will be sent instead, so as to set
        the total size of the target file on the
        :attr:`~range_streams.stream.RangeStream.total_bytes` property.

        By default (if ``client`` is left as ``None``) a fresh :class:`httpx.Client`
        will be created for each stream.

        The ``byte_range`` can be specified as either a :class:`~ranges.Range`
        object, or 2-tuple of integers (``(start, end)``), interpreted either way as
        a half-open interval ``[start, end)``, as given by Python's built-in
        :class:`range`.

        If ``byte_range`` is passed as the empty range ``Range(0,0)`` (its default),
        then a HEAD request is sent to ``url`` on initialisation, setting the
        :attr:`~range_streams.stream.RangeStream.total_bytes` value from the
        ``content-length`` header in the subsequent response.

        If ``single_request`` is ``True`` (default: ``False``), then the behaviour
        when an empty ``byte_range`` is passed instead becomes to send a single
        streaming GET request for the whole file, and the class will then
        facilitate an interface that 'simulates' partial content requests, i.e. as
        if each time :meth:`~range_streams.stream.RangeStream.add` was used the range
        requests were being returned instantly (as everything needed was already
        obtained on the first request at initialisation). More performant when
        reading a stream linearly.

        Note: internally, this single request is known as 'the monostream', and is
        registered in the internal :attr:`~range_streams.stream.RangeStream._ranges`
        :class:`~ranges.RangeDict` over the total range of the file.

        Note: a single request will not be as efficient if streaming the response
        non-linearly (since reading a byte in the stream requires loading all bytes
        up to it). This will mean it is only performant to use for certain file types
        or applications (e.g. a ZIP file is read "in a principled manner" from the end
        [the Central Directory] first, so gains greatly from using multiple partial
        content requests rather than a single stream, whereas a PNG file can only be
        read "in a principled manner" linearly, iterating through the chunks from the
        start).

        The ``pruning_level`` controls the policy for overlap handling (``0`` will
        resize overlapped ranges, ``1`` will delete overlapped ranges, and ``2`` will
        raise an error when a new range is added which overlaps a pre-existing
        range). See docs for the
        :meth:`~range_streams.stream.RangeStream.handle_overlap` method for further
        details.

        The ``chunk_size`` controls the size of the chunks that are read in from the
        ``httpx.Response.iter_raw`` iterator on the streamed HTTP response.

        Args:
          url            : (:class:`str`) The URL of the file to be streamed
          client         : (:class:`httpx.Client` | :class:`httpx.AsyncClient` |
                           ``None``) The HTTPX client to use for HTTP requests
          byte_range     : (:class:`~ranges.Range` | ``tuple[int,int]``) The range
                           of positions on the file to be requested
          pruning_level  : (:class:`int`) Either ``0`` ('replant'), ``1`` ('burn'),
                           or ``2`` ('strict')
          single_request : (:class:`bool`) Whether to use a single GET request and
                           just add 'windows' onto this rather than create multiple
                           partial content requests.
          force_async    : (:class:`bool`) Whether to require the client to be
                           ``httpx.AsyncClient``, and if no client is given, to
                           create one on initialisation. (Experimental/WIP)
          chunk_size     : (:class:`int` | ``None``) The chunk size used for the
                           ``httpx.Response.iter_raw`` response byte iterators
          raise_response : (:class:`bool`) Whether to raise HTTP status code
                           exceptions
        """
        self.url = url
        self.set_client(client=client, force_async=force_async)
        self.pruning_level = pruning_level
        self.single_request = single_request
        self.chunk_size = chunk_size
        self.raise_response = raise_response
        self._ranges = RangeDict()
        self._range_windows = RangeDict()
        if self.client_is_async:
            # An async method cannot be awaited from this synchronous initialiser:
            # presumably the caller is expected to ``await stream.add_async(...)``.
            pass
        else:
            self.add(byte_range=byte_range)

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}{self.__ranges_repr__()} @@ "
            f"'{self.name}' from {self.domain}"
        )

    @property
    def freely_requestable(self):
        """
        Trivial opposite of the
        :attr:`~range_streams.stream.RangeStream.single_request` attribute, so that
        conditional blocks can treat this as the 'conventional' case and the single
        request case be the alternative (which looks better).
        """
        return not self.single_request

    def set_client(self, client, force_async: bool) -> None:
        """
        Check client type explicitly to handle a/sync and optional HTTPX client.

        Args:
          client      : (:class:`httpx.Client` | :class:`httpx.AsyncClient` |
                        ``None``) The client to be used for all HTTP requests made on
                        the :class:`~range_streams.stream.RangeStream`. If ``None``,
                        a fresh one will be created.
          force_async : (:class:`bool`) If the ``client`` is ``None``, this
                        parameter determines whether :class:`httpx.Client` or
                        :class:`httpx.AsyncClient` is set as the client. If a
                        synchronous client is given and ``force_async`` is ``True``,
                        an error will be raised.

        Raises:
          TypeError: if ``client`` is not an HTTPX client, or is synchronous while
                     ``force_async`` is ``True``.
        """
        if client is None:
            client = httpx.AsyncClient() if force_async else httpx.Client()
        elif isinstance(client, httpx.Client):
            if force_async:
                raise TypeError(f"{client=} is not async (`httpx.AsyncClient`)")
        elif not isinstance(client, httpx.AsyncClient):
            raise TypeError(f"{client=} is not a HTTPX client")
        self.client = client

    @property
    def client_is_async(self):
        """Whether the stream's client is an :class:`httpx.AsyncClient`."""
        return isinstance(self.client, httpx.AsyncClient)

    @property
    def sync_client(self):  # returns httpx.Client
        """
        Provide a synchronous client: either the stream's client, or a fresh one if
        the stream's client is asynchronous. Used for HEAD requests on an async
        RangeStream. Presumes a client has been set correctly.

        Note: a *new* client is created on every access when the stream's client is
        async, so callers should bind the result once and close it when done.
        """
        return httpx.Client() if self.client_is_async else self.client

    def __ranges_repr__(self) -> str:
        """Comma-separated string of the (external) ranges, in ascending order."""
        return ", ".join(map(str, self.list_ranges()))

    def check_is_subrange(self, rng: Range):
        """
        Raise :class:`ValueError` if ``rng`` does not lie within
        :attr:`~range_streams.stream.RangeStream.total_range`.
        """
        if rng not in self.total_range:
            raise ValueError(f"{rng} is not a sub-range of {self.total_range}")

    def check_range_integrity(self, use_windows=False) -> None:
        """
        Every :class:`~ranges.RangeSet` in the
        :attr:`~range_streams.stream.RangeStream._ranges` :class:`~ranges.RangeDict`
        keys (or :attr:`~range_streams.stream.RangeStream._range_windows` if
        ``use_windows`` is ``True``) must contain 1 :class:`~ranges.Range` each.

        For any RangeSet holding several ranges, sub-ranges whose response has
        already been read past their end are discarded first. If any RangeSet still
        holds two or more ranges after that, raise :class:`ValueError`.
        """
        rng_dict = self._range_windows if use_windows else self._ranges
        bad_rs = [rs for rs in rng_dict.ranges() if len(rs._ranges) != 1]
        if not bad_rs:
            return
        for rset in bad_rs:
            # Iterate over a snapshot: discarding from the set being iterated is unsafe
            for rng in list(rset):
                rng_resp = rng_dict[rng.start]
                if rng_resp.tell() > range_max(rng):
                    rset.discard(rng)  # discard exhausted subrange
        # Rebuild the list rather than removing from it while iterating (which would
        # skip elements); only sets still holding multiple ranges are errors
        bad_rs = [rs for rs in bad_rs if len(rs.ranges()) >= 2]
        if bad_rs:
            raise ValueError(f"Each RangeSet must contain 1 Range: found {bad_rs=}")

    def compute_external_ranges(self, use_windows: bool = False) -> RangeDict:
        """
        If ``use_windows`` is ``True``, the ``internal_range_dict`` is
        :attr:`~range_streams.stream.RangeStream._range_windows` rather than
        :attr:`~range_streams.stream.RangeStream._ranges` when ``use_windows`` is
        ``False`` (default: ``False``).

        Modifying the ``internal_range_dict`` attribute to account for the bytes
        consumed (from the head) and tail mark offset of where a range was already
        trimmed to avoid an overlap (from the tail). Fully consumed responses are
        left out.

        While the :class:`~ranges.RangeSet` keys are a deep copy of the
        ``internal_range_dict`` :class:`~ranges.RangeDict` keys (and therefore will
        not propagate if modified), the RangeResponse values are references,
        therefore will propagate to the ``internal_range_dict``
        :class:`~ranges.RangeDict` if modified (primarily when ``read``).

        When ``use_windows`` is ``True``, these RangeResponse values are
        'simulations' (a.k.a. mock/dummy objects) of the range response that would
        be received from a partial content request (they in fact merely came from a
        streamed GET request).

        Raises:
          ValueError: if offsetting a range would leave its start after its end.
        """
        prepared_rangedict = RangeDict()
        internal_rangedict = self._range_windows if use_windows else self._ranges
        for _rng_set, rng_response in internal_rangedict.items():
            if rng_response.is_consumed():
                continue
            # Copy so the offsets below never touch the request's own range
            rng = deepcopy(rng_response.request.range)
            told_is_set = rng_response.is_windowed or ALWAYS_SET_TOLD
            if rng_response_tell := (
                rng_response.told if told_is_set else rng_response.tell()
            ):
                # Access single range (assured by unique RangeResponse values of
                # RangeDict) of singleton rangeset (assured by check_range_integrity)
                rng.start += rng_response_tell
            if rng_response.tail_mark:
                rng.end -= rng_response.tail_mark
            if rng.start > rng.end:
                raise ValueError(f"{rng} has been malformed ({rng_response=})")
            prepared_rangedict.update({rng: rng_response})
        return prepared_rangedict

    @property
    def ranges(self):
        """
        Read-only view on the :class:`~ranges.RangeDict` stored in the
        :attr:`~range_streams.stream.RangeStream._ranges` attribute (or
        :attr:`~range_streams.stream.RangeStream._range_windows` in single request
        mode), modifying it to account for the bytes consumed (from the head) and
        tail mark offset of where a range was already trimmed to avoid an overlap
        (from the tail).

        Each :attr:`~range_streams.stream.RangeStream.ranges`
        :class:`~ranges.RangeDict` key is a :class:`~ranges.RangeSet` containing 1
        :class:`~ranges.Range`. Check this assumption (singleton
        :class:`~ranges.RangeSet` "integrity") holds before computing the view.

        Requests are restricted to not re-request already-requested file ranges, so
        give windows onto the underlying range that can be consumed (but the
        underlying :class:`~range_streams.response.RangeResponse` will persist and
        cannot be consumed by reading).
        """
        self.check_range_integrity()
        # Unclear if this is necessary but seems consistent to do here:
        if self.single_request:
            # Also check integrity of the range windows
            self.check_range_integrity(use_windows=True)
        if self.freely_requestable:
            # Not limited to a single request
            return self.compute_external_ranges()
        # Single request limit: can only add a window onto already requested range
        return self.compute_external_ranges(use_windows=True)

    def overlap_whence(
        self, rng: Range, internal: bool = False, use_windows: bool = False
    ) -> int | None:
        """
        Report where ``rng`` overlaps the stream's ranges (delegating to the
        module-level :func:`~range_streams.overlaps.overlap_whence`), or ``None`` if
        it does not overlap.

        Args:
          rng         : The range to test for overlap.
          internal    : If ``True``, test against the internal RangeDict rather than
                        the external :attr:`~range_streams.stream.RangeStream.ranges`.
          use_windows : If ``internal`` is ``True``, choose
                        :attr:`~range_streams.stream.RangeStream._range_windows`
                        rather than :attr:`~range_streams.stream.RangeStream._ranges`.
        """
        if DEBUG_VERBOSE:
            print(f"IN {rng=} {internal=} {use_windows=}")
            for k, v in self._range_windows.items():
                print(f"{k}: {v} ({v.told})")
        internal_rng_dict = self._range_windows if use_windows else self._ranges
        rng_dict = internal_rng_dict if internal else self.ranges
        if DEBUG_VERBOSE:
            print(f"OUT {rng}")
        return overlap_whence(rng_dict=rng_dict, rng=rng)

    def register_range(
        self,
        rng: Range,
        value: RangeResponse,
        activate: bool = True,
        use_windows: bool = False,
    ):
        """
        Store ``value`` under ``rng`` in the internal RangeDict (windows or ranges),
        first resolving any overlap with existing ranges per the pruning level.

        Args:
          rng         : The range to register (must lie within the total range).
          value       : The response to associate with the range.
          activate    : Whether to make ``rng`` the active range afterwards.
          use_windows : Register in
                        :attr:`~range_streams.stream.RangeStream._range_windows`
                        instead of :attr:`~range_streams.stream.RangeStream._ranges`.

        Raises:
          ValueError: if the stream length has not been set yet, or ``rng`` is not
                      a sub-range of the total range.
        """
        if self._length_checked:
            self.check_is_subrange(rng)
        else:
            raise ValueError("Stream length must be set before registering a range")
        if DEBUG_VERBOSE:
            print(f"Hit {rng=} {value=} {activate=} {use_windows=}")
        if (
            self.overlap_whence(rng, internal=False, use_windows=use_windows)
            is not None
        ):
            self.handle_overlap(rng, internal=False, use_windows=use_windows)
        ranges = self._range_windows if use_windows else self._ranges
        # This is where a previous tail mark is erased (if replacing an overlap)
        ranges.add(rng=rng, value=value)
        if activate:
            self.set_active_range(rng)

    def set_active_range(self, rng: Range):
        """
        Setter for the active range (through which
        :attr:`~range_streams.stream.RangeStream.active_range_response` is also set).
        """
        if self._active_range != rng:
            self._active_range = rng

    @property
    def active_range_response(self) -> RangeResponse:
        """
        Look up the :class:`~range_streams.response.RangeResponse` object associated
        with the currently active range by using
        :attr:`~range_streams.stream.RangeStream._active_range` as the
        :class:`~ranges.Range` key for the internal
        :attr:`~range_streams.stream.RangeStream._ranges` :class:`RangeDict`.

        Look it up in the :attr:`~range_streams.stream.RangeStream._range_windows`
        :class:`RangeDict` instead if in single request mode.

        Raises:
          ValueError: if there is no active range, or it is not found.
        """
        internal_rng_dict = self._range_windows if self.single_request else self._ranges
        try:
            return internal_rng_dict[self._active_range]
        except KeyError:
            e_pre = "Cannot get active range response "
            if self._active_range is None:
                raise ValueError(f"{e_pre}(no active range)")
            raise ValueError(f"{e_pre}({self._active_range=})")

    def ext2int(self, ext_rng: Range) -> Range:
        """
        Given the external range `ext_rng` and the :class:`RangeStream` on which it
        is 'stored' (or rather, computed, in the
        :attr:`~range_streams.stream.RangeStream.ranges` property), return the
        internal :class:`~ranges.Range` stored on the :attr:`_ranges` attribute of
        the :attr:`~range_streams.stream.RangeStream`, by looking up the shared
        :class:`~range_streams.response.RangeResponse` value.

        Args:
          ext_rng : A :class:`~ranges.Range` from the 'external'
                    :attr:`~range_streams.stream.RangeStream.ranges` with which to
                    cross-reference in
                    :attr:`~range_streams.stream.RangeStream._ranges` to identify
                    the corresponding 'internal' range.

        Raises:
          ValueError: if no internal range shares the external range's response.
        """
        rng_response = self.ranges[ext_rng]
        for k, v in self._ranges.items():
            if v == rng_response:
                return k[0].ranges()[0]
        raise ValueError("Looked up a non-existent key in the internal RangeDict")

    def burn_range(self, overlapped_ext_rng: Range):
        """
        Get the internal range (i.e. without offsets applied from the current read
        position on the range) from the external one (which may differ if the seek
        position has advanced from the start position, usually due to reading bytes
        from the range).

        Once this internal range has been identified, delete it, and set the
        :attr:`~range_streams.stream.RangeStream._active_range` to the most recent
        (or if the stream becomes empty, set it to ``None``).

        Args:
          overlapped_ext_rng : the overlapped external range
        """
        internal_rng = self.ext2int(ext_rng=overlapped_ext_rng)
        self._ranges.remove(internal_rng)
        # set `_active_range` to most recently registered internal range or None
        self.set_active_range(most_recent_range(self, internal=True))

    def handle_overlap(
        self,
        rng: Range,
        internal: bool = False,
        use_windows: bool = False,
    ) -> None:
        """
        Handle overlaps with a given pruning level:

        0. "replant" ranges overlapped at the head with fresh, disjoint ranges
           'downstream' or mark their tails to effectively truncate them if
           overlapped at the tail
        1. "burn" existing ranges overlapped anywhere by the new range
        2. "strict" will throw a :class:`ValueError`

        Raises:
          ValueError: for an invalid pruning level, an empty ``rng``, under the
                      strict policy, or if no overlap is found at either terminus.
        """
        internal_rng_dict = self._range_windows if use_windows else self._ranges
        ranges = internal_rng_dict if internal else self.ranges
        if self.pruning_level not in range(3):
            raise ValueError("Pruning level must be 0, 1, or 2")
        if rng.isempty():
            raise ValueError("Range overlap not detected as the range is empty")
        if self.pruning_level == 2:  # 2: strict
            raise ValueError(
                "Range overlap not registered due to strict pruning policy"
            )
        rng_min, rng_max = range_termini(rng)
        if rng not in ranges:
            # May be partially overlapping
            has_min, has_max = (pos in ranges for pos in [rng_min, rng_max])
            if has_min:
                # T: Overlap at tail of pre-existing RangeResponse truncates that tail
                # M: Overlap at midbody of pre-existing RangeResponse truncates tail
                overlapped_rng = get_range_containing(rng_dict=ranges, position=rng_min)
                if self.pruning_level == 1:  # 1: burn
                    self.burn_range(overlapped_ext_rng=overlapped_rng)
                else:  # 0: replant
                    _, o_rng_max = range_termini(overlapped_rng)
                    intersect_len = o_rng_max - rng_min + 1
                    # The response value is shared with the internal dict, so the
                    # tail mark propagates there
                    ranges[rng_min].tail_mark += intersect_len
            elif has_max:
                # H: Overlap at head of pre-existing RangeResponse is replanted/burnt
                overlapped_rng = get_range_containing(rng_dict=ranges, position=rng_max)
                if self.pruning_level == 1:  # 1: burn
                    self.burn_range(overlapped_ext_rng=overlapped_rng)
                else:  # 0: replant
                    o_rng_min, o_rng_max = range_termini(overlapped_rng)
                    intersect_len = rng_max - o_rng_min + 1
                    # For now, discard the overlapped range entirely and request its
                    # non-overlapping remainder afresh
                    self.burn_range(overlapped_ext_rng=overlapped_rng)
                    if (new_o_rng_min := o_rng_min + intersect_len) > rng_max:
                        new_o_rng_max = o_rng_max  # (I can't think of exceptions?)
                        new_o_rng = Range(new_o_rng_min, new_o_rng_max + 1)
                        # head-overlapped range has been 'replanted'
                        self.add(new_o_rng)
            else:
                info = f"{rng=} and {ranges=}"
                raise ValueError(f"Range overlap not detected at termini {info}")
        else:
            # HTT: Full overlap with an existing range ("Head To Tail")
            overlapped_rng = get_range_containing(rng_dict=ranges, position=rng_max)
            # Fully overlapped ranges would be exhausted if read, so delete regardless
            # of whether pruning policy is "replant"/"burn" (can't replant empty range)
            self.burn_range(overlapped_ext_rng=overlapped_rng)

    @property
    def total_bytes(self) -> int | None:
        """
        The total number of bytes (i.e. the length) of the file being streamed, or
        ``None`` if the length has not been set yet.
        """
        return self._length if self._length_checked else None

    def isempty(self) -> bool:
        """
        Whether the external :attr:`~range_streams.stream.RangeStream.ranges`
        :class:`~ranges.RangeDict` is empty (contains no range-RangeResponse
        key-value pairs).
        """
        return self.ranges.isempty()

    @property
    def spanning_range(self) -> Range:
        """The smallest range spanning all ranges (``Range(0, 0)`` if empty)."""
        return Range(0, 0) if self.isempty() else range_span(self.list_ranges())

    @property
    def total_range(self) -> Range:
        """The range ``[0, length)`` covering the whole file."""
        try:
            return Range(0, self._length)
        except Exception:  # messy exception avalanche
            raise AttributeError("Cannot use total_range before setting _length")

    @property
    def name(self) -> str:
        """The final path component of the URL."""
        return Path(self.url).name

    @property
    def domain(self) -> str:
        """The network location (host[:port]) of the URL."""
        return urlparse(self.url).netloc

    def tell(self) -> int:
        """Delegate to ``tell`` on the active range's response."""
        return self.active_range_response.tell()

    def read(self, size=None) -> bytes:
        """Delegate to ``read`` on the active range's response."""
        return self.active_range_response.read(size=size)

    def seek(self, position, whence=SEEK_SET) -> None:
        """Delegate to ``seek`` on the active range's response."""
        self.active_range_response.seek(position=position, whence=whence)

    def send_request(self, byte_range: Range) -> RangeRequest:
        """
        Create a :class:`~range_streams.request.RangeRequest` for ``byte_range`` on
        the stream's URL using the (synchronous) client.

        Raises:
          NotImplementedError: if the client is asynchronous.
        """
        if self.client_is_async:
            raise NotImplementedError("Async client support WIP")
        return RangeRequest(
            byte_range=byte_range,
            url=self.url,
            client=self.client,
            chunk_size=self.chunk_size,
        )

    def simulate_request(
        self, byte_range: Range, parent_range_request: RangeRequest | None = None
    ) -> RangeRequest:
        """
        Simulate the :class:`~range_streams.request.RangeRequest` obtained from a
        partial content request for ``byte_range`` on the stream's URL through a
        "window" on ``parent_range_request`` (expected to be a streamed GET request
        for the full file range).

        If no ``parent_range_request`` is provided, it is assumed to be the one on
        the :class:`~range_streams.response.RangeResponse` in the internal
        :attr:`~range_streams.stream.RangeStream._ranges` :class:`~ranges.RangeDict`
        containing ``byte_range.start``.

        Args:
          byte_range           : The :class:`~ranges.Range` to simulate a partial
                                 content request for.
          parent_range_request : The :class:`~range_streams.request.RangeRequest`
                                 over which to use a "window" to simulate the range
                                 request.

        Raises:
          ValueError: if no parent request is given and ``byte_range.start`` is not
                      covered by the internal RangeDict.
        """
        if parent_range_request is None:
            access_pos = byte_range.start
            if access_pos not in self._ranges:
                msg = f"{access_pos} is not in internal RangeDict\n{self._ranges=}"
                raise ValueError(msg)
            parent_range_response = self._ranges[access_pos]
            parent_range_request = parent_range_response.request
            tail_mark = parent_range_response.tail_mark
        else:
            # No response was looked up for an explicitly given parent request: use
            # the tail mark of the registered response owning it, else 0 (no trim)
            tail_mark = next(
                (
                    rng_resp.tail_mark
                    for rng_resp in self._ranges.values()
                    if rng_resp.request is parent_range_request
                ),
                0,
            )
        return RangeRequest.windowed_request(
            byte_range=byte_range,
            range_request=parent_range_request,
            tail_mark=tail_mark,
            chunk_size=self.chunk_size,
        )

    async def get_async_monostream(self) -> None:
        """
        Send a streaming GET request with an open-ended range header, to obtain the
        total range. Suitable for higher performance (to avoid repeated requests on
        the :class:`~range_streams.stream.RangeStream` which accrue a time cost).

        Should be called after the :class:`~range_streams.stream.RangeStream` is
        initialised (with both ``single_request`` and ``force_async`` as True), and
        [unlike the initialisation method] of course this method must be awaited.
        """
        rng_h = range_header(rng=Range(0, 0))  # Empty range -> open-ended range header
        req = self.client.build_request(method="GET", url=self.url, headers=rng_h)
        resp = await self.client.send(request=req, stream=True)
        if self.raise_response:
            resp.raise_for_status()
        total_length = self.check_response_length(headers=resp.headers, req=req.method)
        self.set_length(length=total_length)
        # This is where self.send_request would give a RangeRequest...
        range_req = RangeRequest.from_get_stream(
            byte_range=self.total_range,
            client=self.client,
            req=req,
            resp=resp,
            chunk_size=self.chunk_size,
        )
        await range_req.await_aiterator()  # Initialise its async stream iterator
        # then just use the req to create a RangeResponse and register as usual
        range_resp = RangeResponse(stream=self, range_request=range_req, range_name="")
        self.register_range(
            rng=self.total_range,
            value=range_resp,
            activate=False,  # Don't set the active range as this is 'internal'
            use_windows=False,
        )

    def get_monostream(self) -> None:
        """
        Send a streaming GET request with an open-ended range header, to obtain the
        total range. Suitable for higher performance (to avoid repeated requests on
        the :class:`~range_streams.stream.RangeStream` which accrue a time cost).

        Called at initialisation (via the first call to
        :meth:`~range_streams.stream.RangeStream.add`) when ``single_request`` is
        passed to :class:`~range_streams.stream.RangeStream` as ``True``.
        """
        rng_h = range_header(rng=Range(0, 0))  # Empty range -> open-ended range header
        req = self.client.build_request(method="GET", url=self.url, headers=rng_h)
        resp = self.client.send(request=req, stream=True)
        if self.raise_response:
            resp.raise_for_status()
        total_length = self.check_response_length(headers=resp.headers, req=req.method)
        self.set_length(length=total_length)
        # This is where self.send_request would give a RangeRequest...
        range_req = RangeRequest.from_get_stream(
            byte_range=self.total_range,
            client=self.client,
            req=req,
            resp=resp,
            chunk_size=self.chunk_size,
        )
        # then just use the req to create a RangeResponse and register as usual
        range_resp = RangeResponse(stream=self, range_request=range_req, range_name="")
        self.register_range(
            rng=self.total_range,
            value=range_resp,
            activate=False,  # Don't set the active range as this is 'internal'
            use_windows=False,
        )

    def send_head_request(self) -> None:
        """
        Send a 'plain' HEAD request without range headers, to check the total
        content length without creating a RangeRequest (simply discard the response
        as it can only be associated with the empty range, which cannot be stored in
        a :class:`~ranges.RangeDict`), raising for status ASAP.

        To be used when initialised with an empty byte range.

        If the :attr:`range_streams.stream.RangeStream.client` is asynchronous, use a
        synchronous client (created for this single request, and closed after it).
        """
        # Bind once: for an async stream each `sync_client` access makes a new client
        client = self.sync_client
        try:
            req = client.build_request(method="HEAD", url=self.url)
            resp = client.send(request=req)
        finally:
            if client is not self.client:
                client.close()  # only close a client created just for this request
        # Not advisable to allow this to be skipped as the rest of function would
        # error. Primarily `raise_response` is for async (since try/catch won't wrap
        # a single request), so it is only implemented for the monostream methods
        resp.raise_for_status()
        total_length = self.check_response_length(headers=resp.headers, req=req.method)
        self.set_length(length=total_length)

    def check_response_length(self, headers: dict[str, str], req: str) -> int:
        """
        Return the length of the response from its ``content-length`` header (after
        checking it contains this header, else raising :class:`KeyError`), as an
        integer.

        Args:
          headers : The response headers
          req     : The request method (to be reported in any :class:`KeyError`
                    raised)
        """
        total_length = detect_header_value(
            headers=headers, key="content-length", source=f"{req} request response"
        )
        return int(total_length)

    def set_length(self, length: int) -> None:
        """Record the total file length and mark it as checked."""
        self._length = length
        self._length_checked = True

    def list_ranges(self) -> list[Range]:
        """
        Retrieve ascending order list of RangeSet keys, as a :class:`list` of
        :class:`~ranges.Range`.

        The :class:`~ranges.RangeSet` to :class:`~ranges.Range` transformation is
        permitted because the :attr:`~range_streams.stream.RangeStream.ranges`
        property method begins by checking range integrity, which requires each
        :class:`~ranges.RangeSet` to be a singleton set (of a single
        :class:`~ranges.Range`).
        """
        return [rngset.ranges()[0] for rngset in self.ranges.ranges()]

    async def add_async(
        self,
        byte_range: Range | tuple[int, int] = Range("[0, 0)"),
        activate: bool = True,
        name: str = "",
    ) -> None:
        """
        Async counterpart of :meth:`~range_streams.stream.RangeStream.add` (single
        request mode only): an empty range fetches the monostream if the length is
        not yet known, a non-empty range adds a window onto it.

        Raises:
          NotImplementedError: if not in single request mode.
        """
        byte_range = validate_range(byte_range=byte_range, allow_empty=True)
        if not self.single_request:
            raise NotImplementedError(
                "Async RangeStreams are only available in single request mode (for now)"
            )
        # Do not request an empty range if total length already checked (at init)
        if not self._length_checked and byte_range.isempty():
            await self.get_async_monostream()
        elif not byte_range.isempty():
            self.add_window(byte_range=byte_range, activate=activate, name=name)

    def add_window(
        self,
        byte_range: Range | tuple[int, int] = Range("[0, 0)"),
        activate: bool = True,
        name: str = "",
    ) -> None:
        """
        Register a window onto the original range in the ``_ranges`` RangeDict
        rather than add a new range entry to the dict (which would A) clash with the
        single entire range B) require another request

        Args:
          byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range of
                       positions on the file to be read from the request on the
                       stream.
          activate   : (:class:`bool`) Whether to make this newly added
                       :class:`~ranges.Range` the active range on the stream upon
                       creating it.
          name       : (:class:`str`) A name (default: ``''``) to give to the range.
        """
        # Accept a 2-tuple as documented (callers via `add` already pass a Range)
        byte_range = validate_range(byte_range=byte_range, allow_empty=True)
        req = self.simulate_request(byte_range=byte_range)
        resp = RangeResponse(stream=self, range_request=req, range_name=name)
        self.register_range(
            rng=byte_range,
            value=resp,
            activate=activate,
            use_windows=True,
        )

    def add(
        self,
        byte_range: Range | tuple[int, int] = Range("[0, 0)"),
        activate: bool = True,
        name: str = "",
    ) -> None:
        """
        Add a range to the stream. If it is empty and the length of the stream has
        not already been determined, this will initiate a HEAD request (or, in
        single request mode, the monostream GET request) to check the file's total
        size. In all other cases, only add the :class:`~ranges.Range` to the
        :class:`~ranges.RangeDict` of
        :attr:`~range_streams.stream.RangeStream.ranges`, set up a streaming partial
        content GET request, but do not try to read any bytes from it (response body
        data is not read at this point).

        The ``byte_range`` can be specified as either a :class:`~ranges.Range`
        object, or 2-tuple of integers (``(start, end)``), interpreted either way as
        a half-open interval ``[start, end)``, as given by Python's built-in
        :class:`range`.

        If ``activate`` is ``True``, make this range the active range upon adding it
        to the stream (allowing access to the associated response through the
        :attr:`~range_streams.stream.RangeStream.active_range_response` property).

        If a ``name`` is provided (used in subclasses where the stream is an archive
        with individually named files within it), assign this name to the
        :class:`~range_streams.response.RangeResponse` (as its ``range_name``
        argument).

        Adding a range equal to one already present is a no-op (and sends no
        request).

        Args:
          byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range of
                       positions on the file to be requested and stored in the
                       :class:`~ranges.RangeDict` on
                       :attr:`~range_streams.stream.RangeStream.ranges`
          activate   : (:class:`bool`) Whether to make this newly added
                       :class:`~ranges.Range` the active range on the stream upon
                       creating it.
          name       : (:class:`str`) A name (default: ``''``) to give to the range.

        Raises:
          ValueError: if the client is asynchronous (use ``add_async`` instead).
        """
        byte_range = validate_range(byte_range=byte_range, allow_empty=True)
        # Do not allow a non-single request async RangeStream to be created
        if self.client_is_async:
            raise ValueError("Add a range to an async RangeStream via add_async")
        if byte_range.isempty():
            # Do not request an empty range if total length already checked (at init)
            if not self._length_checked:
                if self.single_request:
                    self.get_monostream()
                else:
                    self.send_head_request()
        elif self.single_request:
            self.add_window(byte_range=byte_range, activate=activate, name=name)
        elif byte_range in ranges_in_reg_order(self.ranges):
            # Trivial no-op for a range that already exists: checked *before*
            # sending, so no duplicate streaming response is opened and left unclosed
            pass
        else:
            req = self.send_request(byte_range=byte_range)
            if not self._length_checked:
                self.set_length(length=req.total_content_length)
            # bytes are available in the RangeRequest.response stream
            resp = RangeResponse(stream=self, range_request=req, range_name=name)
            self.register_range(
                rng=byte_range,
                value=resp,
                activate=activate,
                use_windows=False,
            )

    @property
    def is_closed(self):
        """
        True if the ``httpx.Response`` object(s) associated with the
        :class:`~range_streams.response.RangeResponse` values in the internal
        :attr:`~range_streams.stream.RangeStream._ranges` :class:`~ranges.RangeDict`
        is/are all closed.
        """
        return all(range_response.is_closed for range_response in self._ranges.values())

    def close(self) -> None:
        """
        Close any ``httpx.Response`` on the stream. In single request mode, there is
        just the one (shared with all the 'windowed' responses).
        """
        for range_response in self._ranges.values():
            range_response.close()

    async def aclose(self) -> None:
        """
        Close any ``httpx.Response`` on the async stream. In single request mode,
        there is just the one (shared with all the 'windowed' responses).
        """
        for range_response in self._ranges.values():
            await range_response.aclose()

    @classmethod
    def make_async_fetcher(
        cls,
        urls: list[str],
        callback: Callable | None = None,
        verbose: bool = False,
        show_progress_bar: bool = True,
        timeout_s: float = 5.0,
        client=None,
        close_client: bool = False,
        **kwargs,
    ):
        """
        Build an :class:`~range_streams.async_utils.AsyncFetcher` that creates
        streams of this class (``cls``) for each of ``urls``. Any extra ``kwargs``
        are passed through (intended for the RangeStream subclass).
        """
        return AsyncFetcher(
            stream_cls=cls,
            urls=urls,
            callback=callback,
            verbose=verbose,
            show_progress_bar=show_progress_bar,
            timeout_s=timeout_s,
            client=client,
            close_client=close_client,
            **kwargs,  # Any other kwargs can be passed through to RangeStream subclass
        )