# -*- coding: utf-8 -*-
"""Asynchronous HTTP requester implementation backed by aiohttp."""
from __future__ import annotations
import asyncio
from contextlib import suppress
from typing import Any
from typing import Dict
from typing import Optional
from aiohttp import (
ClientResponse,
ClientResponseError,
ClientSession,
ClientTimeout,
TCPConnector,
)
from core_mixins.compatibility import Self
from core_https.exceptions import RetryableException
from .base import HTTPMethod
from .base import IRequester
[docs]
class AioHttpRequester(IRequester):
"""
Asynchronous HTTP requester implementation using aiohttp.
This class provides an async HTTP client interface using the aiohttp library.
It supports automatic session management, configurable retry logic with exponential
backoff, connection pooling, and comprehensive error handling.
The requester can work with externally provided ClientSession objects or create
and manage its own session internally. It implements the async context manager
protocol for convenient resource cleanup.
Features:
- Automatic session creation and management
- Configurable retry logic with exponential backoff
- Connection pooling with configurable limits
- Comprehensive HTTP exception mapping
- Support for custom timeouts per request
- Context manager support for resource cleanup
.. code-block:: python
import aiohttp
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod
requester: AioHttpRequester = AioHttpRequester(raise_for_status=True)
async def get():
# This is optional as the client creates one session for you if not provided.
session = aiohttp.ClientSession()
try:
response = await requester.request(
method=HTTPMethod.GET,
session=session,
url=url,
params={
"x-api-key": "..."
})
return await response.text()
except Exception as error:
pass
finally:
await session.close()
res = asyncio.run(get())
print(res)
..
"""
[docs]
def __init__(
self,
session: Optional[ClientSession] = None,
retries: Optional[int] = 3,
**kwargs,
) -> None:
"""
Initialize the AioHttpRequester.
Args:
session: Optional pre-configured aiohttp ClientSession to use for requests.
If not provided, a new session will be created automatically with
the configured timeout and connection limits. When providing a custom
session, you are responsible for closing it.
retries: Number of retry attempts for failed requests. Defaults to 3.
Set to 0 to disable retries completely. Only applies to retryable
errors like network timeouts and server errors (5xx status codes).
**kwargs: Additional arguments passed to the base IRequester class,
including encoding, raise_for_status, backoff_factor, timeout,
connector_limit, and connector_limit_per_host.
Note:
The timeout specified in kwargs becomes the default session timeout.
Individual requests can override this using the timeout parameter
in the request() method.
"""
super().__init__(**kwargs)
self._session = session
self._session_lock: Optional[asyncio.Lock] = None
self._owns_session = session is None
self._timeout = ClientTimeout(total=self.timeout)
self.retries = retries
async def __aenter__(self) -> Self:
await self._ensure_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
[docs]
@classmethod
def engine(cls) -> str:
return "aiohttp"
[docs]
async def _ensure_session(self) -> ClientSession:
"""
Ensure a ClientSession exists, creating one if necessary.
This method implements a thread-safe lazy initialization pattern using
double-checked locking to ensure only one session is created even when
called concurrently from multiple coroutines.
Returns:
ClientSession: The active aiohttp ClientSession instance.
Note:
If a session was provided in the constructor, it will be returned as-is.
If no session was provided, a new one will be created with the configured
timeout and connection limits.
Thread Safety:
This method is safe to call concurrently from multiple coroutines.
The double-check locking pattern ensures only one session is created.
"""
if self._session is None:
if self._session_lock is None:
self._session_lock = asyncio.Lock()
async with self._session_lock:
if self._session is None: # Double-check after acquiring lock...
self._session = ClientSession(
timeout=self._timeout,
connector=TCPConnector(
limit=self.connector_limit,
limit_per_host=self.connector_limit_per_host,
),
)
self._owns_session = True
return self._session
[docs]
async def request(
self,
url: str,
method: HTTPMethod = HTTPMethod.GET,
headers: Optional[Dict[str, Any]] = None,
retries: Optional[int] = None,
backoff_factor: Optional[float] = None,
session: Optional[ClientSession] = None,
params: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
**kwargs,
) -> ClientResponse:
"""
Make an asynchronous HTTP request with retry logic.
This method performs HTTP requests with automatic retry functionality,
exponential backoff, and comprehensive error handling. Failed requests
are retried based on the configured retry policy.
Args:
url: The target URL for the HTTP request.
method: HTTP method to use. Defaults to GET. Supports all standard
HTTP methods (GET, POST, PUT, DELETE, etc.).
headers: Optional HTTP headers to include in the request. These will
be merged with any session-level headers.
retries: Number of retry attempts for this specific request.
If not provided, uses the instance-level retry setting.
Set to 0 to disable retries for this request.
backoff_factor: Multiplier for exponential backoff between retries.
The actual delay is calculated as: backoff_factor * attempt_number.
If not provided, uses the instance-level setting or defaults to 0.5.
session: Optional ClientSession to use for this request.
If not provided, uses the requester's session (creating one if necessary).
Useful for per-request session customization.
params: URL query parameters to include in the request.
Will be properly URL-encoded and appended to the URL.
timeout: Request timeout in seconds for this specific request.
Overrides the instance-level timeout setting.
Set to None to use the default timeout.
**kwargs: Additional parameters passed to aiohttp's request method.
Common options include: json, data, cookies, ssl, proxy, etc.
See aiohttp.ClientSession.request documentation for full list.
Returns:
ClientResponse: The aiohttp response object. Use methods like
.json(), .text(), .read() to extract the response content.
Raises:
AuthenticationException: For 401 Unauthorized responses.
AuthorizationException: For 403 Forbidden responses.
RateLimitException: For 429 Too Many Requests (when retries exhausted).
RetryableException: For 502/503/504 server errors (when retries exhausted).
ServiceException: For other 4xx client errors.
InternalServerError: For 5xx server errors (when retries exhausted).
Note:
The retry logic only applies to network errors and server errors (5xx).
Client errors (4xx) are not retried, except for 429 (rate limit)
which may be retried based on the configured policy.
"""
session_ = session or await self._ensure_session()
kwargs_ = kwargs.copy()
if timeout is not None:
kwargs_["timeout"] = ClientTimeout(total=timeout)
retries = retries if retries is not None else self.retries
if retries is None:
retries = 3
backoff_factor = (
backoff_factor
if backoff_factor is not None
else self.backoff_factor
if self.backoff_factor is not None
else 0.5
)
attempts = 0
while True:
attempts += 1
try:
response = await session_.request(
method=str(method),
url=url,
headers=headers,
params=params,
**kwargs_,
)
if self.raise_for_status:
response.raise_for_status()
return response
except ClientResponseError as error:
if error.status not in self.RETRYABLE_ERRORS or attempts > retries:
self.raise_custom_exception(error.status, error.message)
with suppress(RetryableException):
self.raise_custom_exception(
error.status,
error.message,
within_retry=True,
)
await asyncio.sleep(backoff_factor * (2 ** (attempts - 1)))
[docs]
async def close(self) -> None:
"""
Close the internal session if it was created by this requester.
This method performs cleanup of the internal ClientSession, but only
if the session was created internally. If a custom session was provided
in the constructor, it will not be closed as the caller is responsible
for managing its lifecycle.
The session reference is cleared only when the session was created
internally. External sessions retain their reference so the requester
remains usable after close() without silently creating a new session.
Note:
This method is automatically called when using the async context
manager protocol (__aexit__). Manual calling is only necessary
when not using the context manager pattern.
"""
if self._session and self._owns_session:
await self._session.close()
self._session = None
self._session_lock = None