from __future__ import annotations
from typing import TYPE_CHECKING, AsyncIterator, Iterator
# Under mypy this name is treated as True (per the original note); at runtime it is False
MYPY = False  # when using mypy will be overridden as True
# Import httpx at runtime (TYPE_CHECKING is False) and under mypy, but not when some
# other tool sets TYPE_CHECKING without MYPY (the original note cites Sphinx)
if MYPY or not TYPE_CHECKING:  # pragma: no cover
    import httpx  # avoid importing to Sphinx type checker
from ranges import Range
from .http_utils import PartialContentStatusError, detect_header_value, range_header
from .range_utils import range_len
# Public API of this module
__all__ = ["RangeRequest"]
class RangeRequest:
    """
    Store a GET request and the response stream while keeping a reference to
    the client that spawned it, providing an overridable
    :attr:`~range_streams.response.RangeResponse._iterator` attribute
    [by default giving access to
    :meth:`~range_streams.request.RangeRequest.iter_raw`] on the
    underlying ``httpx.Response``, suitable for
    :class:`~range_streams.response.RangeResponse`
    to wrap in a :class:`io.BytesIO` buffered stream. For async clients,
    :attr:`~range_streams.response.RangeResponse._aiterator` is set instead
    [giving access to
    :meth:`~range_streams.request.RangeRequest.aiter_raw`] on the
    underlying ``httpx.Response``, once
    :meth:`~range_streams.request.RangeRequest.await_aiterator` has been awaited.
    """
def __init__(
self,
byte_range: Range,
url: str,
client,
GET_got: tuple | None = None,
window_on_range: Range = Range(0, 0),
chunk_size: int | None = None,
):
"""
Make a new partial content request, or simulate one from a provided (completed)
streaming GET request.
The latter option should be used carefully to achieve improved performance from
this library (in particular where read operations on the stream are expected to
be linear, without large gaps between cursor positions which must be loaded
prior to subsequent read operations).
Args:
byte_range : The :class:`~ranges.Range` to request.
url : The URL to be requested.
client : The client to use for the request
GET_got : A 2-tuple of the already-executed ``httpx.Request``
and the received ``httpx.Response``, or ``None``
(the default). If provided, the ``byte_range`` is not
requested but instead is the range that was already
requested, and the ``url`` is the requested URL.
window_on_range : If a non-empty range is passed, this is taken as the stream
this request is a window onto (indicating this is a
simulated request). Any read operations will be
restricted to this range of positions (as the underlying
stream being 'windowed' is a larger one).
chunk_size : The chunk size to the ``httpx.Response.iter_raw`` iterator (or
``httpx.Response.aiter_raw`` if using an async client)
"""
self.range = byte_range
self.url = url
self.client = client
self.check_client()
# Allow a RangeRequest to be made from a pre-existing streamed GET request
self.is_simulated = GET_got is not None
self.window_on_range = window_on_range
self.is_windowed = not window_on_range.isempty()
self.chunk_size = chunk_size
if self.is_simulated:
assert GET_got is not None # give mypy a clue
# "Simulating" a partial range request with pre-provided GET req. + response
self.request, self.response = GET_got
self._check_resp_req() # Sphinx typing workaround
# This shouldn't need to be accessed but set it to be thorough
self.content_range = f"{self.range_header}/{range_len(byte_range)}"
if isinstance(self.client, httpx.AsyncClient):
# Note: _aiter_raw is 'stored' uncalled as cannot await here (not async)
# The Callable becomes a Coroutine once `await_aiterator` called
self._aiterator_preinit = None if self.is_windowed else self.aiter_raw
else:
self._iterator = None if self.is_windowed else self.iter_raw()
else:
# Make and send a partial range request
self.setup_stream()
self.content_range = self.content_range_header()
if self.client_is_async:
# Note: _aiter_raw is 'stored' uncalled as cannot await here (not async)
self._aiterator_preinit = self.aiter_raw
else:
self._iterator = self.iter_raw()
[docs] async def await_aiterator(self) -> None:
"""
Initialise the async iterator on the
:attr:`~range_streams.response.RangeResponse._aiterator` attribute from the
stored function which when called returns the ``typing.AsyncIterator[bytes]``.
"""
assert self._aiterator_preinit is not None
self._aiterator = await self._aiterator_preinit()
@property
def client_is_async(self):
return isinstance(self.client, httpx.AsyncClient)
@property
def aiterator_initialised(self):
return self.client_is_async and hasattr(self, "_aiterator")
[docs] @classmethod
def windowed_request(
cls,
byte_range: Range,
range_request: RangeRequest,
tail_mark: int,
chunk_size: int | None,
) -> RangeRequest:
"""
Reuse the stream from an existing streaming request rather to create a new
'windowed' RangeRequest from an existing RangeRequest, but change the byte range
to be used on it. If the existing RangeRequest (``range_request``) is anything
other than a stream of the full file range, then relative ranges will need to be
calculated. This constructor was written on the assumption of a full file range.
Args:
byte_range : The :class:`~ranges.Range` provided by this request.
on_request : The sent ``httpx.Request``
tail_mark : The :attr:`~range_streams.response.RangeResponse.tail_mark`
to trim the ``byte_range`` (if any). Passed separately
chunk_size : The chunk size to the ``httpx.Response.iter_raw`` iterator (or
``httpx.Response.aiter_raw`` if using an async client)
"""
window_range = Range(byte_range.start, byte_range.end - tail_mark)
# Build the request that this object pretends to have sent
request_headers = range_header(window_range)
unsent_request = range_request.client.build_request(
method="GET",
url=range_request.url,
headers=request_headers,
)
content_byte_range = request_headers["range"].replace("=", " ")
total_content_length = range_request.total_content_length
window_on_range = range_request.range
window_len = range_len(window_range)
windowed_response = range_request.response
windowed_range_request = cls(
byte_range=window_range,
url=range_request.url,
client=range_request.client,
GET_got=(unsent_request, windowed_response),
window_on_range=window_on_range, # Keep a reference to the underlying range
)
# Calling ``response.iter_raw()`` again raises ``httpx.StreamConsumed`` error
# so simply overwrite after initialisation with existing RangeRequest iterator
if range_request.client_is_async:
if not range_request.aiterator_initialised:
msg = "aiterator is not initialised"
msg += ": `await_aiterator` after instantiating an async RangeRequest"
raise ValueError(msg)
windowed_range_request._aiterator = range_request._aiterator
else:
windowed_range_request._iterator = range_request._iterator
return windowed_range_request
[docs] @classmethod
def from_get_stream(
cls, byte_range: Range, client, req, resp, chunk_size: int | None = None
) -> RangeRequest:
"""
Avoid making a new partial content request, instead interpret a streaming GET
request as one when provided along with a ``byte_range``.
Does not call
:meth:`~range_streams.request.RangeRequest.raise_for_non_partial_content`
as is done after setting the :attr:`~range_streams.request.RangeRequest.request`
and :attr:`~range_streams.request.RangeRequest.response` in
:meth:`~range_streams.request.RangeRequest.setup_stream`.
Note: ``req`` and ``resp`` are type checked 'manually' at init (not via type
hints) due to Sphinx type hints bug with the ``httpx`` library.
Args:
byte_range : The :class:`~ranges.Range` provided by this request.
req : The sent ``httpx.Request``
resp : The received ``httpx.Response``
chunk_size : The size of chunks to read the response into the buffer with
"""
range_request = cls(
byte_range=byte_range,
url=str(req.url),
client=client,
GET_got=(req, resp),
chunk_size=chunk_size,
)
return range_request
@property
def range_header(self):
return range_header(self.range)
[docs] def setup_stream(self) -> None:
"""
``client.stream("GET", url)`` but leave the stream to be manually closed
rather than using a context manager
"""
self.request = self.client.build_request(
method="GET", url=self.url, headers=self.range_header
)
self.response = self.client.send(request=self.request, stream=True)
self.raise_for_non_partial_content()
[docs] def raise_for_non_partial_content(self):
"""
Raise the :class:`~range_streams.http_utils.PartialContentStatusError` if the response status code is
anything other than 206 (Partial Content), as that is what was requested.
"""
if self.response.status_code != 206:
raise PartialContentStatusError(
request=self.request, response=self.response
)
@property
def total_content_length(self) -> int:
"""
Obtain the total content length from the ``content-range`` header of a
partial content HTTP GET request. This method is not used for the HTTP HEAD
request sent when a :class:`~range_streams.stream.RangeStream` is
initialised with an empty :class:`~ranges.Range` (since that is not a partial
content request it returns a ``content-length`` header which can be read
as an integer directly).
"""
return int(self.content_range.split("/")[-1])
[docs] def iter_raw(self) -> Iterator[bytes]:
"""
Wrap the :meth:`iter_raw` method of the underlying :class:`httpx.Response`
object within the :class:`~range_streams.response.RangeResponse` in
:attr:`~range_streams.request.RangeRequest.response`.
"""
return self.response.iter_raw(chunk_size=self.chunk_size)
[docs] async def aiter_raw(self) -> AsyncIterator[bytes]:
"""
Wrap the :meth:`iter_raw` method of the underlying :class:`httpx.Response`
object within the :class:`~range_streams.response.RangeResponse` in
:attr:`~range_streams.request.RangeRequest.response`.
"""
return self.response.aiter_raw(chunk_size=self.chunk_size)
[docs] def close(self) -> None:
"""
Close the :attr:`~range_streams.request.RangeRequest.response`
:class:`~range_streams.response.RangeResponse`.
"""
if not self.response.is_closed:
self.response.close()
def _check_resp_req(self):
"""
Type checking workaround (Sphinx type hint extension does not like httpx
so check the type manually with a method called at initialisation).
"""
if not isinstance(self.request, httpx.Request): # pragma: no cover
raise NotImplementedError("Only HTTPX responses currently supported")
if not isinstance(self.response, httpx.Response): # pragma: no cover
raise NotImplementedError("Only HTTPX responses currently supported")
[docs] def check_client(self):
"""
Type checking workaround (Sphinx type hint extension does not like httpx
so check the type manually with a method called at initialisation).
"""
if not any(
isinstance(self.client, c) for c in (httpx.Client, httpx.AsyncClient)
): # pragma: no cover
raise NotImplementedError("Only HTTPX clients currently supported")