"""
HTTP Connection Pool Manager

Provides a global httpx.AsyncClient instance with connection pooling to
eliminate the overhead of creating new HTTP clients for each proxy request.
"""

import asyncio
import logging
import time
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager

import httpx

logger = logging.getLogger(__name__)


class HTTPConnectionPool:
    """Global HTTP connection pool manager for proxy operations.

    Wraps a single shared ``httpx.AsyncClient``. The client is created on
    first use, flagged unhealthy when a request fails with a connection-level
    error, and replaced the next time the periodic health check runs.
    """

    def __init__(self):
        self._client: Optional[httpx.AsyncClient] = None
        self._last_health_check: float = 0
        self._health_check_interval: float = 60  # Check health every 60 seconds
        self._is_healthy: bool = True
        # Serializes every create / replace / close of ``self._client``.
        self._reconnect_lock = asyncio.Lock()

        # Connection pool configuration
        self._config = {
            "limits": httpx.Limits(
                max_keepalive_connections=20,  # Keep connections alive
                max_connections=100,  # Max total connections
                keepalive_expiry=300.0,  # Keep connections alive for 5 minutes
            ),
            "timeout": httpx.Timeout(
                connect=10.0,  # Connection timeout
                read=30.0,  # Read timeout
                write=10.0,  # Write timeout
                pool=5.0,  # Pool timeout
            ),
            "follow_redirects": False,
            "http2": False,  # Disable HTTP/2 for simplicity
        }

    async def __aenter__(self):
        """Make sure a client exists and return the pool itself."""
        await self.ensure_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the shared client open; it is only closed via ``close()``."""
        return None

    async def ensure_client(self) -> None:
        """Ensure the HTTP client is initialized and healthy.

        Creates the client if none exists. At most once per
        ``_health_check_interval`` seconds it also runs the health check and
        replaces the client when the check fails.
        """
        if self._client is None:
            await self._replace_client(None)

        # Periodic health check
        now = time.time()
        if now - self._last_health_check > self._health_check_interval:
            # Remember which instance was judged unhealthy so that a
            # concurrent caller that already swapped it is not undone.
            observed = self._client
            if not await self._check_client_health():
                logger.warning("HTTP client health check failed, recreating client")
                await self._replace_client(observed)
            self._last_health_check = now

    async def _replace_client(self, stale: Optional[httpx.AsyncClient]) -> None:
        """Replace ``stale`` with a fresh client unless that already happened.

        The decision is re-evaluated after the lock is acquired: if the
        current client is neither missing nor the ``stale`` instance the
        caller observed, another task has already installed a replacement
        and it is left untouched.
        """
        async with self._reconnect_lock:
            current = self._client
            if current is not None and current is not stale:
                return
            if current is not None:
                logger.info("Recreating HTTP connection pool client")
            await self._install_client_locked()

    async def _install_client_locked(self) -> None:
        """Close the current client (if any) and install a new one.

        The caller must already hold ``self._reconnect_lock``.
        """
        previous = self._client
        if previous is not None:
            try:
                await previous.aclose()
            except Exception:
                # A client that cannot be closed cleanly must not prevent
                # its replacement from being installed.
                logger.warning(
                    "Error while closing previous HTTP client; replacing it anyway",
                    exc_info=True,
                )

        self._client = httpx.AsyncClient(**self._config)
        self._is_healthy = True
        logger.info("HTTP connection pool client created")

    async def _create_client(self) -> None:
        """Create a new HTTP client with connection pooling.

        Unconditionally closes any existing client and installs a new one.
        """
        async with self._reconnect_lock:
            await self._install_client_locked()

    async def _recreate_client(self) -> None:
        """Recreate the HTTP client (used when health check fails)."""
        logger.info("Recreating HTTP connection pool client")
        await self._create_client()

    async def _check_client_health(self) -> bool:
        """Check if the HTTP client is still healthy.

        This performs no network probe: it reports ``False`` when there is no
        client, otherwise the ``_is_healthy`` flag maintained by ``request``.
        In a real implementation, you might ping a health endpoint.
        """
        if not self._client:
            return False
        return self._is_healthy

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make an HTTP request using the connection pool.

        Args:
            method: HTTP method passed to ``httpx.AsyncClient.request``.
            url: Target URL.
            **kwargs: Forwarded unchanged to ``httpx.AsyncClient.request``.

        Returns:
            The ``httpx.Response`` produced by the shared client.

        Raises:
            RuntimeError: If no client is available after ``ensure_client``.
            httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout:
                Re-raised after the client has been marked unhealthy.
            Exception: Any other error from the client propagates unchanged.
        """
        await self.ensure_client()

        client = self._client
        if not client:
            raise RuntimeError("HTTP client not available")

        try:
            return await client.request(method, url, **kwargs)
        except (httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout) as e:
            # Connection-related errors - client might be unhealthy
            logger.warning("Connection error, marking client as unhealthy: %s", e)
            self._is_healthy = False
            raise

    async def close(self) -> None:
        """Close the HTTP client and cleanup resources.

        Does nothing when no client exists. The client reference and health
        flag are reset even if closing raises; the error still propagates.
        """
        async with self._reconnect_lock:
            client = self._client
            if not client:
                return
            try:
                await client.aclose()
            finally:
                self._client = None
                self._is_healthy = False
            logger.info("HTTP connection pool client closed")

    async def get_pool_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics.

        Returns:
            ``{"status": "not_initialized"}`` when no client exists; otherwise
            the health status, the last health-check timestamp and the
            configured limits and timeouts.
        """
        if not self._client:
            return {"status": "not_initialized"}

        limits = self._config["limits"]
        timeout = self._config["timeout"]

        # httpx doesn't expose detailed pool stats, but we can provide basic info
        return {
            "status": "healthy" if self._is_healthy else "unhealthy",
            "last_health_check": self._last_health_check,
            "config": {
                "max_keepalive_connections": limits.max_keepalive_connections,
                "max_connections": limits.max_connections,
                "keepalive_expiry": limits.keepalive_expiry,
                "connect_timeout": timeout.connect,
                "read_timeout": timeout.read,
            },
        }


# Global HTTP connection pool instance
_http_pool = HTTPConnectionPool()


@asynccontextmanager
async def get_http_client():
    """Context manager for getting the global HTTP client.

    Yields the global ``HTTPConnectionPool``; leaving the context does not
    close the underlying client.
    """
    async with _http_pool:
        yield _http_pool


async def make_http_request(method: str, url: str, **kwargs) -> httpx.Response:
    """Make an HTTP request using the global connection pool."""
    async with get_http_client() as client:
        return await client.request(method, url, **kwargs)


async def get_connection_pool_stats() -> Dict[str, Any]:
    """Get connection pool statistics."""
    return await _http_pool.get_pool_stats()


async def close_connection_pool() -> None:
    """Close the global connection pool (for cleanup)."""
    await _http_pool.close()


# Lifecycle management for FastAPI
async def init_http_pool() -> None:
    """Initialize the HTTP connection pool on startup."""
    logger.info("Initializing HTTP connection pool")
    await _http_pool.ensure_client()


async def shutdown_http_pool() -> None:
    """Shutdown the HTTP connection pool on shutdown."""
    logger.info("Shutting down HTTP connection pool")
    await _http_pool.close()