Source code for core_https.requesters.aiohttp_throttle
# -*- coding: utf-8 -*-
"""Concurrency-throttled aiohttp requester using asyncio.Semaphore."""
import asyncio
from typing import Optional
from aiohttp import ClientResponse
from core_https.requesters.aiohttp_ import AioHttpRequester
[docs]
class AioHttpThrottleRequester(AioHttpRequester):
    """
    An `AioHttpRequester` implementation that limits the number of
    concurrent in-flight HTTP requests using an `asyncio.Semaphore`.

    This throttler enforces *max_concurrency* at the coroutine level within
    a single event loop. Each call to :meth:`request` must acquire a
    semaphore permit before dispatching the actual HTTP request.

    **Notes:**

    - The throttling mechanism limits only concurrent *coroutine execution*.
      It does not enforce rate limiting (requests per second).
    - If you override :meth:`request` in a subclass, be aware that
      `await super().request(...)` calls the parent implementation (so the
      parent semaphore *will* be used for the actual HTTP call). However, any
      code you run **before** or **after** that `super()` call executes
      outside the parent's semaphore (and therefore is not throttled).
    """

    def __init__(self, max_concurrency: int, **kwargs) -> None:
        """
        :param max_concurrency: Maximum number of concurrent requests allowed.
            Must be at least 1.
        :param kwargs: Passed through to :class:`AioHttpRequester`.
        :raises ValueError: If *max_concurrency* is lower than 1.
        """
        # A semaphore created with 0 permits would block every request
        # forever, and a negative value would only fail later, when the first
        # request builds the semaphore. Reject both up front, before the
        # parent initializer runs.
        if max_concurrency < 1:
            raise ValueError(
                f"max_concurrency must be >= 1, got {max_concurrency!r}"
            )

        super().__init__(**kwargs)
        self.max_concurrency = max_concurrency
        # Built lazily by `request` (and discarded by `close`) rather than
        # here in the constructor.
        self._semaphore: Optional[asyncio.Semaphore] = None

    @classmethod
    def engine(cls) -> str:
        """Return the identifier of this requester: ``"aiohttp_throttle"``."""
        return "aiohttp_throttle"

    async def request(self, *args, **kwargs) -> ClientResponse:
        """
        Execute an HTTP request with concurrency throttling. It acquires
        a semaphore permit before delegating the actual request to the
        underlying :class:`AioHttpRequester` implementation.

        The permit is held only for the duration of the parent ``request``
        call; it is released as soon as that call returns or raises.

        :param args: Positional arguments forwarded unchanged to the parent.
        :param kwargs: Keyword arguments forwarded unchanged to the parent.
        :returns: The aiohttp response object.
        :raises: Any exception raised by the underlying session.
        """
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self.max_concurrency)

        async with self._semaphore:
            return await super().request(*args, **kwargs)

    async def close(self) -> None:
        """
        Close the parent requester and discard the semaphore, so that the
        next call to :meth:`request` creates a fresh one.
        """
        await super().close()
        self._semaphore = None