183 lines
6.3 KiB
Python
183 lines
6.3 KiB
Python
"""
|
|
HTTP Connection Pool Manager
|
|
|
|
Provides a global httpx.AsyncClient instance with connection pooling
|
|
to eliminate the overhead of creating new HTTP clients for each proxy request.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import Optional, Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HTTPConnectionPool:
    """Global HTTP connection pool manager for proxy operations.

    Owns a single ``httpx.AsyncClient`` that is created lazily on first use
    and replaced when the periodic health check reports it unhealthy or it
    has been closed. Entering the pool as an async context manager only
    ensures the client exists; leaving the context does not close it.
    """

    def __init__(self):
        # Shared client; stays None until ensure_client() builds it.
        self._client: Optional[httpx.AsyncClient] = None
        # Wall-clock timestamp (time.time()) of the last health check;
        # also reported by get_pool_stats().
        self._last_health_check: float = 0
        self._health_check_interval: float = 60  # Check health every 60 seconds
        # Cleared by request() on connection-level errors.
        self._is_healthy: bool = True
        # Serializes client replacement and close().
        self._reconnect_lock = asyncio.Lock()

        # Connection pool configuration
        self._config: Dict[str, Any] = {
            "limits": httpx.Limits(
                max_keepalive_connections=20,  # Keep connections alive
                max_connections=100,  # Max total connections
                keepalive_expiry=300.0,  # Keep connections alive for 5 minutes
            ),
            "timeout": httpx.Timeout(
                connect=10.0,  # Connection timeout
                read=30.0,  # Read timeout
                write=10.0,  # Write timeout
                pool=5.0,  # Pool timeout
            ),
            "follow_redirects": False,
            "http2": False,  # Disable HTTP/2 for simplicity
        }

    async def __aenter__(self):
        """Ensure the client exists and return the pool itself."""
        await self.ensure_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the context without closing the client."""
        # Keep client alive - don't close it
        pass

    async def ensure_client(self) -> None:
        """Ensure the HTTP client is initialized and healthy.

        Creates the client if it is missing or closed, then, at most once
        per health-check interval, replaces it when the health check fails.
        """
        client = self._client
        if client is None or client.is_closed:
            await self._replace_client(client)

        # Periodic health check
        current_time = time.time()
        if current_time - self._last_health_check > self._health_check_interval:
            # Record the check before awaiting anything so that tasks arriving
            # while a recreate is in progress do not start a second one.
            self._last_health_check = current_time
            if not await self._check_client_health():
                logger.warning("HTTP client health check failed, recreating client")
                await self._recreate_client()

    async def _replace_client(self, stale: Optional[httpx.AsyncClient]) -> None:
        """Install a fresh client unless another task already replaced ``stale``.

        Args:
            stale: The client the caller observed and wants replaced, or
                None when no client existed yet.
        """
        async with self._reconnect_lock:
            current = self._client
            if current is not None and current is not stale:
                # Someone else installed a new client while we waited for
                # the lock; replacing it again would close a live client.
                return

            # Install the replacement before closing the old client so that
            # self._client never refers to a closing client across an await.
            self._client = httpx.AsyncClient(**self._config)
            self._is_healthy = True
            logger.info("HTTP connection pool client created")

            if current is not None:
                await current.aclose()

    async def _create_client(self) -> None:
        """Create a new HTTP client with connection pooling."""
        await self._replace_client(self._client)

    async def _recreate_client(self) -> None:
        """Recreate the HTTP client (used when health check fails)."""
        logger.info("Recreating HTTP connection pool client")
        await self._create_client()

    async def _check_client_health(self) -> bool:
        """Check if the HTTP client is still healthy.

        Returns:
            False when there is no client or it has been closed; otherwise
            the health flag maintained by request().
        """
        client = self._client
        if client is None or client.is_closed:
            return False

        # Simple health check - no network probe is made here; a real
        # implementation might ping a health endpoint.
        return self._is_healthy

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make an HTTP request using the connection pool.

        Args:
            method: HTTP method name.
            url: Target URL.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The ``httpx.Response`` for the request.

        Raises:
            RuntimeError: If no client is available after ensure_client().
            httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout:
                Re-raised after the client is marked unhealthy.
        """
        await self.ensure_client()

        client = self._client
        if client is None:
            raise RuntimeError("HTTP client not available")

        try:
            return await client.request(method, url, **kwargs)
        except (httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout) as e:
            # Connection-related errors - client might be unhealthy
            logger.warning(f"Connection error, marking client as unhealthy: {e}")
            # Only flag the client that actually failed; a replacement may
            # have been installed while this request was in flight.
            if self._client is client:
                self._is_healthy = False
            raise

    async def close(self) -> None:
        """Close the HTTP client and cleanup resources."""
        async with self._reconnect_lock:
            client = self._client
            if client is None:
                return

            # Detach first so concurrent callers never pick up a client
            # that is in the middle of closing.
            self._client = None
            self._is_healthy = False
            await client.aclose()
            logger.info("HTTP connection pool client closed")

    async def get_pool_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics.

        Returns:
            ``{"status": "not_initialized"}`` when no client exists;
            otherwise the health status, last health-check timestamp and
            the configured limits and timeouts.
        """
        if self._client is None:
            return {"status": "not_initialized"}

        limits = self._config["limits"]
        timeout = self._config["timeout"]

        # httpx doesn't expose detailed pool stats, but we can provide basic info
        return {
            "status": "healthy" if self._is_healthy else "unhealthy",
            "last_health_check": self._last_health_check,
            "config": {
                "max_keepalive_connections": limits.max_keepalive_connections,
                "max_connections": limits.max_connections,
                "keepalive_expiry": limits.keepalive_expiry,
                "connect_timeout": timeout.connect,
                "read_timeout": timeout.read,
            },
        }
|
|
|
|
|
|
# Global HTTP connection pool instance
|
|
# Instantiated at import time; __init__ leaves the underlying client as None,
# so the httpx.AsyncClient itself is only built on the first ensure_client().
_http_pool = HTTPConnectionPool()
|
|
|
|
|
|
@asynccontextmanager
async def get_http_client():
    """Yield the module-level connection pool from inside its async context.

    The pool is entered with ``async with`` before being handed to the
    caller and exited when the caller's block finishes.
    """
    async with _http_pool as pool:
        yield pool
|
|
|
|
|
|
async def make_http_request(method: str, url: str, **kwargs) -> httpx.Response:
    """Send one HTTP request through the global connection pool.

    Args:
        method: HTTP method name.
        url: Target URL.
        **kwargs: Forwarded unchanged to the pool's ``request``.

    Returns:
        The response returned by the pool's ``request``.
    """
    async with get_http_client() as pool:
        response = await pool.request(method, url, **kwargs)
        return response
|
|
|
|
|
|
async def get_connection_pool_stats() -> Dict[str, Any]:
    """Return the statistics dictionary reported by the global pool."""
    stats = await _http_pool.get_pool_stats()
    return stats
|
|
|
|
|
|
async def close_connection_pool() -> None:
    """Close the global connection pool (cleanup entry point)."""
    pool = _http_pool
    await pool.close()
|
|
|
|
|
|
# Lifecycle management for FastAPI
|
|
async def init_http_pool() -> None:
    """Startup hook: log, then make sure the global pool's client exists."""
    logger.info("Initializing HTTP connection pool")
    pool = _http_pool
    await pool.ensure_client()
|
|
|
|
|
|
async def shutdown_http_pool() -> None:
    """Shutdown hook: log, then close the global pool."""
    logger.info("Shutting down HTTP connection pool")
    pool = _http_pool
    await pool.close()
|