# Source: lovdata-chat/session-manager/main.py
# (file-listing metadata from the original export: 1173 lines, 40 KiB, Python)
"""
Session Management Service for Lovdata Chat
This service manages the lifecycle of OpenCode containers for individual user sessions.
Each session gets its own isolated container with a dedicated working directory.
"""
import os
import uuid
import json
import asyncio
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, List
from contextlib import asynccontextmanager
import docker
from docker.errors import DockerException, NotFound
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import uvicorn
import httpx
# Import host IP detection utility
from host_ip_detector import async_get_host_ip, reset_host_ip_cache
# Import resource management utilities
from resource_manager import (
get_resource_limits,
check_system_resources,
should_throttle_sessions,
ResourceLimits,
ResourceValidator,
)
# Import structured logging
from logging_config import (
get_logger,
RequestContext,
log_performance,
log_request,
log_session_operation,
log_security_event,
init_logging,
)
# Import async Docker client
from async_docker_client import (
get_async_docker_client,
async_create_container,
async_start_container,
async_stop_container,
async_remove_container,
async_get_container,
async_list_containers,
async_docker_ping,
)
# Import HTTP connection pool
from http_pool import (
make_http_request,
get_connection_pool_stats,
init_http_pool,
shutdown_http_pool,
)
# Import session authentication
from session_auth import (
generate_session_auth_token,
validate_session_auth_token,
revoke_session_auth_token,
rotate_session_auth_token,
cleanup_expired_auth_tokens,
get_session_auth_info,
get_active_auth_sessions_count,
list_active_auth_sessions,
)
# Import database layer
from database import (
init_database,
shutdown_database,
SessionModel,
get_database_stats,
run_migrations,
)
# Import container health monitoring
from container_health import (
start_container_health_monitoring,
stop_container_health_monitoring,
get_container_health_stats,
get_container_health_history,
)
# Import Docker service abstraction
from docker_service import DockerService, DockerOperationError
# Import database layer
from database import (
init_database,
shutdown_database,
SessionModel,
get_database_stats,
run_migrations,
)
# Initialize structured logging
init_logging()
# Get configured logger
logger = get_logger(__name__)
# Configuration for async operations
USE_ASYNC_DOCKER = os.getenv("USE_ASYNC_DOCKER", "true").lower() == "true"
# Session storage configuration
USE_DATABASE_STORAGE = os.getenv("USE_DATABASE_STORAGE", "true").lower() == "true"
# Configuration - Resource limits are now configurable and enforced
SESSIONS_DIR = Path("/app/sessions")
SESSIONS_FILE = Path("/app/sessions/sessions.json")
CONTAINER_IMAGE = os.getenv("CONTAINER_IMAGE", "lovdata-opencode:latest")
# Resource limits - configurable via environment variables with defaults.
# NOTE: malformed numeric env values raise ValueError at import time.
CONTAINER_MEMORY_LIMIT = os.getenv(
    "CONTAINER_MEMORY_LIMIT", "4g"
)  # Memory limit per container
CONTAINER_CPU_QUOTA = int(
    os.getenv("CONTAINER_CPU_QUOTA", "100000")
)  # CPU quota (100000 = 1 core)
CONTAINER_CPU_PERIOD = int(
    os.getenv("CONTAINER_CPU_PERIOD", "100000")
)  # CPU period (microseconds)
# Session management
MAX_CONCURRENT_SESSIONS = int(
    os.getenv("MAX_CONCURRENT_SESSIONS", "3")
)  # Max concurrent sessions
SESSION_TIMEOUT_MINUTES = int(
    os.getenv("SESSION_TIMEOUT_MINUTES", "60")
)  # Auto-cleanup timeout
# Resource monitoring thresholds
MEMORY_WARNING_THRESHOLD = float(
    os.getenv("MEMORY_WARNING_THRESHOLD", "0.8")
)  # 80% memory usage
CPU_WARNING_THRESHOLD = float(
    os.getenv("CPU_WARNING_THRESHOLD", "0.9")
)  # 90% CPU usage
class SessionData(BaseModel):
    """Serializable record of one OpenCode session/container pairing."""

    session_id: str  # 16-char hex ID generated by SessionManager
    container_name: str  # Docker container name ("opencode-<session_id>")
    container_id: Optional[str] = None  # Docker ID once the container exists
    host_dir: str  # Host directory bind-mounted into the container
    port: Optional[int] = None  # Host port mapped to the container's 8080
    auth_token: Optional[str] = None  # Authentication token for the session
    created_at: datetime
    last_accessed: datetime  # Refreshed on every get_session() lookup
    status: str = "creating"  # creating, running, stopped, error
class SessionManager:
    """Owns the lifecycle of per-user OpenCode containers.

    Keeps an in-memory dict of SessionData (optionally backed by the database
    layer when USE_DATABASE_STORAGE is set), creates/starts containers via a
    DockerService, and cleans up expired sessions together with their
    containers and host directories.
    """

    def __init__(self, docker_service: Optional[DockerService] = None):
        # Use injected Docker service or create default
        if docker_service:
            self.docker_service = docker_service
        else:
            self.docker_service = DockerService(use_async=USE_ASYNC_DOCKER)
        # Initialize session storage
        if USE_DATABASE_STORAGE:
            # Use database backend
            self.sessions: Dict[
                str, SessionData
            ] = {}  # Keep in-memory cache for performance
            logger.info("Session storage initialized", extra={"backend": "database"})
        else:
            # Use JSON file backend (legacy)
            self.sessions: Dict[str, SessionData] = {}
            self._load_sessions_from_file()
            logger.info("Session storage initialized", extra={"backend": "json_file"})
        # Initialize container health monitoring
        from container_health import get_container_health_monitor

        self.health_monitor = get_container_health_monitor()
        # Dependencies will be set when health monitoring starts
        logger.info(
            "SessionManager initialized",
            extra={
                "docker_service_type": type(self.docker_service).__name__,
                "storage_backend": "database" if USE_DATABASE_STORAGE else "json_file",
            },
        )

    def _load_sessions_from_file(self):
        """Load session data from JSON file (legacy method)."""
        if SESSIONS_FILE.exists():
            try:
                with open(SESSIONS_FILE, "r") as f:
                    data = json.load(f)
                for session_id, session_dict in data.items():
                    # Convert datetime strings back to datetime objects
                    session_dict["created_at"] = datetime.fromisoformat(
                        session_dict["created_at"]
                    )
                    session_dict["last_accessed"] = datetime.fromisoformat(
                        session_dict["last_accessed"]
                    )
                    self.sessions[session_id] = SessionData(**session_dict)
                logger.info(
                    "Sessions loaded from JSON file",
                    extra={"count": len(self.sessions)},
                )
            except (json.JSONDecodeError, KeyError) as e:
                # A corrupt file resets the registry rather than crashing startup.
                logger.warning("Could not load sessions file", extra={"error": str(e)})
                self.sessions = {}

    async def _load_sessions_from_database(self):
        """Load active sessions from database into memory cache."""
        try:
            # Load only running/creating sessions to avoid loading too much data
            db_sessions = await SessionModel.get_sessions_by_status("running")
            db_sessions.extend(await SessionModel.get_sessions_by_status("creating"))
            self.sessions = {}
            for session_dict in db_sessions:
                # Convert to SessionData model
                session_data = SessionData(**session_dict)
                self.sessions[session_dict["session_id"]] = session_data
            logger.info(
                "Sessions loaded from database", extra={"count": len(self.sessions)}
            )
        except Exception as e:
            logger.error(
                "Failed to load sessions from database", extra={"error": str(e)}
            )
            self.sessions = {}

    def _save_sessions(self):
        """Save session data to persistent storage (JSON dump).

        NOTE(review): this always writes the JSON file, even when the database
        backend is active -- presumably a belt-and-braces dump; confirm,
        otherwise guard on USE_DATABASE_STORAGE.
        """
        SESSIONS_DIR.mkdir(exist_ok=True)
        data = {}
        for session_id, session in self.sessions.items():
            data[session_id] = session.dict()
        with open(SESSIONS_FILE, "w") as f:
            json.dump(data, f, indent=2, default=str)

    def _generate_session_id(self) -> str:
        """Generate a unique session ID (16 hex chars of a UUID4)."""
        return str(uuid.uuid4()).replace("-", "")[:16]

    def _get_available_port(self) -> int:
        """Find an available port for the container.

        Only avoids ports tracked by this manager; the OS may still have the
        chosen port bound by an unrelated process.
        """
        used_ports = {s.port for s in self.sessions.values() if s.port}
        port = 8081  # Start from 8081 to avoid conflicts
        while port in used_ports:
            port += 1
        return port

    def _check_container_limits(self) -> bool:
        """Check if we're within concurrent session limits."""
        active_sessions = sum(
            1 for s in self.sessions.values() if s.status in ["creating", "running"]
        )
        return active_sessions < MAX_CONCURRENT_SESSIONS

    async def _async_check_container_limits(self) -> bool:
        """Async version of container limits check (delegates to sync check)."""
        return self._check_container_limits()

    async def create_session(self) -> SessionData:
        """Create a new OpenCode session with dedicated container.

        Raises:
            HTTPException: 429 when the concurrent-session cap is reached,
                503 when system resources require throttling.
        """
        # Check concurrent session limits
        if USE_ASYNC_DOCKER:
            limits_ok = await self._async_check_container_limits()
        else:
            limits_ok = self._check_container_limits()
        if not limits_ok:
            raise HTTPException(
                status_code=429,
                detail=f"Maximum concurrent sessions ({MAX_CONCURRENT_SESSIONS}) reached",
            )
        # Check system resource limits
        should_throttle, reason = should_throttle_sessions()
        if should_throttle:
            raise HTTPException(
                status_code=503,
                detail=f"System resource constraints prevent new sessions: {reason}",
            )
        session_id = self._generate_session_id()
        container_name = f"opencode-{session_id}"
        host_dir = str(SESSIONS_DIR / session_id)
        port = self._get_available_port()
        # Create host directory
        Path(host_dir).mkdir(parents=True, exist_ok=True)
        # Generate authentication token for this session
        auth_token = generate_session_auth_token(session_id)
        session = SessionData(
            session_id=session_id,
            container_name=container_name,
            host_dir=host_dir,
            port=port,
            auth_token=auth_token,
            created_at=datetime.now(),
            last_accessed=datetime.now(),
            status="creating",
        )
        # Store in memory cache
        self.sessions[session_id] = session
        # Persist to database if using database storage
        if USE_DATABASE_STORAGE:
            try:
                await SessionModel.create_session(
                    {
                        "session_id": session_id,
                        "container_name": container_name,
                        "host_dir": host_dir,
                        "port": port,
                        "auth_token": auth_token,
                        "status": "creating",
                    }
                )
                logger.info(
                    "Session created in database", extra={"session_id": session_id}
                )
            except Exception as e:
                logger.error(
                    "Failed to create session in database",
                    extra={"session_id": session_id, "error": str(e)},
                )
                # Continue with in-memory storage as fallback
        # Start container asynchronously (fire-and-forget; status is updated
        # on the session object when the task completes)
        if USE_ASYNC_DOCKER:
            asyncio.create_task(self._start_container_async(session))
        else:
            asyncio.create_task(self._start_container_sync(session))
        return session

    async def _start_container_async(self, session: SessionData):
        """Start the OpenCode container asynchronously (aiodocker path).

        Runs as a fire-and-forget task from create_session(); failures are
        recorded on the session's status rather than raised.
        """
        try:
            # Get and validate resource limits
            resource_limits = get_resource_limits()
            logger.info(
                f"Starting container {session.container_name} with resource limits: memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}"
            )
            # Create container using Docker service
            container_info = await self.docker_service.create_container(
                name=session.container_name,
                image=CONTAINER_IMAGE,
                volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}},
                ports={"8080": session.port},
                environment={
                    "MCP_SERVER": os.getenv("MCP_SERVER", ""),
                    "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""),
                    "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""),
                    "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""),
                    # Secure authentication for OpenCode server
                    "OPENCODE_SERVER_PASSWORD": session.auth_token or "",
                    "SESSION_AUTH_TOKEN": session.auth_token or "",
                    "SESSION_ID": session.session_id,
                },
                network_mode="bridge",
                # Apply resource limits to prevent resource exhaustion
                mem_limit=resource_limits.memory_limit,
                cpu_quota=resource_limits.cpu_quota,
                cpu_period=resource_limits.cpu_period,
                # Additional security and resource constraints
                tmpfs={
                    "/tmp": "rw,noexec,nosuid,size=100m",
                    "/var/tmp": "rw,noexec,nosuid,size=50m",
                },
            )
            # For async mode, containers are already started during creation
            # For sync mode, we need to explicitly start them
            if not self.docker_service.use_async:
                await self.docker_service.start_container(container_info.container_id)
            session.container_id = container_info.container_id
            session.status = "running"
            # Update in-memory cache
            self.sessions[session.session_id] = session
            # Update database if using database storage
            if USE_DATABASE_STORAGE:
                try:
                    await SessionModel.update_session(
                        session.session_id,
                        {
                            "container_id": container_info.container_id,
                            "status": "running",
                        },
                    )
                except Exception as e:
                    logger.error(
                        "Failed to update session in database",
                        extra={"session_id": session.session_id, "error": str(e)},
                    )
            logger.info(
                "Container started successfully",
                extra={
                    "session_id": session.session_id,
                    "container_name": session.container_name,
                    "container_id": container_info.container_id,
                    "port": session.port,
                },
            )
        except Exception as e:
            session.status = "error"
            # NOTE(review): on failure only the JSON dump is updated; a
            # database row (if any) keeps status "creating" -- confirm.
            self._save_sessions()
            logger.error(f"Failed to start container {session.container_name}: {e}")

    async def _start_container_sync(self, session: SessionData):
        """Start the OpenCode container using Docker service (sync mode).

        Mirrors _start_container_async minus the explicit start-after-create
        branch; kept separate to match the USE_ASYNC_DOCKER dispatch in
        create_session().
        """
        try:
            # Get and validate resource limits
            resource_limits = get_resource_limits()
            logger.info(
                f"Starting container {session.container_name} with resource limits: memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}"
            )
            # Create container using Docker service
            container_info = await self.docker_service.create_container(
                name=session.container_name,
                image=CONTAINER_IMAGE,
                volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}},
                ports={"8080": session.port},
                environment={
                    "MCP_SERVER": os.getenv("MCP_SERVER", ""),
                    "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""),
                    "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""),
                    "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""),
                    # Secure authentication for OpenCode server
                    "OPENCODE_SERVER_PASSWORD": session.auth_token or "",
                    "SESSION_AUTH_TOKEN": session.auth_token or "",
                    "SESSION_ID": session.session_id,
                },
                network_mode="bridge",
                # Apply resource limits to prevent resource exhaustion
                mem_limit=resource_limits.memory_limit,
                cpu_quota=resource_limits.cpu_quota,
                cpu_period=resource_limits.cpu_period,
                # Additional security and resource constraints
                tmpfs={
                    "/tmp": "rw,noexec,nosuid,size=100m",
                    "/var/tmp": "rw,noexec,nosuid,size=50m",
                },
            )
            session.container_id = container_info.container_id
            session.status = "running"
            # Update in-memory cache
            self.sessions[session.session_id] = session
            # Update database if using database storage
            if USE_DATABASE_STORAGE:
                try:
                    await SessionModel.update_session(
                        session.session_id,
                        {
                            "container_id": container_info.container_id,
                            "status": "running",
                        },
                    )
                except Exception as e:
                    logger.error(
                        "Failed to update session in database",
                        extra={"session_id": session.session_id, "error": str(e)},
                    )
            logger.info(
                "Container started successfully",
                extra={
                    "session_id": session.session_id,
                    "container_name": session.container_name,
                    "container_id": container_info.container_id,
                    "port": session.port,
                },
            )
        except Exception as e:
            session.status = "error"
            self._save_sessions()
            logger.error(f"Failed to start container {session.container_name}: {e}")

    async def get_session(self, session_id: str) -> Optional[SessionData]:
        """Get session information, refreshing last_accessed on a hit."""
        # Check in-memory cache first
        session = self.sessions.get(session_id)
        if session:
            session.last_accessed = datetime.now()
            # Update database if using database storage
            if USE_DATABASE_STORAGE:
                try:
                    await SessionModel.update_session(
                        session_id, {"last_accessed": datetime.now()}
                    )
                except Exception as e:
                    logger.warning(
                        "Failed to update session access time in database",
                        extra={"session_id": session_id, "error": str(e)},
                    )
            return session
        # If not in cache and using database, try to load from database
        if USE_DATABASE_STORAGE:
            try:
                db_session = await SessionModel.get_session(session_id)
                if db_session:
                    # Convert to SessionData and cache it
                    session_data = SessionData(**db_session)
                    self.sessions[session_id] = session_data
                    logger.debug(
                        "Session loaded from database", extra={"session_id": session_id}
                    )
                    return session_data
            except Exception as e:
                logger.error(
                    "Failed to load session from database",
                    extra={"session_id": session_id, "error": str(e)},
                )
        return None

    async def list_sessions(self) -> List[SessionData]:
        """List all sessions currently in the in-memory registry."""
        return list(self.sessions.values())

    async def list_containers_async(self, all: bool = False) -> List:
        """List containers asynchronously via the Docker service."""
        return await self.docker_service.list_containers(all=all)

    async def cleanup_expired_sessions(self):
        """Clean up expired sessions and their containers."""
        now = datetime.now()
        expired_sessions = []
        for session_id, session in self.sessions.items():
            # Check if session has expired
            if now - session.last_accessed > timedelta(minutes=SESSION_TIMEOUT_MINUTES):
                expired_sessions.append(session_id)
                # Stop and remove container
                try:
                    await self.docker_service.stop_container(
                        session.container_name, timeout=10
                    )
                    await self.docker_service.remove_container(session.container_name)
                    logger.info(f"Cleaned up container {session.container_name}")
                except Exception as e:
                    logger.error(
                        f"Error cleaning up container {session.container_name}: {e}"
                    )
                # Remove session directory
                try:
                    import shutil

                    shutil.rmtree(session.host_dir)
                    logger.info(f"Removed session directory {session.host_dir}")
                except OSError as e:
                    logger.error(
                        f"Error removing session directory {session.host_dir}: {e}"
                    )
        # Deletion is deferred until after iteration so the dict is not
        # mutated while being looped over.
        for session_id in expired_sessions:
            del self.sessions[session_id]
        if expired_sessions:
            # NOTE(review): expired rows are not removed from the database
            # backend here -- only the in-memory dict and the JSON dump. Confirm
            # whether DB rows should be deleted or marked stopped.
            self._save_sessions()
            logger.info(f"Cleaned up {len(expired_sessions)} expired sessions")
        # Also cleanup expired authentication tokens
        expired_tokens = cleanup_expired_auth_tokens()
        if expired_tokens > 0:
            logger.info(f"Cleaned up {expired_tokens} expired authentication tokens")
# Global session manager instance
# Module-level singleton created at import time; every endpoint shares it.
session_manager = SessionManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup: initializes the HTTP pool, the database backend (falling back to
    JSON-file storage on failure), container health monitoring, and the
    periodic session-cleanup task. Shutdown: tears those down again.
    """
    global USE_DATABASE_STORAGE  # May be flipped off below if DB init fails
    # Startup
    logger.info("Starting Session Management Service")
    # Initialize HTTP connection pool
    await init_http_pool()
    # Initialize database if using database storage
    if USE_DATABASE_STORAGE:
        try:
            await init_database()
            await run_migrations()
            # Load active sessions from database
            await session_manager._load_sessions_from_database()
            logger.info("Database initialized and sessions loaded")
        except Exception as e:
            logger.error("Database initialization failed", extra={"error": str(e)})
            logger.warning("Falling back to JSON file storage")
            USE_DATABASE_STORAGE = False
            session_manager._load_sessions_from_file()
    # Start container health monitoring
    try:
        docker_client = None
        if USE_ASYNC_DOCKER:
            from async_docker_client import get_async_docker_client

            # NOTE(review): the client obtained here may be closed once the
            # `async with` block exits -- confirm the health monitor holds a
            # usable handle, or pass it a client factory instead.
            async with get_async_docker_client() as client:
                docker_client = client._docker if hasattr(client, "_docker") else None
        else:
            # Bug fix: SessionManager has no `docker_client` attribute (it is
            # `docker_service`), so this branch previously raised
            # AttributeError and health monitoring never started in sync mode.
            # TODO(review): confirm start_container_health_monitoring accepts
            # a DockerService rather than a raw docker-py client.
            docker_client = session_manager.docker_service
        await start_container_health_monitoring(session_manager, docker_client)
        logger.info("Container health monitoring started")
    except Exception as e:
        logger.error(
            "Failed to start container health monitoring", extra={"error": str(e)}
        )

    # Start cleanup task
    async def cleanup_task():
        # Run forever; a single failed cleanup pass must not kill the loop
        # (previously any exception terminated the task permanently).
        while True:
            try:
                await session_manager.cleanup_expired_sessions()
            except Exception as e:
                logger.error("Session cleanup pass failed", extra={"error": str(e)})
            await asyncio.sleep(300)  # Run every 5 minutes

    cleanup_coro = asyncio.create_task(cleanup_task())
    yield
    # Shutdown
    logger.info("Shutting down Session Management Service")
    cleanup_coro.cancel()
    # Shutdown HTTP connection pool
    await shutdown_http_pool()
    # Shutdown container health monitoring
    try:
        await stop_container_health_monitoring()
        logger.info("Container health monitoring stopped")
    except Exception as e:
        logger.error(
            "Error stopping container health monitoring", extra={"error": str(e)}
        )
    # Shutdown database
    if USE_DATABASE_STORAGE:
        await shutdown_database()
app = FastAPI(
    title="Lovdata Chat Session Manager",
    description="Manages isolated OpenCode containers for Norwegian legal research sessions",
    version="1.0.0",
    lifespan=lifespan,  # startup/shutdown wiring defined above
)
@app.post("/sessions", response_model=SessionData)
async def create_session(request: Request):
"""Create a new session with dedicated container"""
start_time = time.time()
with RequestContext():
try:
log_request("POST", "/sessions", 200, 0, operation="create_session_start")
session = await session_manager.create_session()
duration_ms = (time.time() - start_time) * 1000
log_session_operation(
session.session_id, "created", duration_ms=duration_ms
)
log_performance(
"create_session", duration_ms, session_id=session.session_id
)
return session
except HTTPException as e:
duration_ms = (time.time() - start_time) * 1000
log_request(
"POST", "/sessions", e.status_code, duration_ms, error=str(e.detail)
)
raise
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
log_request("POST", "/sessions", 500, duration_ms, error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to create session: {str(e)}"
)
@app.get("/sessions/{session_id}", response_model=SessionData)
async def get_session(session_id: str, request: Request):
"""Get session information"""
start_time = time.time()
with RequestContext():
try:
log_request(
"GET", f"/sessions/{session_id}", 200, 0, operation="get_session_start"
)
session = await session_manager.get_session(session_id)
if not session:
duration_ms = (time.time() - start_time) * 1000
log_request(
"GET",
f"/sessions/{session_id}",
404,
duration_ms,
session_id=session_id,
)
raise HTTPException(status_code=404, detail="Session not found")
duration_ms = (time.time() - start_time) * 1000
log_request(
"GET",
f"/sessions/{session_id}",
200,
duration_ms,
session_id=session_id,
)
log_session_operation(session_id, "accessed", duration_ms=duration_ms)
return session
except HTTPException:
raise
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
log_request(
"GET", f"/sessions/{session_id}", 500, duration_ms, error=str(e)
)
raise HTTPException(
status_code=500, detail=f"Failed to get session: {str(e)}"
)
@app.get("/sessions", response_model=List[SessionData])
async def list_sessions():
"""List all active sessions"""
return await session_manager.list_sessions()
@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str, background_tasks: BackgroundTasks):
    """Delete a session and its container.

    Revokes the session's auth token, removes it from the registry, and
    schedules container/directory teardown in the background.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = await session_manager.get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    # Revoke authentication token
    revoke_session_auth_token(session_id)

    async def _teardown(target):
        # Bug fix: the old code scheduled cleanup_expired_sessions(), which only
        # removes *expired* sessions -- and since this session was already
        # deleted from the registry, its container and host directory were
        # never cleaned up at all (container leak).
        try:
            await session_manager.docker_service.stop_container(
                target.container_name, timeout=10
            )
            await session_manager.docker_service.remove_container(
                target.container_name
            )
        except Exception as e:
            logger.error(
                "Error removing container during session delete",
                extra={"session_id": target.session_id, "error": str(e)},
            )
        try:
            import shutil

            shutil.rmtree(target.host_dir)
        except OSError as e:
            logger.error(
                "Error removing session directory",
                extra={"session_id": target.session_id, "error": str(e)},
            )
        # Persist the deletion. SessionModel exposes no delete here, so mark
        # the row stopped -- TODO(review): confirm against the database layer.
        if USE_DATABASE_STORAGE:
            try:
                await SessionModel.update_session(
                    target.session_id, {"status": "stopped"}
                )
            except Exception as e:
                logger.error(
                    "Failed to mark session stopped in database",
                    extra={"session_id": target.session_id, "error": str(e)},
                )

    background_tasks.add_task(_teardown, session)
    # Remove from the in-memory registry immediately (pop avoids a KeyError
    # if a concurrent request already removed it).
    session_manager.sessions.pop(session_id, None)
    session_manager._save_sessions()
    return {"message": f"Session {session_id} scheduled for deletion"}
@app.post("/cleanup")
async def trigger_cleanup():
"""Manually trigger cleanup of expired sessions"""
await session_manager.cleanup_expired_sessions()
return {"message": "Cleanup completed"}
@app.get("/sessions/{session_id}/auth")
async def get_session_auth_info(session_id: str):
"""Get authentication information for a session"""
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
auth_info = get_session_auth_info(session_id)
if not auth_info:
raise HTTPException(status_code=404, detail="Authentication info not found")
return {
"session_id": session_id,
"auth_info": auth_info,
"has_token": session.auth_token is not None,
}
@app.post("/sessions/{session_id}/auth/rotate")
async def rotate_session_token(session_id: str):
"""Rotate the authentication token for a session"""
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
from session_auth import _session_token_manager
new_token = _session_token_manager.rotate_session_token(session_id)
if not new_token:
raise HTTPException(status_code=404, detail="Failed to rotate token")
# Update session with new token
session.auth_token = new_token
session_manager._save_sessions()
return {
"session_id": session_id,
"new_token": new_token,
"message": "Token rotated successfully",
}
@app.get("/auth/sessions")
async def list_authenticated_sessions():
"""List all authenticated sessions"""
sessions = list_active_auth_sessions()
return {
"active_auth_sessions": len(sessions),
"sessions": sessions,
}
@app.get("/health/container")
async def get_container_health():
"""Get detailed container health statistics"""
stats = get_container_health_stats()
return stats
@app.get("/health/container/{session_id}")
async def get_session_container_health(session_id: str):
"""Get container health information for a specific session"""
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
stats = get_container_health_stats(session_id)
history = get_container_health_history(session_id, limit=20)
return {
"session_id": session_id,
"container_id": session.container_id,
"stats": stats.get(f"session_{session_id}", {}),
"recent_history": history,
}
@app.api_route(
    "/session/{session_id}/{path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
)
async def proxy_to_session(request: Request, session_id: str, path: str):
    """Proxy requests to session containers based on session ID in URL.

    Resolves the Docker host IP, forwards the request (with session auth
    headers attached) through the shared HTTP connection pool, and maps
    timeouts/connection errors to 504/502 responses.
    """
    start_time = time.time()
    with RequestContext():
        log_request(
            request.method,
            f"/session/{session_id}/{path}",
            200,
            0,
            operation="proxy_start",
            session_id=session_id,
        )
        session = await session_manager.get_session(session_id)
        if not session or session.status != "running":
            duration_ms = (time.time() - start_time) * 1000
            log_request(
                request.method,
                f"/session/{session_id}/{path}",
                404,
                duration_ms,
                session_id=session_id,
                error="Session not found or not running",
            )
            raise HTTPException(
                status_code=404, detail="Session not found or not running"
            )
        # Dynamically detect the Docker host IP from container perspective
        # This supports multiple Docker environments (Docker Desktop, Linux, cloud, etc.)
        try:
            host_ip = await async_get_host_ip()
            logger.info(f"Using detected host IP for proxy: {host_ip}")
        except RuntimeError as e:
            # Fallback to environment variable or common defaults
            host_ip = os.getenv("HOST_IP")
            if not host_ip:
                # Try common Docker gateway IPs as final fallback
                common_gateways = ["172.17.0.1", "192.168.65.1", "host.docker.internal"]
                for gateway in common_gateways:
                    try:
                        # Test connectivity to gateway.
                        # NOTE(review): this is a blocking socket call inside an
                        # async handler (stalls the event loop up to 1s per
                        # gateway), and it probes port 22 (SSH) purely as a
                        # reachability heuristic -- confirm this is intended.
                        import socket

                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        sock.settimeout(1.0)
                        result = sock.connect_ex((gateway, 22))
                        sock.close()
                        if result == 0:
                            host_ip = gateway
                            logger.warning(f"Using fallback gateway IP: {host_ip}")
                            break
                    except Exception:
                        continue
                else:
                    # for/else: no gateway answered -- give up with a 500.
                    logger.error(f"Host IP detection failed: {e}")
                    raise HTTPException(
                        status_code=500,
                        detail="Could not determine Docker host IP for proxy routing",
                    )
        container_url = f"http://{host_ip}:{session.port}"
        # Prepare the request URL
        url = f"{container_url}/{path}"
        if request.url.query:
            url += f"?{request.url.query}"
        # Get request body
        body = await request.body()
        # Prepare headers (exclude host header to avoid conflicts)
        headers = dict(request.headers)
        headers.pop("host", None)
        # Add authentication headers for the OpenCode server
        if session.auth_token:
            headers["Authorization"] = f"Bearer {session.auth_token}"
            headers["X-Session-Token"] = session.auth_token
            headers["X-Session-ID"] = session.session_id
        # Make the proxy request using the connection pool
        try:
            log_session_operation(
                session_id, "proxy_request", method=request.method, path=path
            )
            response = await make_http_request(
                method=request.method,
                url=url,
                headers=headers,
                content=body,
            )
            duration_ms = (time.time() - start_time) * 1000
            log_request(
                request.method,
                f"/session/{session_id}/{path}",
                response.status_code,
                duration_ms,
                session_id=session_id,
                operation="proxy_complete",
            )
            # Log security event for proxy access
            log_security_event(
                "proxy_access",
                "info",
                session_id=session_id,
                method=request.method,
                path=path,
                status_code=response.status_code,
            )
            # Return the response (body, status, and upstream headers pass
            # through unchanged)
            return Response(
                content=response.content,
                status_code=response.status_code,
                headers=dict(response.headers),
            )
        except httpx.TimeoutException as e:
            duration_ms = (time.time() - start_time) * 1000
            log_request(
                request.method,
                f"/session/{session_id}/{path}",
                504,
                duration_ms,
                session_id=session_id,
                error="timeout",
            )
            log_security_event(
                "proxy_timeout",
                "warning",
                session_id=session_id,
                method=request.method,
                path=path,
                error=str(e),
            )
            raise HTTPException(
                status_code=504, detail="Request to session container timed out"
            )
        except httpx.RequestError as e:
            duration_ms = (time.time() - start_time) * 1000
            log_request(
                request.method,
                f"/session/{session_id}/{path}",
                502,
                duration_ms,
                session_id=session_id,
                error=str(e),
            )
            log_security_event(
                "proxy_connection_error",
                "error",
                session_id=session_id,
                method=request.method,
                path=path,
                error=str(e),
            )
            raise HTTPException(
                status_code=502,
                detail=f"Failed to connect to session container: {str(e)}",
            )
@app.get("/health")
async def health_check():
"""Health check endpoint with comprehensive resource monitoring"""
docker_ok = False
host_ip_ok = False
detected_host_ip = None
resource_status = {}
http_pool_stats = {}
try:
# Check Docker connectivity
docker_ok = await session_manager.docker_service.ping()
except Exception as e:
logger.warning(f"Docker health check failed: {e}")
docker_ok = False
try:
# Check host IP detection
detected_host_ip = await async_get_host_ip()
host_ip_ok = True
except Exception as e:
logger.warning(f"Host IP detection failed: {e}")
host_ip_ok = False
try:
# Check system resource status
resource_status = check_system_resources()
except Exception as e:
logger.warning("Resource monitoring failed", extra={"error": str(e)})
resource_status = {"error": str(e)}
try:
# Get HTTP connection pool statistics
http_pool_stats = await get_connection_pool_stats()
except Exception as e:
logger.warning("HTTP pool stats failed", extra={"error": str(e)})
http_pool_stats = {"error": str(e)}
# Check database status if using database storage
database_status = {}
if USE_DATABASE_STORAGE:
try:
database_status = await get_database_stats()
except Exception as e:
logger.warning("Database stats failed", extra={"error": str(e)})
database_status = {"status": "error", "error": str(e)}
# Get container health statistics
container_health_stats = {}
try:
container_health_stats = get_container_health_stats()
except Exception as e:
logger.warning("Container health stats failed", extra={"error": str(e)})
container_health_stats = {"error": str(e)}
# Determine overall health status
resource_alerts = (
resource_status.get("alerts", []) if isinstance(resource_status, dict) else []
)
critical_alerts = [
a
for a in resource_alerts
if isinstance(a, dict) and a.get("level") == "critical"
]
# Check HTTP pool health
http_healthy = (
http_pool_stats.get("status") == "healthy"
if isinstance(http_pool_stats, dict)
else False
)
if critical_alerts or not (docker_ok and host_ip_ok and http_healthy):
status = "unhealthy"
elif resource_alerts:
status = "degraded"
else:
status = "healthy"
health_data = {
"status": status,
"docker": docker_ok,
"docker_mode": "async" if USE_ASYNC_DOCKER else "sync",
"host_ip_detection": host_ip_ok,
"detected_host_ip": detected_host_ip,
"http_connection_pool": http_pool_stats,
"storage_backend": "database" if USE_DATABASE_STORAGE else "json_file",
"active_sessions": len(
[s for s in session_manager.sessions.values() if s.status == "running"]
),
"resource_limits": {
"memory_limit": CONTAINER_MEMORY_LIMIT,
"cpu_quota": CONTAINER_CPU_QUOTA,
"cpu_period": CONTAINER_CPU_PERIOD,
"max_concurrent_sessions": MAX_CONCURRENT_SESSIONS,
},
"system_resources": resource_status.get("system_resources", {})
if isinstance(resource_status, dict)
else {},
"resource_alerts": resource_alerts,
"timestamp": datetime.now().isoformat(),
}
# Add database information if using database storage
if USE_DATABASE_STORAGE and database_status:
health_data["database"] = database_status
# Add container health information
if container_health_stats:
health_data["container_health"] = container_health_stats
return health_data
if __name__ == "__main__":
    # Dev entry point; reload=True is for local development only.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)