Files
lovdata-chat/session-manager/http_pool.py
Torbjørn Lindahl 9683cf280b fix: add SSE streaming proxy and robust make try startup
The SSE proxy was buffering the entire response body with a 30s read
timeout, causing 504s on the OpenCode /global/event stream. Add a
streaming path that detects SSE requests (by Accept header or /event
path) and returns a StreamingResponse with no read timeout.

Also fix the make try target to poll the health endpoint for Docker
readiness and wait for the container to reach running status before
opening the browser.
2026-02-16 00:38:57 +01:00

201 lines
7.2 KiB
Python

"""
HTTP Connection Pool Manager
Provides a global httpx.AsyncClient instance with connection pooling
to eliminate the overhead of creating new HTTP clients for each proxy request.
"""
import asyncio
import logging
import time
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager
import httpx
logger = logging.getLogger(__name__)
class HTTPConnectionPool:
"""Global HTTP connection pool manager for proxy operations."""
def __init__(self):
self._client: Optional[httpx.AsyncClient] = None
self._last_health_check: float = 0
self._health_check_interval: float = 60 # Check health every 60 seconds
self._is_healthy: bool = True
self._reconnect_lock = asyncio.Lock()
# Connection pool configuration
self._config = {
"limits": httpx.Limits(
max_keepalive_connections=20, # Keep connections alive
max_connections=100, # Max total connections
keepalive_expiry=300.0, # Keep connections alive for 5 minutes
),
"timeout": httpx.Timeout(
connect=10.0, # Connection timeout
read=30.0, # Read timeout
write=10.0, # Write timeout
pool=5.0, # Pool timeout
),
"follow_redirects": False,
"http2": False, # Disable HTTP/2 for simplicity
}
async def __aenter__(self):
await self.ensure_client()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Keep client alive - don't close it
pass
async def ensure_client(self) -> None:
"""Ensure the HTTP client is initialized and healthy."""
if self._client is None:
await self._create_client()
# Periodic health check
current_time = time.time()
if current_time - self._last_health_check > self._health_check_interval:
if not await self._check_client_health():
logger.warning("HTTP client health check failed, recreating client")
await self._recreate_client()
self._last_health_check = current_time
async def _create_client(self) -> None:
"""Create a new HTTP client with connection pooling."""
async with self._reconnect_lock:
if self._client:
await self._client.aclose()
self._client = httpx.AsyncClient(**self._config)
self._is_healthy = True
logger.info("HTTP connection pool client created")
async def _recreate_client(self) -> None:
"""Recreate the HTTP client (used when health check fails)."""
logger.info("Recreating HTTP connection pool client")
await self._create_client()
async def _check_client_health(self) -> bool:
"""Check if the HTTP client is still healthy."""
if not self._client:
return False
try:
# Simple health check - we could ping a reliable endpoint
# For now, just check if client is still responsive
# In a real implementation, you might ping a health endpoint
return self._is_healthy
except Exception as e:
logger.warning(f"HTTP client health check error: {e}")
return False
async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
"""Make an HTTP request using the connection pool."""
await self.ensure_client()
if not self._client:
raise RuntimeError("HTTP client not available")
try:
response = await self._client.request(method, url, **kwargs)
return response
except (httpx.ConnectError, httpx.ConnectTimeout, httpx.PoolTimeout) as e:
# Connection-related errors - client might be unhealthy
logger.warning(f"Connection error, marking client as unhealthy: {e}")
self._is_healthy = False
raise
except Exception as e:
# Other errors - re-raise as-is
raise
async def close(self) -> None:
"""Close the HTTP client and cleanup resources."""
async with self._reconnect_lock:
if self._client:
await self._client.aclose()
self._client = None
self._is_healthy = False
logger.info("HTTP connection pool client closed")
async def get_pool_stats(self) -> Dict[str, Any]:
"""Get connection pool statistics."""
if not self._client:
return {"status": "not_initialized"}
# httpx doesn't expose detailed pool stats, but we can provide basic info
return {
"status": "healthy" if self._is_healthy else "unhealthy",
"last_health_check": self._last_health_check,
"config": {
"max_keepalive_connections": self._config[
"limits"
].max_keepalive_connections,
"max_connections": self._config["limits"].max_connections,
"keepalive_expiry": self._config["limits"].keepalive_expiry,
"connect_timeout": self._config["timeout"].connect,
"read_timeout": self._config["timeout"].read,
},
}
# Global HTTP connection pool instance
_http_pool = HTTPConnectionPool()
@asynccontextmanager
async def get_http_client():
"""Context manager for getting the global HTTP client."""
async with _http_pool:
yield _http_pool
async def make_http_request(method: str, url: str, **kwargs) -> httpx.Response:
"""Make an HTTP request using the global connection pool."""
async with get_http_client() as client:
return await client.request(method, url, **kwargs)
@asynccontextmanager
async def stream_http_request(method: str, url: str, **kwargs):
"""Stream an HTTP response using a dedicated client with no read timeout.
Yields an httpx.Response whose body has NOT been read -- caller must
iterate over ``response.aiter_bytes()`` / ``aiter_lines()`` etc.
A separate AsyncClient is used (not the pool) because httpx's
``stream()`` keeps the connection checked-out for the lifetime of the
context manager, and SSE streams are effectively infinite. Using a
short-lived client avoids starving the pool.
"""
timeout = httpx.Timeout(connect=10.0, read=None, write=10.0, pool=5.0)
async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
async with client.stream(method, url, **kwargs) as response:
yield response
async def get_connection_pool_stats() -> Dict[str, Any]:
"""Get connection pool statistics."""
return await _http_pool.get_pool_stats()
async def close_connection_pool() -> None:
"""Close the global connection pool (for cleanup)."""
await _http_pool.close()
# Lifecycle management for FastAPI
async def init_http_pool() -> None:
"""Initialize the HTTP connection pool on startup."""
logger.info("Initializing HTTP connection pool")
await _http_pool.ensure_client()
async def shutdown_http_pool() -> None:
"""Shutdown the HTTP connection pool on shutdown."""
logger.info("Shutting down HTTP connection pool")
await _http_pool.close()