Source code for range_streams.response

from __future__ import annotations

from io import SEEK_END, SEEK_SET, BytesIO
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # pragma: no cover
    # absolute imports for Sphinx
    import range_streams  # for RangeStream, RangeRequest

from ranges import Range

from .range_utils import ALWAYS_SET_TOLD, range_len

# Public API of this module: only the response class is exported.
__all__: list[str] = ["RangeResponse"]


# Flip to True to print diagnostics (buffer switches, cursor positions, read goals)
# from the RangeResponse methods below.
DEBUG_VERBOSE: bool = False


class BufferLedger(BytesIO):
    """
    An in-memory :class:`io.BytesIO` buffer that additionally remembers which range
    is currently "active" on it, via the ``active_buf_range`` attribute.

    Windowed :class:`RangeResponse` objects share the buffer of their source range
    response, and compare their own range against ``active_buf_range`` to detect
    when a different window was the last one to use (and move the cursor of) the
    shared buffer.

    Args:
      active_rng: The range to record as active initially (the empty range
                  ``Range(0, 0)`` unless given).
    """

    def __init__(self, active_rng: Range = Range(0, 0)):
        # Initialise the underlying (empty) BytesIO before attaching the ledger
        super().__init__()
        self.active_buf_range: Range = active_rng


class RangeResponse:
    """
    Adapted from `obskyr's ResponseStream demo code
    <https://gist.github.com/obskyr/b9d4b4223e7eaf4eedcd9defabb34f13>`_, this class
    handles the streamed partial request as a file-like object.

    Don't forget to close the ``httpx.Response`` yourself! The
    :meth:`~range_streams.response.RangeResponse.close` method is available (or
    :meth:`~range_streams.stream.RangeStream.close`) to help you.
    """

    tail_mark: int = 0
    """
    The amount by which to shorten the 'tail' (i.e. the upper end) of the range when
    deciding if it is 'consumed'. Incremented within the
    :meth:`~range_streams.stream.RangeStream.handle_overlap` method when the
    ``pruning_level`` is set to ``1`` (indicating a "replant" policy).

    Under a 'replant' policy, when a new range is to be added and would overlap at
    the tail of an existing range, the pre-existing range should be effectively
    truncated by 'marking their tails' (where `an existing range` is assumed here
    to only be considered a range if it is not 'consumed' yet).
    """

    _bytes: BufferLedger

    def __init__(
        self,
        stream: range_streams.RangeStream,
        range_request: range_streams.RangeRequest,
        range_name: str = "",
    ):
        self.parent_stream = stream
        self.request = range_request
        self.range_name = range_name
        self.is_windowed = self.check_is_windowed()
        # Windowed responses must position the shared cursor before the first read
        # (see prepare_reading_window); non-windowed ones are ready immediately
        self.read_ready = not self.is_windowed
        if self.is_windowed or ALWAYS_SET_TOLD:
            self.told = 0
        if self.is_windowed:
            # Don't create a buffer, refer to the source range's RangeResponse buffer
            self._bytes = self.source_range_response._bytes
        else:
            self._bytes = BufferLedger()

    def __repr__(self):
        rng_name = self.range_name if self.range_name == "" else f' "{self.range_name}"'
        return (
            f"{self.__class__.__name__}{rng_name} {self.request.range} @ "
            f"'{self.parent_stream.name}' from {self.parent_stream.domain}"
        )

    @property
    def source_range_response(self) -> RangeResponse:
        """
        The RangeResponse associated with the source range, for a windowed range.

        It is looked up in the parent stream's
        :attr:`~range_streams.stream.RangeStream._ranges` RangeDict using the start
        of the source range as the key.

        Raises:
          ValueError: If accessed on a non-windowed range. For such ranges the
                      :attr:`~range_streams.request.RangeRequest.window_on_range` is
                      the empty range ``[0,0)``, so the lookup would otherwise
                      silently return the RangeResponse of the range at position 0.
        """
        if not self.is_windowed:
            raise ValueError("source_range_response accessed for non-windowed range")
        return self.parent_stream._ranges[self.source_range.start]

    def set_active_buf_range(self, rng: Range) -> None:
        """
        Update the :attr:`~range_streams.response.RangeResponse._bytes` buffer's
        :attr:`~range_streams.response.RangeResponse._bytes.active_buf_range`
        attribute with the given :class:`~ranges.Range` (``rng``).
        """
        self._bytes.active_buf_range = rng

    @property
    def is_active_buf_range(self) -> bool:
        """
        The active range is stored on the buffer the HTTP response stream writes to
        (in the :attr:`~range_streams.response.RangeResponse._bytes.active_buf_range`
        attribute) so that whenever the active range changes, it is detectable
        immediately (all interfaces to read/seek/load the buffer are 'guarded' by a
        call to :meth:`~range_streams.response.RangeResponse.buf_keep` to achieve
        this).

        When this change is detected, since the cursor may be in another range of
        the shared source buffer (where the previously active window was busy doing
        its thing), the cursor is first moved to the last stored
        :meth:`~range_streams.response.RangeResponse.tell` position, which is stored
        on each :class:`~range_streams.response.RangeResponse` in the
        :attr:`~range_streams.response.RangeResponse.told` attribute, and
        initialised as ``0`` so that on first use it simply refers to the start
        position of the window range.

        Note that the active range only changes for 'windowed'
        :class:`~range_streams.response.RangeResponse` objects sharing a 'source'
        buffer with a source :class:`~range_streams.response.RangeResponse` in the
        :attr:`~range_streams.stream.RangeStream._ranges` :class:`~ranges.RangeDict`.

        To clarify: the active range changes on first use for non-windowed ranges,
        since the active range is initialised as the empty range (but after that it
        doesn't!)
        """
        return self._bytes.active_buf_range == self.request.range

    def verify_sync(self, msg=""):
        """
        Raise :class:`ValueError` if the parent stream's client is asynchronous.

        Args:
          msg: Suffix appended to the error message to say what was being done.
        """
        if self.parent_stream.client_is_async:
            raise ValueError(f"Synchronous client check failed{msg}")

    def verify_async(self, msg=""):
        """
        Raise :class:`ValueError` if the parent stream's client is synchronous.

        Args:
          msg: Suffix appended to the error message to say what was being done.
        """
        if not self.parent_stream.client_is_async:
            raise ValueError(f"Asynchronous client check failed{msg}")

    @property
    def source_iterator(self):
        """
        The iterator associated with the source range, for a windowed range.
        """
        self.verify_sync(msg=" when accessing source_iterator property")
        return self.source_range_response.request._iterator

    @property
    def _iterator(self):
        """
        The sync chunk iterator to load from: the source range's iterator for a
        windowed range, otherwise this request's own iterator.
        """
        self.verify_sync(msg=" when accessing iterator property")
        return self.source_iterator if self.is_windowed else self.request._iterator

    @property
    def source_aiterator(self):
        """
        The async iterator associated with the source range, for a windowed range.
        """
        self.verify_async(msg=" when accessing source_aiterator property")
        return self.source_range_response.request._aiterator

    @property
    def _aiterator(self):
        """
        The async chunk iterator to load from: the source range's iterator for a
        windowed range, otherwise this request's own iterator.
        """
        self.verify_async(msg=" when accessing aiterator property")
        return self.source_aiterator if self.is_windowed else self.request._aiterator

    def check_is_windowed(self) -> bool:
        """
        Whether the associated request is windowed. Used to set
        :attr:`~range_streams.response.RangeResponse.is_windowed` on init.
        """
        return not self.source_range.isempty()

    @property
    def source_range(self) -> Range:
        """
        Wrapper for :attr:`~range_streams.request.RangeRequest.window_on_range` with
        a less confusing name to access.

        Note that this will be the empty range if the request is not a windowed
        request.
        """
        return self.request.window_on_range

    @property
    def is_in_window(self) -> bool:
        """
        Whether the file cursor is in the window.

        Trivially true for a non-windowed request, otherwise checks if the file
        cursor is currently within (or exactly at the end of) the window range.
        """
        if self.is_windowed:
            return self._bytes.tell() in Range(
                self.request.range.start,
                self.request.range.end,
                include_start=True,
                include_end=True,
            )
        else:
            return True

    def tell_abs(self, live=True) -> int:
        """
        Get the absolute file cursor position: a window-relative position plus the
        :attr:`~range_streams.response.RangeResponse.window_offset` (which is 0 for
        non-windowed ranges).

        Which window-relative position is used depends on ``live``:

        - ``live=True`` (default): the position stored in
          :attr:`~range_streams.response.RangeResponse.told` at the last
          read/seek/load made through this response.
        - ``live=False``: the current
          :meth:`~range_streams.response.RangeResponse.tell` of the buffer cursor.
        """
        # NOTE(review): the ``live`` name reads the opposite way round to this
        # mapping (earlier docs described it reversed). Behaviour is kept as-is
        # since callers (presumably the parent stream) may rely on the default
        # returning the stored position, which stays correct even when another
        # window last moved the shared buffer cursor. ``told`` only exists for
        # windowed ranges or when ALWAYS_SET_TOLD is set.
        return (self.told if live else self.tell()) + self.window_offset

    def buf_keep(self) -> None:
        """
        If the currently set active buffer range on the
        :attr:`~range_streams.response.RangeResponse._bytes` buffer is not the range
        on this :class:`~range_streams.response.RangeResponse`, then set it to be.

        This is the mechanism by which windowed ranges are switched (the windows
        share the same 'source' buffer, and the value of the active buffer range
        stored on that buffer indicates the most recently active window).

        At initialisation, all :class:`~range_streams.response.RangeResponse` have
        their active buffer range set to the empty range, ``Range(0,0)``.
        """
        if not self.is_active_buf_range:
            rng = self.request.range
            if DEBUG_VERBOSE:
                print(f"Buffer switch... {rng=}")
            if self.is_windowed:
                # Restore this window's cursor, which another window may have moved
                cursor_dest = rng.start + self.told
                if DEBUG_VERBOSE:
                    print(f"... {self.told=}")
                self._bytes.seek(cursor_dest)
                # Do not set `told` as it was just used (i.e. redundant to do so)
            self.set_active_buf_range(rng=rng)

    def store_tell(self) -> None:
        """
        Store the [window-relative] tell value in
        :attr:`~range_streams.response.RangeResponse.told` in the event of any read,
        seek, or load on the stream, when accessed through the RangeResponse (do not
        access directly if you want to keep a reliable stored value for
        :attr:`~range_streams.response.RangeResponse.told`).
        """
        if self.is_windowed or ALWAYS_SET_TOLD:
            self.told = self.tell()

    @property
    def window_offset(self) -> int:
        """
        The distance from the start of the source range to the start of this
        window: added to window-relative positions in
        :meth:`~range_streams.response.RangeResponse.seek` and subtracted in
        :meth:`~range_streams.response.RangeResponse.tell`. Always 0 for
        non-windowed ranges.
        """
        # NOTE(review): relies on ``Range`` ordering to decide whether the window
        # sits after the source range's start — confirm against python-ranges.
        has_offset = self.is_windowed and self.request.range > self.source_range
        return self.request.range.start - self.source_range.start if has_offset else 0

    def prepare_reading_window(self) -> None:
        """
        Prepare the stream cursor for reading: if the shared cursor is outside this
        window, move it to the window start, then mark the response as read-ready.

        Called on the first read after init (unclear if this should only be done on
        initialisation, or every time since the cursor is shared).
        """
        # UPDATE: MAY BE REDUNDANT AFTER `buf_keep` IMPLEMENTED? TODO: TEST
        if not self.is_in_window:
            self.seek(position=0)  # Window offset is added in seek function
        self.read_ready = True  # Remove barrier flag attribute
        if DEBUG_VERBOSE:
            print("\n---READ_READY removed\n---")

    @property
    def client(self):  # Returns: httpx.Client | httpx.AsyncClient
        """
        The request's client.
        """
        return self.request.client

    @property
    def url(self) -> str:
        """
        A wrapper to access the :attr:`~range_streams.stream.RangeStream.url` of the
        'parent' :class:`~range_streams.stream.RangeStream`.
        """
        return self.parent_stream.url

    @property
    def name(self) -> str:
        """
        A wrapper to access the :attr:`~range_streams.stream.RangeStream.name` of
        the 'parent' :class:`~range_streams.stream.RangeStream`.
        """
        return self.parent_stream.name

    def _load_all(self) -> None:
        """
        Load the rest of the stream into the buffer (synchronous client).

        If seeking on a windowed range, then 'loading all' will not really load to
        the end of the stream, just the end of the window onto it.
        """
        self.verify_sync(msg=" when loading all")
        self.buf_keep()
        if self.is_windowed:
            # Would need to offset this if source range is non-total range
            # (also may need to take into account tail-mark for windows?)
            window_end = self.request.range.end
            self._load_until(window_end)
        else:
            self._bytes.seek(0, SEEK_END)
            for chunk in self._iterator:
                self._bytes.write(chunk)
        self.store_tell()

    async def _aload_all(self) -> None:
        """
        Load the rest of the stream into the buffer (asynchronous client).

        If seeking on a windowed range, then 'loading all' will not really load to
        the end of the stream, just the end of the window onto it.
        """
        self.verify_async(msg=" when loading all")
        self.buf_keep()
        if self.is_windowed:
            # Would need to offset this if source range is non-total range
            # (also may need to take into account tail-mark for windows?)
            window_end = self.request.range.end
            await self._aload_until(window_end)
        else:
            self._bytes.seek(0, SEEK_END)
            async for chunk in self._aiterator:
                self._bytes.write(chunk)
        self.store_tell()

    def _load_until(self, goal_position: int) -> None:
        """
        Append whole chunks from the sync iterator to the end of the buffer until
        its length reaches ``goal_position`` or the iterator is exhausted (so the
        buffer may overshoot the goal by up to one chunk).
        """
        self.verify_sync(msg=f" when loading until {goal_position}")
        self.buf_keep()
        current_position = self._bytes.seek(0, SEEK_END)
        while current_position < goal_position:
            try:
                current_position += self._bytes.write(next(self._iterator))
            except StopIteration:
                break
        self.store_tell()

    async def _aload_until(self, goal_position: int) -> None:
        """
        Async counterpart of
        :meth:`~range_streams.response.RangeResponse._load_until`, drawing chunks
        from the async iterator.
        """
        self.verify_async(msg=f" when loading until {goal_position}")
        self.buf_keep()
        current_position = self._bytes.seek(0, SEEK_END)
        while current_position < goal_position:
            try:
                awaited_bytes = await self._aiterator.__anext__()
                current_position += self._bytes.write(awaited_bytes)
            except StopAsyncIteration:
                break
        self.store_tell()

    def tell(self) -> int:
        """
        File-like tell (position indicator) within the range request stream.

        Window-relative for windowed ranges. Before the first read of a windowed
        range this reports 0 (where the cursor will be placed).
        """
        if not self.read_ready:
            # If it's not yet ready, lie about where the cursor is (give where it will be)
            t = 0
            if DEBUG_VERBOSE:
                print("Blanked the tell: not ready")
        elif self.is_windowed:
            t = self._bytes.tell() - self.window_offset
            if DEBUG_VERBOSE:
                print(f"{t=} (windowed tell)")
        else:
            t = self._bytes.tell()
            if DEBUG_VERBOSE:
                print(f"{t=} (plain tell)")
        return t

    def _prepare_to_read(self) -> int:
        """
        Called at the start of :meth:`~range_streams.response.RangeResponse.read`
        and :meth:`~range_streams.response.RangeResponse.aread` to ensure the
        reading window is prepared (on the first read of a windowed range) and
        acquire the starting [buffer] position.
        """
        self.buf_keep()
        if DEBUG_VERBOSE:
            print(f"Reading {self.request.range}")
        if not self.read_ready:
            # Only run on the first use after init
            self.prepare_reading_window()
        return self._bytes.tell()

    def read(self, size: int | None = None) -> bytes:
        """
        File-like reading within the range request stream, with careful handling of
        windowed ranges and tail marks.

        Args:
          size: Maximum number of bytes to read. ``None`` or a negative value
                (the usual file-object convention, e.g. ``read(-1)``) reads to the
                end of the range (or window).
        """
        self.verify_sync(msg=f" when reading {size} bytes")
        if size is not None and size < 0:
            size = None  # negative size means "read everything", as for io objects
        left_off_at = self._prepare_to_read()
        if size is None:
            self._load_all()
        else:
            goal_position = left_off_at + size
            if DEBUG_VERBOSE:
                print(f"{goal_position=} = {left_off_at=} + {size=}")
            # Probably overshoots the cursor (loads a chunk at a time)
            self._load_until(goal_position)
        read_bytes = self._get_read_bytes(size=size, left_off_at=left_off_at)
        return read_bytes

    async def aread(self, size: int | None = None) -> bytes:
        """
        File-like reading within the range request stream, with careful handling of
        windowed ranges and tail marks (asynchronous client).

        Args:
          size: Maximum number of bytes to read. ``None`` or a negative value reads
                to the end of the range (or window).
        """
        self.verify_async(msg=f" when reading {size} bytes")
        if size is not None and size < 0:
            size = None  # negative size means "read everything", as for io objects
        left_off_at = self._prepare_to_read()
        if size is None:
            await self._aload_all()
        else:
            goal_position = left_off_at + size
            if DEBUG_VERBOSE:
                print(f"{goal_position=} = {left_off_at=} + {size=}")
            # Probably overshoots the cursor (loads a chunk at a time)
            await self._aload_until(goal_position)
        read_bytes = self._get_read_bytes(size=size, left_off_at=left_off_at)
        return read_bytes

    def _get_read_bytes(self, size: int | None, left_off_at: int) -> bytes:
        """
        Called at the end of :meth:`~range_streams.response.RangeResponse.read` and
        :meth:`~range_streams.response.RangeResponse.aread` to rewind the cursor to
        the starting position after the bytes to read are loaded [from the a/sync
        iterator], read said bytes and return them (ensuring to store the final
        cursor position).

        The number of bytes returned never exceeds what remains before the
        (tail-marked) end of the range or window, and is 0 if the cursor is already
        at or past that end.
        """
        self._bytes.seek(left_off_at)
        if self.is_windowed:
            # Convert absolute window end to relative offset on source range
            # (should do this using window_offset to permit non-total ranges!)
            window_end = self.request.range.end - self.tail_mark
            remaining_bytes = window_end - left_off_at
        else:
            rng_len = self.total_len_to_read
            remaining_bytes = rng_len - left_off_at
        # The cursor may already lie past the end (e.g. after a seek, or after the
        # tail mark grew): a negative size would make BytesIO.read return the whole
        # rest of the buffer, so read nothing instead.
        remaining_bytes = max(remaining_bytes, 0)
        if size is None or size > remaining_bytes:
            size = remaining_bytes
        read_bytes = self._bytes.read(size)
        self.store_tell()
        return read_bytes

    def seek(self, position: int, whence=SEEK_SET):
        """
        File-like seeking within the range request stream. Synchronous only.

        For windowed ranges, ``SEEK_SET`` positions are window-relative (the window
        offset is added), ``SEEK_CUR`` moves relative to the current cursor, and
        ``SEEK_END`` is relative to the end of the window rather than the end of
        the shared source buffer (which may have been loaded past the window).

        Raises:
          NotImplementedError: For ``SEEK_END`` with an asynchronous client (the
                               range would first need loading, which is async).
        """
        msg = "No negative seek so `RangeResponse.seek` is synchronous (try `load_all`)"
        self.buf_keep()
        if whence == SEEK_END:
            if self.request.client_is_async:
                raise NotImplementedError(msg)
            else:
                self._load_all()
        if self.is_windowed:
            if whence == SEEK_SET:
                position = position + self.window_offset
            elif whence == SEEK_END:
                # Resolve against the window end in buffer coordinates rather than
                # the buffer end (chunked loading may run past the window)
                rng = self.request.range
                window_end = self.window_offset + (rng.end - rng.start)
                position = window_end + position
                whence = SEEK_SET
            # Any other whence (SEEK_CUR) is already relative to the live cursor
        self._bytes.seek(position, whence)
        self.store_tell()

    @property
    def total_len_to_read(self):
        """
        The number of bytes to read from this range: ``range_len(...) + 1`` (which
        presumably is the byte count of the range, as ``range_len`` appears to
        measure from the first to the last position — confirm in ``range_utils``)
        reduced by the :attr:`~range_streams.response.RangeResponse.tail_mark`.
        """
        return range_len(self.request.range) + 1 - self.tail_mark

    def is_consumed(self) -> bool:
        """
        Whether the :meth:`~range_streams.response.RangeResponse.tell` position
        (indicating 'consumed' or 'read so far') along with the
        :attr:`~range_streams.response.RangeResponse.tail_mark` indicates whether
        the stream should be considered consumed.

        The :attr:`~range_streams.response.RangeResponse.tail_mark` is part of a
        mechanism to 'shorten' ranges when an overlap is detected, to preserve the
        one-to-one integrity of the :class:`~ranges.RangeDict` (see notes on the
        "replant" policy of :meth:`~range_streams.stream.RangeStream.handle_overlap`,
        set by the ``pruning_level`` passed into
        :class:`~range_streams.stream.RangeStream` on initialisation).

        Note that there is (absolutely!) nothing stopping a stream from being
        re-consumed, but this library works on the assumption that all streams will
        be handled in an efficient manner (with any data read out from them either
        used once only or else will be reused from the first output rather than
        re-accessed directly from the stream itself).

        To this end, :class:`~range_streams.stream.RangeStream` has measures in
        place to "decommission" ranges once they are consumed (see in particular
        :meth:`~range_streams.stream.RangeStream.burn_range` and
        :meth:`~range_streams.stream.RangeStream.handle_overlap`).
        """
        if not self.is_in_window:
            # File cursor position may not be set to the start of the window but when
            # it is read it will be placed at the start (don't do this here: checking
            # if consumed shouldn't move the cursor). The window start position will
            # become the ``tell()`` upon first ``read()``
            read_so_far = 0
        else:
            read_so_far = self.tell()
        len_to_read = self.total_len_to_read
        return (len_to_read - read_so_far) <= 0  # should not go below!

    @property
    def is_closed(self):
        """
        True if the associated ``httpx.Response`` object is closed.

        For a windowed response in single request mode, this will be shared with
        any/all other windowed responses on the stream.
        """
        return self.request.response.is_closed

    def close(self):
        """
        Close the associated ``httpx.Response`` object. In single request mode,
        there is just the one (shared with all the 'windowed' responses).
        """
        self.verify_sync(msg=f" when closing the request response on {self}")
        self.request.response.close()

    async def aclose(self):
        """
        Close the associated ``httpx.Response`` object (asynchronous client). In
        single request mode, there is just the one (shared with all the 'windowed'
        responses).
        """
        self.verify_async(msg=f" when closing the request response on {self}")
        await self.request.response.aclose()