Examples#

This page provides practical examples for using core-https in your projects.

Basic HTTP Requests#

Using requests library#

from core_https.requesters.requests_ import RequestsRequester
from core_https.utils import HTTPMethod

# Create a requester instance
requester = RequestsRequester(
    timeout=30,
    retries=3,
    raise_for_status=True
)

# Make a GET request
response = requester.request(
    method=HTTPMethod.GET,
    url="https://api.example.com/users"
)

# Access response data
data = response.json()
print(f"Status: {response.status_code}")
print(f"Data: {data}")

Using urllib3#

from core_https.requesters.urllib3_ import Urllib3Requester
from core_https.utils import HTTPMethod

requester = Urllib3Requester(
    timeout=30,
    connector_limit=10,  # Connection pool size
    retries=3
)

response = requester.request(
    method=HTTPMethod.GET,
    url="https://api.example.com/data",
    headers={"Authorization": "Bearer token"}
)

# urllib3 responses need decoding
data = response.data.decode('utf-8')
print(f"Response: {data}")

Async HTTP Requests#

Basic async request with aiohttp#

import asyncio
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod

async def fetch_data():
    async with AioHttpRequester(timeout=30, retries=3) as requester:
        response = await requester.request(
            method=HTTPMethod.GET,
            url="https://api.example.com/data"
        )

        data = await response.json()
        return data

# Run the async function
result = asyncio.run(fetch_data())
print(result)

Multiple concurrent requests#

import asyncio
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod

async def fetch_all(urls):
    async with AioHttpRequester(timeout=30) as requester:
        tasks = [
            requester.request(method=HTTPMethod.GET, url=url)
            for url in urls
        ]

        responses = await asyncio.gather(*tasks)

        # Process all responses
        results = []
        for response in responses:
            data = await response.json()
            results.append(data)

        return results

urls = [
    "https://api.example.com/users/1",
    "https://api.example.com/users/2",
    "https://api.example.com/users/3"
]

results = asyncio.run(fetch_all(urls))

Rate Limiting & Throttling#

Throttling concurrent requests#

Limit the number of simultaneous requests to prevent overwhelming the client or server:

import asyncio
from core_https.requesters.aiohttp_throttle import AioHttpThrottleRequester
from core_https.utils import HTTPMethod

async def fetch_with_throttle():
    # Only 5 requests running concurrently at any time
    requester = AioHttpThrottleRequester(
        max_concurrency=5,
        timeout=30
    )

    urls = [f"https://api.example.com/item/{i}" for i in range(100)]

    try:
        tasks = [
            requester.request(method=HTTPMethod.GET, url=url)
            for url in urls
        ]

        # Even with 100 tasks, only 5 will run simultaneously
        responses = await asyncio.gather(*tasks)

        results = []
        for response in responses:
            data = await response.json()
            results.append(data)

        return results
    finally:
        await requester.close()

results = asyncio.run(fetch_with_throttle())

Rate limiting requests per time period#

Comply with API rate limits by controlling request frequency:

import asyncio
from core_https.requesters.aiohttp_rate_limit import AioHttpRateLimitRequester
from core_https.utils import HTTPMethod

async def fetch_with_rate_limit():
    # Limit to 10 requests per second
    requester = AioHttpRateLimitRequester(
        max_rate=10,           # Max 10 requests...
        time_period=1.0,       # ...per second
        timeout=30
    )

    try:
        # Make 50 requests respecting the rate limit
        tasks = [
            requester.request(
                method=HTTPMethod.GET,
                url=f"https://api.example.com/data/{i}"
            )
            for i in range(50)
        ]

        # Requests will be automatically rate-limited
        responses = await asyncio.gather(*tasks)

        results = []
        for response in responses:
            data = await response.json()
            results.append(data)

        return results
    finally:
        await requester.close()

results = asyncio.run(fetch_with_rate_limit())

Choosing between throttle and rate limit#

Use AioHttpThrottleRequester when:

  • You want to limit resource usage (CPU, memory, connections)

  • Protecting your client from overload

  • No time-based restrictions needed

Use AioHttpRateLimitRequester when:

  • API has rate limits (e.g., 1000 requests/hour)

  • You need time-based request control

Example: GitHub API compliance

import asyncio
from core_https.requesters.aiohttp_rate_limit import AioHttpRateLimitRequester
from core_https.requesters.base import HTTPMethod

async def fetch_github_data():
    # GitHub API: 5000 requests per hour
    requester = AioHttpRateLimitRequester(
        max_rate=5000,
        time_period=3600,  # 1 hour in seconds
        timeout=30
    )

    try:
        response = await requester.request(
            method=HTTPMethod.GET,
            url="https://api.github.com/users/octocat",
            headers={"Authorization": "token YOUR_TOKEN"}
        )

        data = await response.json()
        return data
    finally:
        await requester.close()

result = asyncio.run(fetch_github_data())

Example: Combining rate limiting and concurrency throttling

import asyncio
from core_https.requesters.aiohttp_rate_limit import AioHttpRateLimitRequester
from core_https.requesters.aiohttp_throttle import AioHttpThrottleRequester
from core_https.requesters.base import HTTPMethod

class RateLimitedThrottleClient(AioHttpRateLimitRequester, AioHttpThrottleRequester):
    @classmethod
    def engine(cls) -> str:
        return "aiohttp_rate_limit_throttle"

async def fetch_with_both():
    async with RateLimitedThrottleClient(
        max_rate=100,          # Max 100 requests per minute
        time_period=60,
        max_concurrency=10,    # At most 10 in-flight at once
        timeout=30
    ) as client:
        tasks = [
            client.request(method=HTTPMethod.GET, url=f"https://api.example.com/item/{i}")
            for i in range(50)
        ]
        return await asyncio.gather(*tasks)

asyncio.run(fetch_with_both())

Error Handling#

Handling specific HTTP errors#

from core_https.requesters.requests_ import RequestsRequester
from core_https.exceptions import (
    AuthenticationException,
    AuthorizationException,
    RateLimitException,
    RetryableException,
    InternalServerError
)
from core_https.utils import HTTPMethod

requester = RequestsRequester(
    timeout=30,
    retries=3,
    raise_for_status=True
)

try:
    response = requester.request(
        method=HTTPMethod.GET,
        url="https://api.example.com/protected",
        headers={"Authorization": "Bearer invalid_token"}
    )
except AuthenticationException as e:
    print(f"Authentication failed (401): {e}")
    # Re-authenticate and retry
except AuthorizationException as e:
    print(f"Not authorized (403): {e}")
    # User doesn't have permissions
except RateLimitException as e:
    print(f"Rate limited (429): {e}")
    # Wait and retry
except RetryableException as e:
    print(f"Temporary failure (502/503/504): {e}")
    # Service temporarily unavailable
except InternalServerError as e:
    print(f"Server error (5xx): {e}")
    # Log and alert

Retry strategies with backoff#

from core_https.requesters.requests_ import RequestsRequester
from core_https.utils import HTTPMethod

requester = RequestsRequester(
    timeout=30,
    retries=5,              # Retry up to 5 times
    backoff_factor=0.5,     # Wait 0.5 * attempt_number seconds
    raise_for_status=True
)

# If request fails, it will retry with exponential backoff:
# Attempt 1: immediate
# Attempt 2: wait 0.5 seconds
# Attempt 3: wait 1.0 seconds
# Attempt 4: wait 1.5 seconds
# Attempt 5: wait 2.0 seconds

response = requester.request(
    method=HTTPMethod.GET,
    url="https://unreliable-api.example.com/data"
)

HTTP Status Codes & Methods#

Using HTTPStatus enum#

from core_https.utils import HTTPStatus

# Check status code
status = HTTPStatus.OK
print(status.value)        # 200
print(status.phrase)       # "OK"
print(status.description)  # "Request succeeded"

# Status checking helpers
print(HTTPStatus.OK.is_success)              # True (2xx)
print(HTTPStatus.NOT_FOUND.is_client_error)  # True (4xx)
print(HTTPStatus.BAD_GATEWAY.is_server_error) # True (5xx)
print(HTTPStatus.CREATED.is_redirect)        # False

# Use in conditionals
response_status = 404
if HTTPStatus(response_status).is_client_error:
    print("Client error occurred")

Using HTTPMethod enum#

from core_https.utils import HTTPMethod

# Method properties
method = HTTPMethod.GET
print(method.is_safe)        # True (doesn't modify server state)
print(method.is_idempotent)  # True (multiple calls have same effect)
print(method.is_cacheable)   # True (response can be cached)

# POST example
method = HTTPMethod.POST
print(method.is_safe)        # False
print(method.is_idempotent)  # False
print(method.is_cacheable)   # False

# Use with requesters
from core_https.requesters.requests_ import RequestsRequester

requester = RequestsRequester()

# GET request
response = requester.request(
    method=HTTPMethod.GET,
    url="https://api.example.com/users"
)

# POST request
response = requester.request(
    method=HTTPMethod.POST,
    url="https://api.example.com/users",
    json={"name": "John Doe", "email": "john@example.com"}
)

Testing with Mocks#

Testing code that uses requesters#

import unittest
from core_https.tests.decorators import patch_requests
from core_https.requesters.requests_ import RequestsRequester
from core_https.utils import HTTPMethod

class TestMyAPI(unittest.TestCase):

    @patch_requests(json_response={"id": 1, "name": "Test User"})
    def test_fetch_user(self):
        """Test fetching user data"""
        requester = RequestsRequester()

        response = requester.request(
            method=HTTPMethod.GET,
            url="https://api.example.com/users/1"
        )

        data = response.json()
        self.assertEqual(data["id"], 1)
        self.assertEqual(data["name"], "Test User")

    @patch_requests(
        status=404,
        text_response="User not found"
    )
    def test_user_not_found(self):
        """Test 404 error handling"""
        requester = RequestsRequester(raise_for_status=False)

        response = requester.request(
            method=HTTPMethod.GET,
            url="https://api.example.com/users/999"
        )

        self.assertEqual(response.status_code, 404)
        self.assertEqual(response.text, "User not found")

Testing async code with aiohttp#

import unittest
import asyncio
from core_https.tests.decorators import patch_aiohttp
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod

class TestMyAsyncAPI(unittest.TestCase):

    @patch_aiohttp(json_response={"status": "ok"})
    def test_async_request(self):
        """Test async request"""
        async def fetch():
            async with AioHttpRequester() as requester:
                response = await requester.request(
                    method=HTTPMethod.GET,
                    url="https://api.example.com/status"
                )
                data = await response.json()
                return data

        result = asyncio.run(fetch())
        self.assertEqual(result["status"], "ok")

Best Practices#

1. Always use context managers for async requesters#

# ✅ Good - Ensures proper cleanup
async with AioHttpRequester() as requester:
    response = await requester.request(...)

# ❌ Bad - May leak resources
requester = AioHttpRequester()
response = await requester.request(...)

2. Set appropriate timeouts#

# ✅ Good - Prevents hanging requests
requester = RequestsRequester(timeout=30)

# ❌ Bad - May hang forever
requester = RequestsRequester()  # No timeout

3. Use rate limiting for external APIs#

# ✅ Good - Respects API limits
requester = AioHttpRateLimitRequester(
    max_rate=100,
    time_period=60  # 100 requests per minute
)

# ❌ Bad - May get rate limited or banned
requester = AioHttpRequester()

4. Handle exceptions appropriately#

# ✅ Good - Specific error handling
try:
    response = requester.request(...)
except RateLimitException:
    time.sleep(60)  # Wait before retry
except AuthenticationException:
    refresh_token()  # Re-authenticate

# ❌ Bad - Catches everything
try:
    response = requester.request(...)
except Exception:
    pass  # Silently ignores all errors

5. Close requesters when done#

# ✅ Good - Explicit cleanup
requester = AioHttpRequester()
try:
    response = await requester.request(...)
finally:
    await requester.close()

# Or use context manager
async with AioHttpRequester() as requester:
    response = await requester.request(...)