# Source code for core_https.requesters.aiohttp_throttle

# -*- coding: utf-8 -*-

"""Concurrency-throttled aiohttp requester using asyncio.Semaphore."""

import asyncio
from typing import Optional

from aiohttp import ClientResponse

from core_https.requesters.aiohttp_ import AioHttpRequester


class AioHttpThrottleRequester(AioHttpRequester):
    """
    An `AioHttpRequester` implementation that limits the number of
    concurrent in-flight HTTP requests using an `asyncio.Semaphore`.

    This throttler enforces *max_concurrency* at the coroutine level within
    a single event loop. Each call to :meth:`request` must acquire a
    semaphore permit before dispatching the actual HTTP request.

    **Notes:**

    - The throttling mechanism limits only concurrent *coroutine execution*.
      It does not enforce rate limiting (requests per second).
    - A permit is held only while the parent's ``request`` coroutine is
      running; it is released as soon as that call returns or raises.
    - If you override :meth:`request` in a subclass, be aware that
      `await super().request(...)` calls the parent implementation (so the
      parent semaphore *will* be used for the actual HTTP call). However,
      any code you run **before** or **after** that `super()` call executes
      outside the parent's semaphore (and therefore is not throttled).
    """

    def __init__(self, max_concurrency: int, **kwargs) -> None:
        """
        :param max_concurrency: Maximum number of concurrent requests
            allowed. Must be at least 1.
        :param kwargs: Passed through to :class:`AioHttpRequester`.
        :raises ValueError: If *max_concurrency* is less than 1.
        """
        # A semaphore with zero permits can never be acquired here (permits
        # are only released by requests that already hold one), so every
        # request would wait forever. Negative values are rejected by
        # ``asyncio.Semaphore`` itself, but only on the first request.
        # Fail fast at construction time instead.
        if max_concurrency < 1:
            raise ValueError(
                f"max_concurrency must be >= 1, got {max_concurrency!r}"
            )

        super().__init__(**kwargs)
        self.max_concurrency = max_concurrency

        # Created lazily by the first call to `request` and discarded again
        # by `close`.
        # NOTE(review): presumably deferred so the semaphore is created
        # inside the running event loop -- confirm.
        self._semaphore: Optional[asyncio.Semaphore] = None

    @classmethod
    def engine(cls) -> str:
        """Return the identifier string of this requester."""
        return "aiohttp_throttle"

    async def request(self, *args, **kwargs) -> ClientResponse:
        """
        Execute an HTTP request with concurrency throttling.

        It acquires a semaphore permit before delegating the actual request
        to the underlying :class:`AioHttpRequester` implementation. All
        positional and keyword arguments are forwarded unchanged.

        :returns: The aiohttp response object.
        :raises: Any exception raised by the underlying session.
        """
        if self._semaphore is None:
            # First request (or first one after `close`): build the
            # semaphore from the currently configured limit.
            self._semaphore = asyncio.Semaphore(self.max_concurrency)

        # `async with` releases the permit on both normal return and error.
        async with self._semaphore:
            return await super().request(*args, **kwargs)

    async def close(self) -> None:
        """
        Close the underlying requester and discard the semaphore.

        After the parent's ``close`` has completed, the semaphore reference
        is dropped, so the next call to :meth:`request` creates a fresh one
        from ``max_concurrency``.
        """
        await super().close()
        self._semaphore = None