Examples#
This page provides practical examples for using core-https in your projects.
Basic HTTP Requests#
Using requests library#
from core_https.requesters.requests_ import RequestsRequester
from core_https.utils import HTTPMethod
# Create a requester instance
requester = RequestsRequester(
timeout=30,
retries=3,
raise_for_status=True
)
# Make a GET request
response = requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/users"
)
# Access response data
data = response.json()
print(f"Status: {response.status_code}")
print(f"Data: {data}")
Using urllib3#
from core_https.requesters.urllib3_ import Urllib3Requester
from core_https.utils import HTTPMethod
requester = Urllib3Requester(
timeout=30,
connector_limit=10, # Connection pool size
retries=3
)
response = requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/data",
headers={"Authorization": "Bearer token"}
)
# urllib3 responses need decoding
data = response.data.decode('utf-8')
print(f"Response: {data}")
Async HTTP Requests#
Basic async request with aiohttp#
import asyncio
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod
async def fetch_data():
async with AioHttpRequester(timeout=30, retries=3) as requester:
response = await requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/data"
)
data = await response.json()
return data
# Run the async function
result = asyncio.run(fetch_data())
print(result)
Multiple concurrent requests#
import asyncio
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod
async def fetch_all(urls):
async with AioHttpRequester(timeout=30) as requester:
tasks = [
requester.request(method=HTTPMethod.GET, url=url)
for url in urls
]
responses = await asyncio.gather(*tasks)
# Process all responses
results = []
for response in responses:
data = await response.json()
results.append(data)
return results
urls = [
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3"
]
results = asyncio.run(fetch_all(urls))
Rate Limiting & Throttling#
Throttling concurrent requests#
Limit the number of simultaneous requests to prevent overwhelming the client or server:
import asyncio
from core_https.requesters.aiohttp_throttle import AioHttpThrottleRequester
from core_https.utils import HTTPMethod
async def fetch_with_throttle():
# Only 5 requests running concurrently at any time
requester = AioHttpThrottleRequester(
max_concurrency=5,
timeout=30
)
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
try:
tasks = [
requester.request(method=HTTPMethod.GET, url=url)
for url in urls
]
# Even with 100 tasks, only 5 will run simultaneously
responses = await asyncio.gather(*tasks)
results = []
for response in responses:
data = await response.json()
results.append(data)
return results
finally:
await requester.close()
results = asyncio.run(fetch_with_throttle())
Rate limiting requests per time period#
Comply with API rate limits by controlling request frequency:
import asyncio
from core_https.requesters.aiohttp_rate_limit import AioHttpRateLimitRequester
from core_https.utils import HTTPMethod
async def fetch_with_rate_limit():
# Limit to 10 requests per second
requester = AioHttpRateLimitRequester(
max_rate=10, # Max 10 requests...
time_period=1.0, # ...per second
timeout=30
)
try:
# Make 50 requests respecting the rate limit
tasks = [
requester.request(
method=HTTPMethod.GET,
url=f"https://api.example.com/data/{i}"
)
for i in range(50)
]
# Requests will be automatically rate-limited
responses = await asyncio.gather(*tasks)
results = []
for response in responses:
data = await response.json()
results.append(data)
return results
finally:
await requester.close()
results = asyncio.run(fetch_with_rate_limit())
Choosing between throttle and rate limit#
Use AioHttpThrottleRequester when:
You want to limit resource usage (CPU, memory, connections)
Protecting your client from overload
No time-based restrictions needed
Use AioHttpRateLimitRequester when:
API has rate limits (e.g., 1000 requests/hour)
You need time-based request control
Example: GitHub API compliance
import asyncio
from core_https.requesters.aiohttp_rate_limit import AioHttpRateLimitRequester
from core_https.requesters.base import HTTPMethod
async def fetch_github_data():
# GitHub API: 5000 requests per hour
requester = AioHttpRateLimitRequester(
max_rate=5000,
time_period=3600, # 1 hour in seconds
timeout=30
)
try:
response = await requester.request(
method=HTTPMethod.GET,
url="https://api.github.com/users/octocat",
headers={"Authorization": "token YOUR_TOKEN"}
)
data = await response.json()
return data
finally:
await requester.close()
result = asyncio.run(fetch_github_data())
Example: Combining rate limiting and concurrency throttling
import asyncio
from core_https.requesters.aiohttp_rate_limit import AioHttpRateLimitRequester
from core_https.requesters.aiohttp_throttle import AioHttpThrottleRequester
from core_https.requesters.base import HTTPMethod
class RateLimitedThrottleClient(AioHttpRateLimitRequester, AioHttpThrottleRequester):
@classmethod
def engine(cls) -> str:
return "aiohttp_rate_limit_throttle"
async def fetch_with_both():
async with RateLimitedThrottleClient(
max_rate=100, # Max 100 requests per minute
time_period=60,
max_concurrency=10, # At most 10 in-flight at once
timeout=30
) as client:
tasks = [
client.request(method=HTTPMethod.GET, url=f"https://api.example.com/item/{i}")
for i in range(50)
]
return await asyncio.gather(*tasks)
asyncio.run(fetch_with_both())
Error Handling#
Handling specific HTTP errors#
from core_https.requesters.requests_ import RequestsRequester
from core_https.exceptions import (
AuthenticationException,
AuthorizationException,
RateLimitException,
RetryableException,
InternalServerError
)
from core_https.utils import HTTPMethod
requester = RequestsRequester(
timeout=30,
retries=3,
raise_for_status=True
)
try:
response = requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/protected",
headers={"Authorization": "Bearer invalid_token"}
)
except AuthenticationException as e:
print(f"Authentication failed (401): {e}")
# Re-authenticate and retry
except AuthorizationException as e:
print(f"Not authorized (403): {e}")
# User doesn't have permissions
except RateLimitException as e:
print(f"Rate limited (429): {e}")
# Wait and retry
except RetryableException as e:
print(f"Temporary failure (502/503/504): {e}")
# Service temporarily unavailable
except InternalServerError as e:
print(f"Server error (5xx): {e}")
# Log and alert
Retry strategies with backoff#
from core_https.requesters.requests_ import RequestsRequester
from core_https.utils import HTTPMethod
requester = RequestsRequester(
timeout=30,
retries=5, # Retry up to 5 times
backoff_factor=0.5, # Wait 0.5 * attempt_number seconds
raise_for_status=True
)
# If request fails, it will retry with exponential backoff:
# Attempt 1: immediate
# Attempt 2: wait 0.5 seconds
# Attempt 3: wait 1.0 seconds
# Attempt 4: wait 1.5 seconds
# Attempt 5: wait 2.0 seconds
response = requester.request(
method=HTTPMethod.GET,
url="https://unreliable-api.example.com/data"
)
HTTP Status Codes & Methods#
Using HTTPStatus enum#
from core_https.utils import HTTPStatus
# Check status code
status = HTTPStatus.OK
print(status.value) # 200
print(status.phrase) # "OK"
print(status.description) # "Request succeeded"
# Status checking helpers
print(HTTPStatus.OK.is_success) # True (2xx)
print(HTTPStatus.NOT_FOUND.is_client_error) # True (4xx)
print(HTTPStatus.BAD_GATEWAY.is_server_error) # True (5xx)
print(HTTPStatus.CREATED.is_redirect) # False
# Use in conditionals
response_status = 404
if HTTPStatus(response_status).is_client_error:
print("Client error occurred")
Using HTTPMethod enum#
from core_https.utils import HTTPMethod
# Method properties
method = HTTPMethod.GET
print(method.is_safe) # True (doesn't modify server state)
print(method.is_idempotent) # True (multiple calls have same effect)
print(method.is_cacheable) # True (response can be cached)
# POST example
method = HTTPMethod.POST
print(method.is_safe) # False
print(method.is_idempotent) # False
print(method.is_cacheable) # False
# Use with requesters
from core_https.requesters.requests_ import RequestsRequester
requester = RequestsRequester()
# GET request
response = requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/users"
)
# POST request
response = requester.request(
method=HTTPMethod.POST,
url="https://api.example.com/users",
json={"name": "John Doe", "email": "john@example.com"}
)
Testing with Mocks#
Testing code that uses requesters#
import unittest
from core_https.tests.decorators import patch_requests
from core_https.requesters.requests_ import RequestsRequester
from core_https.utils import HTTPMethod
class TestMyAPI(unittest.TestCase):
@patch_requests(json_response={"id": 1, "name": "Test User"})
def test_fetch_user(self):
"""Test fetching user data"""
requester = RequestsRequester()
response = requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/users/1"
)
data = response.json()
self.assertEqual(data["id"], 1)
self.assertEqual(data["name"], "Test User")
@patch_requests(
status=404,
text_response="User not found"
)
def test_user_not_found(self):
"""Test 404 error handling"""
requester = RequestsRequester(raise_for_status=False)
response = requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/users/999"
)
self.assertEqual(response.status_code, 404)
self.assertEqual(response.text, "User not found")
Testing async code with aiohttp#
import unittest
import asyncio
from core_https.tests.decorators import patch_aiohttp
from core_https.requesters.aiohttp_ import AioHttpRequester
from core_https.utils import HTTPMethod
class TestMyAsyncAPI(unittest.TestCase):
@patch_aiohttp(json_response={"status": "ok"})
def test_async_request(self):
"""Test async request"""
async def fetch():
async with AioHttpRequester() as requester:
response = await requester.request(
method=HTTPMethod.GET,
url="https://api.example.com/status"
)
data = await response.json()
return data
result = asyncio.run(fetch())
self.assertEqual(result["status"], "ok")
Best Practices#
1. Always use context managers for async requesters#
# ✅ Good - Ensures proper cleanup
async with AioHttpRequester() as requester:
response = await requester.request(...)
# ❌ Bad - May leak resources
requester = AioHttpRequester()
response = await requester.request(...)
2. Set appropriate timeouts#
# ✅ Good - Prevents hanging requests
requester = RequestsRequester(timeout=30)
# ❌ Bad - May hang forever
requester = RequestsRequester() # No timeout
3. Use rate limiting for external APIs#
# ✅ Good - Respects API limits
requester = AioHttpRateLimitRequester(
max_rate=100,
time_period=60 # 100 requests per minute
)
# ❌ Bad - May get rate limited or banned
requester = AioHttpRequester()
4. Handle exceptions appropriately#
# ✅ Good - Specific error handling
try:
response = requester.request(...)
except RateLimitException:
time.sleep(60) # Wait before retry
except AuthenticationException:
refresh_token() # Re-authenticate
# ❌ Bad - Catches everything
try:
response = requester.request(...)
except Exception:
pass # Silently ignores all errors
5. Close requesters when done#
# ✅ Good - Explicit cleanup
requester = AioHttpRequester()
try:
response = await requester.request(...)
finally:
await requester.close()
# Or use context manager
async with AioHttpRequester() as requester:
response = await requester.request(...)