diff --git a/session-manager/app.py b/session-manager/app.py
new file mode 100644
index 0000000..fcd2d56
--- /dev/null
+++ b/session-manager/app.py
@@ -0,0 +1,126 @@
+import asyncio
+from contextlib import AsyncExitStack, asynccontextmanager, suppress
+
+from fastapi import FastAPI
+
+from config import USE_ASYNC_DOCKER, USE_DATABASE_STORAGE
+from session_manager import session_manager
+from async_docker_client import get_async_docker_client
+from http_pool import init_http_pool, shutdown_http_pool
+from database import init_database, shutdown_database, run_migrations
+from container_health import (
+    start_container_health_monitoring,
+    stop_container_health_monitoring,
+)
+from logging_config import get_logger, init_logging
+from routes import sessions_router, auth_router, health_router, proxy_router
+
+init_logging()
+logger = get_logger(__name__)
+
+# Seconds between two sweeps for expired sessions.
+CLEANUP_INTERVAL_SECONDS = 300
+
+_use_database_storage = USE_DATABASE_STORAGE
+
+
+async def _cleanup_loop():
+    """Purge expired sessions every CLEANUP_INTERVAL_SECONDS until cancelled."""
+    while True:
+        try:
+            await session_manager.cleanup_expired_sessions()
+        except Exception as e:
+            # A failed sweep must not end the periodic task: log it and retry
+            # on the next tick. asyncio.CancelledError is not an Exception
+            # subclass (Python 3.8+), so cancellation still propagates.
+            logger.error("Session cleanup failed", extra={"error": str(e)})
+        await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """Run service startup before the yield and shutdown after it.
+
+    Startup initializes the HTTP pool, loads sessions from the database (or
+    from the JSON file if database setup fails), starts container health
+    monitoring and schedules the periodic expired-session cleanup. Shutdown
+    undoes these steps.
+    """
+    global _use_database_storage
+
+    logger.info("Starting Session Management Service")
+
+    await init_http_pool()
+
+    if _use_database_storage:
+        try:
+            await init_database()
+            await run_migrations()
+            await session_manager._load_sessions_from_database()
+            logger.info("Database initialized and sessions loaded")
+        except Exception as e:
+            logger.error("Database initialization failed", extra={"error": str(e)})
+            logger.warning("Falling back to JSON file storage")
+            _use_database_storage = False
+            session_manager._load_sessions_from_file()
+
+    # Keeps the async Docker client's context open until shutdown; leaving the
+    # `async with` block right after grabbing `client._docker` would hand the
+    # health monitor a client whose context has already exited.
+    docker_resources = AsyncExitStack()
+    try:
+        if USE_ASYNC_DOCKER:
+            client = await docker_resources.enter_async_context(
+                get_async_docker_client()
+            )
+            docker_client = client._docker if hasattr(client, "_docker") else None
+        else:
+            docker_client = session_manager.docker_service
+
+        await start_container_health_monitoring(session_manager, docker_client)
+        logger.info("Container health monitoring started")
+    except Exception as e:
+        logger.error(
+            "Failed to start container health monitoring", extra={"error": str(e)}
+        )
+
+    cleanup_coro = asyncio.create_task(_cleanup_loop())
+
+    try:
+        yield
+    finally:
+        logger.info("Shutting down Session Management Service")
+        cleanup_coro.cancel()
+        # Wait for the cancellation to land so the task is not left pending.
+        with suppress(asyncio.CancelledError):
+            await cleanup_coro
+
+        await shutdown_http_pool()
+
+        try:
+            await stop_container_health_monitoring()
+            logger.info("Container health monitoring stopped")
+        except Exception as e:
+            logger.error(
+                "Error stopping container health monitoring",
+                extra={"error": str(e)},
+            )
+
+        # Release the Docker client only after its consumer has been stopped.
+        await docker_resources.aclose()
+
+        if _use_database_storage:
+            await shutdown_database()
+
+
+app = FastAPI(
+    title="Lovdata Chat Session Manager",
+    description="Manages isolated OpenCode containers for Norwegian legal research sessions",
+    version="1.0.0",
+    lifespan=lifespan,
+)
+
+app.include_router(sessions_router)
+app.include_router(auth_router)
+app.include_router(health_router)
+app.include_router(proxy_router)
diff --git a/session-manager/config.py b/session-manager/config.py
new file mode 100644
index 0000000..e2abee6
--- /dev/null
+++ b/session-manager/config.py
@@ -0,0 +1,47 @@
+"""
+Configuration module for Session Management Service
+
+Centralized configuration loading from environment variables with defaults.
+""" + +import os +from pathlib import Path + + +# Session storage configuration +SESSIONS_DIR = Path("/app/sessions") +SESSIONS_FILE = Path("/app/sessions/sessions.json") + +# Container configuration +CONTAINER_IMAGE = os.getenv("CONTAINER_IMAGE", "lovdata-opencode:latest") + +# Resource limits - configurable via environment variables with defaults +CONTAINER_MEMORY_LIMIT = os.getenv( + "CONTAINER_MEMORY_LIMIT", "4g" +) # Memory limit per container +CONTAINER_CPU_QUOTA = int( + os.getenv("CONTAINER_CPU_QUOTA", "100000") +) # CPU quota (100000 = 1 core) +CONTAINER_CPU_PERIOD = int( + os.getenv("CONTAINER_CPU_PERIOD", "100000") +) # CPU period (microseconds) + +# Session management +MAX_CONCURRENT_SESSIONS = int( + os.getenv("MAX_CONCURRENT_SESSIONS", "3") +) # Max concurrent sessions +SESSION_TIMEOUT_MINUTES = int( + os.getenv("SESSION_TIMEOUT_MINUTES", "60") +) # Auto-cleanup timeout + +# Resource monitoring thresholds +MEMORY_WARNING_THRESHOLD = float( + os.getenv("MEMORY_WARNING_THRESHOLD", "0.8") +) # 80% memory usage +CPU_WARNING_THRESHOLD = float( + os.getenv("CPU_WARNING_THRESHOLD", "0.9") +) # 90% CPU usage + +# Feature flags +USE_ASYNC_DOCKER = os.getenv("USE_ASYNC_DOCKER", "true").lower() == "true" +USE_DATABASE_STORAGE = os.getenv("USE_DATABASE_STORAGE", "true").lower() == "true" diff --git a/session-manager/main.py b/session-manager/main.py index f3e850e..f36aab5 100644 --- a/session-manager/main.py +++ b/session-manager/main.py @@ -1,1292 +1,13 @@ """ -Session Management Service for Lovdata Chat +Session Management Service for Lovdata Chat - Entry Point -This service manages the lifecycle of OpenCode containers for individual user sessions. -Each session gets its own isolated container with a dedicated working directory. 
+Run with: python main.py +Or: uvicorn app:app --reload --host 0.0.0.0 --port 8000 """ -import os -import re -import uuid -import json -import asyncio -import logging -import time -from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict, Optional, List -from contextlib import asynccontextmanager -from urllib.parse import urlparse - -import docker -from docker.errors import DockerException, NotFound -from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Response -from fastapi.responses import JSONResponse, StreamingResponse -from pydantic import BaseModel import uvicorn -import httpx - -# Import host IP detection utility -from host_ip_detector import async_get_host_ip, reset_host_ip_cache - -# Import resource management utilities -from resource_manager import ( - get_resource_limits, - check_system_resources, - should_throttle_sessions, - ResourceLimits, - ResourceValidator, -) - -# Import structured logging -from logging_config import ( - get_logger, - RequestContext, - log_performance, - log_request, - log_session_operation, - log_security_event, - init_logging, -) - -# Import async Docker client -from async_docker_client import ( - get_async_docker_client, - async_create_container, - async_start_container, - async_stop_container, - async_remove_container, - async_get_container, - async_list_containers, - async_docker_ping, -) - -# Import HTTP connection pool -from http_pool import ( - make_http_request, - get_connection_pool_stats, - init_http_pool, - shutdown_http_pool, -) - -# Import session authentication -from session_auth import ( - generate_session_auth_token, - validate_session_auth_token, - revoke_session_auth_token, - rotate_session_auth_token, - cleanup_expired_auth_tokens, - get_session_auth_info, - get_active_auth_sessions_count, - list_active_auth_sessions, -) - -# Import database layer -from database import ( - init_database, - shutdown_database, - SessionModel, - get_database_stats, - 
run_migrations, -) - -# Import container health monitoring -from container_health import ( - start_container_health_monitoring, - stop_container_health_monitoring, - get_container_health_stats, - get_container_health_history, -) - -# Import Docker service abstraction -from docker_service import DockerService, DockerOperationError - -# Import database layer -from database import ( - init_database, - shutdown_database, - SessionModel, - get_database_stats, - run_migrations, -) - -# Initialize structured logging -init_logging() - -# Get configured logger -logger = get_logger(__name__) - -# Configuration for async operations -USE_ASYNC_DOCKER = os.getenv("USE_ASYNC_DOCKER", "true").lower() == "true" - -# Session storage configuration -USE_DATABASE_STORAGE = os.getenv("USE_DATABASE_STORAGE", "true").lower() == "true" - - -# Configuration - Resource limits are now configurable and enforced -SESSIONS_DIR = Path("/app/sessions") -SESSIONS_FILE = Path("/app/sessions/sessions.json") -CONTAINER_IMAGE = os.getenv("CONTAINER_IMAGE", "lovdata-opencode:latest") - -# Resource limits - configurable via environment variables with defaults -CONTAINER_MEMORY_LIMIT = os.getenv( - "CONTAINER_MEMORY_LIMIT", "4g" -) # Memory limit per container -CONTAINER_CPU_QUOTA = int( - os.getenv("CONTAINER_CPU_QUOTA", "100000") -) # CPU quota (100000 = 1 core) -CONTAINER_CPU_PERIOD = int( - os.getenv("CONTAINER_CPU_PERIOD", "100000") -) # CPU period (microseconds) - -# Session management -MAX_CONCURRENT_SESSIONS = int( - os.getenv("MAX_CONCURRENT_SESSIONS", "3") -) # Max concurrent sessions -SESSION_TIMEOUT_MINUTES = int( - os.getenv("SESSION_TIMEOUT_MINUTES", "60") -) # Auto-cleanup timeout - -# Resource monitoring thresholds -MEMORY_WARNING_THRESHOLD = float( - os.getenv("MEMORY_WARNING_THRESHOLD", "0.8") -) # 80% memory usage -CPU_WARNING_THRESHOLD = float( - os.getenv("CPU_WARNING_THRESHOLD", "0.9") -) # 90% CPU usage - - -class SessionData(BaseModel): - session_id: str - container_name: str - 
container_id: Optional[str] = None - host_dir: str - port: Optional[int] = None - auth_token: Optional[str] = None # Authentication token for the session - created_at: datetime - last_accessed: datetime - status: str = "creating" # creating, running, stopped, error - - -class SessionManager: - def __init__(self, docker_service: Optional[DockerService] = None): - # Use injected Docker service or create default - if docker_service: - self.docker_service = docker_service - else: - self.docker_service = DockerService(use_async=USE_ASYNC_DOCKER) - - # Initialize session storage - if USE_DATABASE_STORAGE: - # Use database backend - self.sessions: Dict[ - str, SessionData - ] = {} # Keep in-memory cache for performance - logger.info("Session storage initialized", extra={"backend": "database"}) - else: - # Use JSON file backend (legacy) - self.sessions: Dict[str, SessionData] = {} - self._load_sessions_from_file() - logger.info("Session storage initialized", extra={"backend": "json_file"}) - - # Initialize container health monitoring - from container_health import get_container_health_monitor - - self.health_monitor = get_container_health_monitor() - # Dependencies will be set when health monitoring starts - - logger.info( - "SessionManager initialized", - extra={ - "docker_service_type": type(self.docker_service).__name__, - "storage_backend": "database" if USE_DATABASE_STORAGE else "json_file", - }, - ) - - def _load_sessions_from_file(self): - """Load session data from JSON file (legacy method)""" - if SESSIONS_FILE.exists(): - try: - with open(SESSIONS_FILE, "r") as f: - data = json.load(f) - for session_id, session_dict in data.items(): - # Convert datetime strings back to datetime objects - session_dict["created_at"] = datetime.fromisoformat( - session_dict["created_at"] - ) - session_dict["last_accessed"] = datetime.fromisoformat( - session_dict["last_accessed"] - ) - self.sessions[session_id] = SessionData(**session_dict) - logger.info( - "Sessions loaded from JSON 
file", - extra={"count": len(self.sessions)}, - ) - except (json.JSONDecodeError, KeyError) as e: - logger.warning("Could not load sessions file", extra={"error": str(e)}) - self.sessions = {} - - async def _load_sessions_from_database(self): - """Load active sessions from database into memory cache""" - try: - # Load only running/creating sessions to avoid loading too much data - db_sessions = await SessionModel.get_sessions_by_status("running") - db_sessions.extend(await SessionModel.get_sessions_by_status("creating")) - - self.sessions = {} - for session_dict in db_sessions: - # Convert to SessionData model - session_data = SessionData(**session_dict) - self.sessions[session_dict["session_id"]] = session_data - - logger.info( - "Sessions loaded from database", extra={"count": len(self.sessions)} - ) - except Exception as e: - logger.error( - "Failed to load sessions from database", extra={"error": str(e)} - ) - self.sessions = {} - - def _save_sessions(self): - """Save session data to persistent storage""" - SESSIONS_DIR.mkdir(exist_ok=True) - data = {} - for session_id, session in self.sessions.items(): - data[session_id] = session.dict() - - with open(SESSIONS_FILE, "w") as f: - json.dump(data, f, indent=2, default=str) - - def _generate_session_id(self) -> str: - """Generate a unique session ID""" - return str(uuid.uuid4()).replace("-", "")[:16] - - def _get_available_port(self) -> int: - """Find an available port for the container""" - used_ports = {s.port for s in self.sessions.values() if s.port} - port = 8081 # Start from 8081 to avoid conflicts - while port in used_ports: - port += 1 - return port - - def _check_container_limits(self) -> bool: - """Check if we're within concurrent session limits""" - active_sessions = sum( - 1 for s in self.sessions.values() if s.status in ["creating", "running"] - ) - return active_sessions < MAX_CONCURRENT_SESSIONS - - async def _async_check_container_limits(self) -> bool: - """Async version of container limits 
check""" - return self._check_container_limits() - - async def create_session(self) -> SessionData: - """Create a new OpenCode session with dedicated container""" - # Check concurrent session limits - if USE_ASYNC_DOCKER: - limits_ok = await self._async_check_container_limits() - else: - limits_ok = self._check_container_limits() - - if not limits_ok: - raise HTTPException( - status_code=429, - detail=f"Maximum concurrent sessions ({MAX_CONCURRENT_SESSIONS}) reached", - ) - - # Check system resource limits - should_throttle, reason = should_throttle_sessions() - if should_throttle: - raise HTTPException( - status_code=503, - detail=f"System resource constraints prevent new sessions: {reason}", - ) - - session_id = self._generate_session_id() - container_name = f"opencode-{session_id}" - host_dir = str(SESSIONS_DIR / session_id) - port = self._get_available_port() - - # Create host directory - Path(host_dir).mkdir(parents=True, exist_ok=True) - - # Generate authentication token for this session - auth_token = generate_session_auth_token(session_id) - - session = SessionData( - session_id=session_id, - container_name=container_name, - host_dir=host_dir, - port=port, - auth_token=auth_token, - created_at=datetime.now(), - last_accessed=datetime.now(), - status="creating", - ) - - # Store in memory cache - self.sessions[session_id] = session - - # Persist to database if using database storage - if USE_DATABASE_STORAGE: - try: - await SessionModel.create_session( - { - "session_id": session_id, - "container_name": container_name, - "host_dir": host_dir, - "port": port, - "auth_token": auth_token, - "status": "creating", - } - ) - logger.info( - "Session created in database", extra={"session_id": session_id} - ) - except Exception as e: - logger.error( - "Failed to create session in database", - extra={"session_id": session_id, "error": str(e)}, - ) - # Continue with in-memory storage as fallback - - # Start container asynchronously - if USE_ASYNC_DOCKER: - 
asyncio.create_task(self._start_container_async(session)) - else: - asyncio.create_task(self._start_container_sync(session)) - - return session - - async def _start_container_async(self, session: SessionData): - """Start the OpenCode container asynchronously using aiodocker""" - try: - # Get and validate resource limits - resource_limits = get_resource_limits() - - logger.info( - f"Starting container {session.container_name} with resource limits: memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}" - ) - - # Create container using Docker service - container_info = await self.docker_service.create_container( - name=session.container_name, - image=CONTAINER_IMAGE, - volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}}, - ports={"8080": session.port}, - environment={ - "MCP_SERVER": os.getenv("MCP_SERVER", ""), - "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), - "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), - "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), - # Auth disabled for development - will be added later - # "OPENCODE_SERVER_PASSWORD": session.auth_token or "", - "SESSION_AUTH_TOKEN": session.auth_token or "", - "SESSION_ID": session.session_id, - }, - network_mode="bridge", - # Apply resource limits to prevent resource exhaustion - mem_limit=resource_limits.memory_limit, - cpu_quota=resource_limits.cpu_quota, - cpu_period=resource_limits.cpu_period, - # Additional security and resource constraints - tmpfs={ - "/tmp": "rw,noexec,nosuid,size=100m", - "/var/tmp": "rw,noexec,nosuid,size=50m", - }, - ) - - # Containers need to be explicitly started after creation - await self.docker_service.start_container(container_info.container_id) - - session.container_id = container_info.container_id - session.status = "running" - - # Update in-memory cache - self.sessions[session.session_id] = session - - # Update database if using database storage - if USE_DATABASE_STORAGE: - try: - await SessionModel.update_session( - 
session.session_id, - { - "container_id": container_info.container_id, - "status": "running", - }, - ) - except Exception as e: - logger.error( - "Failed to update session in database", - extra={"session_id": session.session_id, "error": str(e)}, - ) - - logger.info( - "Container started successfully", - extra={ - "session_id": session.session_id, - "container_name": session.container_name, - "container_id": container_info.container_id, - "port": session.port, - }, - ) - - except Exception as e: - session.status = "error" - self._save_sessions() - logger.error(f"Failed to start container {session.container_name}: {e}") - - async def _start_container_sync(self, session: SessionData): - """Start the OpenCode container using Docker service (sync mode)""" - try: - # Get and validate resource limits - resource_limits = get_resource_limits() - - logger.info( - f"Starting container {session.container_name} with resource limits: memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}" - ) - - # Create container using Docker service - container_info = await self.docker_service.create_container( - name=session.container_name, - image=CONTAINER_IMAGE, - volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}}, - ports={"8080": session.port}, - environment={ - "MCP_SERVER": os.getenv("MCP_SERVER", ""), - "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), - "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), - "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), - # Auth disabled for development - will be added later - # "OPENCODE_SERVER_PASSWORD": session.auth_token or "", - "SESSION_AUTH_TOKEN": session.auth_token or "", - "SESSION_ID": session.session_id, - }, - network_mode="bridge", - # Apply resource limits to prevent resource exhaustion - mem_limit=resource_limits.memory_limit, - cpu_quota=resource_limits.cpu_quota, - cpu_period=resource_limits.cpu_period, - # Additional security and resource constraints - tmpfs={ - "/tmp": 
"rw,noexec,nosuid,size=100m", - "/var/tmp": "rw,noexec,nosuid,size=50m", - }, - ) - - session.container_id = container_info.container_id - session.status = "running" - - # Update in-memory cache - self.sessions[session.session_id] = session - - # Update database if using database storage - if USE_DATABASE_STORAGE: - try: - await SessionModel.update_session( - session.session_id, - { - "container_id": container_info.container_id, - "status": "running", - }, - ) - except Exception as e: - logger.error( - "Failed to update session in database", - extra={"session_id": session.session_id, "error": str(e)}, - ) - - logger.info( - "Container started successfully", - extra={ - "session_id": session.session_id, - "container_name": session.container_name, - "container_id": container_info.container_id, - "port": session.port, - }, - ) - - except Exception as e: - session.status = "error" - self._save_sessions() - logger.error(f"Failed to start container {session.container_name}: {e}") - - async def get_session(self, session_id: str) -> Optional[SessionData]: - """Get session information""" - # Check in-memory cache first - session = self.sessions.get(session_id) - if session: - session.last_accessed = datetime.now() - # Update database if using database storage - if USE_DATABASE_STORAGE: - try: - await SessionModel.update_session( - session_id, {"last_accessed": datetime.now()} - ) - except Exception as e: - logger.warning( - "Failed to update session access time in database", - extra={"session_id": session_id, "error": str(e)}, - ) - return session - - # If not in cache and using database, try to load from database - if USE_DATABASE_STORAGE: - try: - db_session = await SessionModel.get_session(session_id) - if db_session: - # Convert to SessionData and cache it - session_data = SessionData(**db_session) - self.sessions[session_id] = session_data - logger.debug( - "Session loaded from database", extra={"session_id": session_id} - ) - return session_data - except Exception as 
e: - logger.error( - "Failed to load session from database", - extra={"session_id": session_id, "error": str(e)}, - ) - - return None - - async def list_sessions(self) -> List[SessionData]: - """List all sessions""" - return list(self.sessions.values()) - - async def list_containers_async(self, all: bool = False) -> List: - """List containers asynchronously""" - return await self.docker_service.list_containers(all=all) - - async def cleanup_expired_sessions(self): - """Clean up expired sessions and their containers""" - now = datetime.now() - expired_sessions = [] - - for session_id, session in self.sessions.items(): - # Check if session has expired - if now - session.last_accessed > timedelta(minutes=SESSION_TIMEOUT_MINUTES): - expired_sessions.append(session_id) - - # Stop and remove container - try: - await self.docker_service.stop_container( - session.container_name, timeout=10 - ) - await self.docker_service.remove_container(session.container_name) - logger.info(f"Cleaned up container {session.container_name}") - except Exception as e: - logger.error( - f"Error cleaning up container {session.container_name}: {e}" - ) - - # Remove session directory - try: - import shutil - - shutil.rmtree(session.host_dir) - logger.info(f"Removed session directory {session.host_dir}") - except OSError as e: - logger.error( - f"Error removing session directory {session.host_dir}: {e}" - ) - - for session_id in expired_sessions: - del self.sessions[session_id] - - if expired_sessions: - self._save_sessions() - logger.info(f"Cleaned up {len(expired_sessions)} expired sessions") - - # Also cleanup expired authentication tokens - expired_tokens = cleanup_expired_auth_tokens() - if expired_tokens > 0: - logger.info(f"Cleaned up {expired_tokens} expired authentication tokens") - - -# Global session manager instance -session_manager = SessionManager() - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan manager""" - global USE_DATABASE_STORAGE # 
Declare global at function start - - # Startup - logger.info("Starting Session Management Service") - - # Initialize HTTP connection pool - await init_http_pool() - - # Initialize database if using database storage - if USE_DATABASE_STORAGE: - try: - await init_database() - await run_migrations() - # Load active sessions from database - await session_manager._load_sessions_from_database() - logger.info("Database initialized and sessions loaded") - except Exception as e: - logger.error("Database initialization failed", extra={"error": str(e)}) - if USE_DATABASE_STORAGE: - logger.warning("Falling back to JSON file storage") - USE_DATABASE_STORAGE = False - session_manager._load_sessions_from_file() - - # Start container health monitoring - try: - docker_client = None - if USE_ASYNC_DOCKER: - from async_docker_client import get_async_docker_client - - # Create a client instance for health monitoring - async with get_async_docker_client() as client: - docker_client = client._docker if hasattr(client, "_docker") else None - else: - docker_client = session_manager.docker_client - - await start_container_health_monitoring(session_manager, docker_client) - logger.info("Container health monitoring started") - except Exception as e: - logger.error( - "Failed to start container health monitoring", extra={"error": str(e)} - ) - - # Start cleanup task - async def cleanup_task(): - while True: - await session_manager.cleanup_expired_sessions() - await asyncio.sleep(300) # Run every 5 minutes - - cleanup_coro = asyncio.create_task(cleanup_task()) - - yield - - # Shutdown - logger.info("Shutting down Session Management Service") - cleanup_coro.cancel() - - # Shutdown HTTP connection pool - await shutdown_http_pool() - - # Shutdown container health monitoring - try: - await stop_container_health_monitoring() - logger.info("Container health monitoring stopped") - except Exception as e: - logger.error( - "Error stopping container health monitoring", extra={"error": str(e)} - ) - - # 
Shutdown database - if USE_DATABASE_STORAGE: - await shutdown_database() - - -app = FastAPI( - title="Lovdata Chat Session Manager", - description="Manages isolated OpenCode containers for Norwegian legal research sessions", - version="1.0.0", - lifespan=lifespan, -) - - -@app.post("/sessions", response_model=SessionData) -async def create_session(request: Request): - """Create a new session with dedicated container""" - start_time = time.time() - - with RequestContext(): - try: - log_request("POST", "/sessions", 200, 0, operation="create_session_start") - - session = await session_manager.create_session() - - duration_ms = (time.time() - start_time) * 1000 - log_session_operation( - session.session_id, "created", duration_ms=duration_ms - ) - log_performance( - "create_session", duration_ms, session_id=session.session_id - ) - - return session - except HTTPException as e: - duration_ms = (time.time() - start_time) * 1000 - log_request( - "POST", "/sessions", e.status_code, duration_ms, error=str(e.detail) - ) - raise - except Exception as e: - duration_ms = (time.time() - start_time) * 1000 - log_request("POST", "/sessions", 500, duration_ms, error=str(e)) - raise HTTPException( - status_code=500, detail=f"Failed to create session: {str(e)}" - ) - - -@app.get("/sessions/{session_id}", response_model=SessionData) -async def get_session(session_id: str, request: Request): - """Get session information""" - start_time = time.time() - - with RequestContext(): - try: - log_request( - "GET", f"/sessions/{session_id}", 200, 0, operation="get_session_start" - ) - - session = await session_manager.get_session(session_id) - if not session: - duration_ms = (time.time() - start_time) * 1000 - log_request( - "GET", - f"/sessions/{session_id}", - 404, - duration_ms, - session_id=session_id, - ) - raise HTTPException(status_code=404, detail="Session not found") - - duration_ms = (time.time() - start_time) * 1000 - log_request( - "GET", - f"/sessions/{session_id}", - 200, - 
duration_ms, - session_id=session_id, - ) - log_session_operation(session_id, "accessed", duration_ms=duration_ms) - - return session - except HTTPException: - raise - except Exception as e: - duration_ms = (time.time() - start_time) * 1000 - log_request( - "GET", f"/sessions/{session_id}", 500, duration_ms, error=str(e) - ) - raise HTTPException( - status_code=500, detail=f"Failed to get session: {str(e)}" - ) - - -@app.get("/sessions", response_model=List[SessionData]) -async def list_sessions(): - """List all active sessions""" - return await session_manager.list_sessions() - - -@app.delete("/sessions/{session_id}") -async def delete_session(session_id: str, background_tasks: BackgroundTasks): - """Delete a session and its container""" - session = await session_manager.get_session(session_id) - if not session: - raise HTTPException(status_code=404, detail="Session not found") - - # Revoke authentication token - revoke_session_auth_token(session_id) - - # Schedule cleanup - background_tasks.add_task(session_manager.cleanup_expired_sessions) - - # Remove from sessions immediately - del session_manager.sessions[session_id] - session_manager._save_sessions() - - return {"message": f"Session {session_id} scheduled for deletion"} - - -@app.post("/cleanup") -async def trigger_cleanup(): - """Manually trigger cleanup of expired sessions""" - await session_manager.cleanup_expired_sessions() - return {"message": "Cleanup completed"} - - -@app.get("/sessions/{session_id}/auth") -async def get_session_auth_info(session_id: str): - """Get authentication information for a session""" - session = await session_manager.get_session(session_id) - if not session: - raise HTTPException(status_code=404, detail="Session not found") - - auth_info = get_session_auth_info(session_id) - if not auth_info: - raise HTTPException(status_code=404, detail="Authentication info not found") - - return { - "session_id": session_id, - "auth_info": auth_info, - "has_token": session.auth_token is not 
None, - } - - -@app.post("/sessions/{session_id}/auth/rotate") -async def rotate_session_token(session_id: str): - """Rotate the authentication token for a session""" - session = await session_manager.get_session(session_id) - if not session: - raise HTTPException(status_code=404, detail="Session not found") - - from session_auth import _session_token_manager - - new_token = _session_token_manager.rotate_session_token(session_id) - if not new_token: - raise HTTPException(status_code=404, detail="Failed to rotate token") - - # Update session with new token - session.auth_token = new_token - session_manager._save_sessions() - - return { - "session_id": session_id, - "new_token": new_token, - "message": "Token rotated successfully", - } - - -@app.get("/auth/sessions") -async def list_authenticated_sessions(): - """List all authenticated sessions""" - sessions = list_active_auth_sessions() - return { - "active_auth_sessions": len(sessions), - "sessions": sessions, - } - - -@app.get("/health/container") -async def get_container_health(): - """Get detailed container health statistics""" - stats = get_container_health_stats() - return stats - - -@app.get("/health/container/{session_id}") -async def get_session_container_health(session_id: str): - """Get container health information for a specific session""" - session = await session_manager.get_session(session_id) - if not session: - raise HTTPException(status_code=404, detail="Session not found") - - stats = get_container_health_stats(session_id) - history = get_container_health_history(session_id, limit=20) - - return { - "session_id": session_id, - "container_id": session.container_id, - "stats": stats.get(f"session_{session_id}", {}), - "recent_history": history, - } - - -# Routes for OpenCode SPA runtime requests -# These paths are hardcoded in the compiled JS and need cookie-based session routing -@app.api_route( - "/global/{path:path}", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) 
-async def proxy_global_to_session(request: Request, path: str): - """Proxy /global/* requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - # Redirect to session-prefixed path - return await proxy_to_session(request, session_id, f"global/{path}") - - -@app.api_route( - "/assets/{path:path}", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_assets_to_session(request: Request, path: str): - """Proxy /assets/* requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - # Redirect to session-prefixed path - return await proxy_to_session(request, session_id, f"assets/{path}") - - -@app.api_route( - "/provider/{path:path}", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_provider_path_to_session(request: Request, path: str): - """Proxy /provider/* requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, f"provider/{path}") - - -@app.api_route( - "/provider", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_provider_to_session(request: Request): - """Proxy /provider requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, "provider") - - 
-@app.api_route( - "/project", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_project_to_session(request: Request): - """Proxy /project requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, "project") - - -@app.api_route( - "/path", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_path_to_session(request: Request): - """Proxy /path requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, "path") - - -@app.api_route( - "/find/{path:path}", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_find_to_session(request: Request, path: str): - """Proxy /find/* requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, f"find/{path}") - - -@app.api_route( - "/file", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_file_to_session(request: Request): - """Proxy /file requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, "file") - - -@app.api_route( - "/file/{path:path}", - methods=["GET", "POST", 
"PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_file_path_to_session(request: Request, path: str): - """Proxy /file/* requests to the current session based on cookie""" - session_id = request.cookies.get("lovdata_session") - if not session_id: - raise HTTPException(status_code=400, detail="No active session - please access via /session/{id}/ first") - return await proxy_to_session(request, session_id, f"file/{path}") - - -@app.api_route( - "/session/{session_id}/{path:path}", - methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"], -) -async def proxy_to_session(request: Request, session_id: str, path: str): - """Proxy requests to session containers based on session ID in URL""" - start_time = time.time() - - with RequestContext(): - log_request( - request.method, - f"/session/{session_id}/{path}", - 200, - 0, - operation="proxy_start", - session_id=session_id, - ) - - session = await session_manager.get_session(session_id) - if not session or session.status != "running": - duration_ms = (time.time() - start_time) * 1000 - log_request( - request.method, - f"/session/{session_id}/{path}", - 404, - duration_ms, - session_id=session_id, - error="Session not found or not running", - ) - raise HTTPException( - status_code=404, detail="Session not found or not running" - ) - - # For DinD architecture: containers run inside docker-daemon with ports mapped - # to docker-daemon's interface, so we proxy through docker-daemon hostname - docker_host = os.getenv("DOCKER_HOST", "http://docker-daemon:2375") - # Extract hostname from DOCKER_HOST (e.g., "http://docker-daemon:2375" -> "docker-daemon") - parsed = urlparse(docker_host) - container_host = parsed.hostname or "docker-daemon" - - container_url = f"http://{container_host}:{session.port}" - - # Prepare the request URL - url = f"{container_url}/{path}" - if request.url.query: - url += f"?{request.url.query}" - - # Get request body - body = await request.body() - - # Prepare headers (exclude 
host header to avoid conflicts) - headers = dict(request.headers) - headers.pop("host", None) - - # Add authentication headers for the OpenCode server - if session.auth_token: - headers["Authorization"] = f"Bearer {session.auth_token}" - headers["X-Session-Token"] = session.auth_token - headers["X-Session-ID"] = session.session_id - - # Make the proxy request using the connection pool - try: - log_session_operation( - session_id, "proxy_request", method=request.method, path=path - ) - - response = await make_http_request( - method=request.method, - url=url, - headers=headers, - content=body, - ) - - duration_ms = (time.time() - start_time) * 1000 - log_request( - request.method, - f"/session/{session_id}/{path}", - response.status_code, - duration_ms, - session_id=session_id, - operation="proxy_complete", - ) - - # Log security event for proxy access - log_security_event( - "proxy_access", - "info", - session_id=session_id, - method=request.method, - path=path, - status_code=response.status_code, - ) - - # Return the response - inject base tag for HTML to fix asset paths - content = response.content - response_headers = dict(response.headers) - content_type = response.headers.get("content-type", "") - - # For HTML responses, rewrite root-relative paths to include session prefix - # OpenCode uses paths like /assets/, /favicon.ico which bypass tag - # We need to prepend the session path to make them work - if "text/html" in content_type: - try: - html = content.decode("utf-8") - session_prefix = f"/session/{session_id}" - - # Rewrite src="/..." to src="/session/{id}/..." - html = re.sub(r'src="/', f'src="{session_prefix}/', html) - # Rewrite href="/..." to href="/session/{id}/..." - html = re.sub(r'href="/', f'href="{session_prefix}/', html) - # Rewrite content="/..." 
for meta tags - html = re.sub(r'content="/', f'content="{session_prefix}/', html) - - content = html.encode("utf-8") - response_headers["content-length"] = str(len(content)) - except UnicodeDecodeError: - pass # Not valid UTF-8, skip rewriting - - # Create response with session tracking cookie - resp = Response( - content=content, - status_code=response.status_code, - headers=response_headers, - ) - # Set cookie to track current session for /global/* and /assets/* routing - resp.set_cookie( - key="lovdata_session", - value=session_id, - httponly=True, - samesite="lax", - max_age=86400, # 24 hours - ) - return resp - - except httpx.TimeoutException as e: - duration_ms = (time.time() - start_time) * 1000 - log_request( - request.method, - f"/session/{session_id}/{path}", - 504, - duration_ms, - session_id=session_id, - error="timeout", - ) - log_security_event( - "proxy_timeout", - "warning", - session_id=session_id, - method=request.method, - path=path, - error=str(e), - ) - raise HTTPException( - status_code=504, detail="Request to session container timed out" - ) - except httpx.RequestError as e: - duration_ms = (time.time() - start_time) * 1000 - log_request( - request.method, - f"/session/{session_id}/{path}", - 502, - duration_ms, - session_id=session_id, - error=str(e), - ) - log_security_event( - "proxy_connection_error", - "error", - session_id=session_id, - method=request.method, - path=path, - error=str(e), - ) - raise HTTPException( - status_code=502, - detail=f"Failed to connect to session container: {str(e)}", - ) - - -@app.get("/health") -async def health_check(): - """Health check endpoint with comprehensive resource monitoring""" - docker_ok = False - host_ip_ok = False - detected_host_ip = None - resource_status = {} - http_pool_stats = {} - - try: - # Check Docker connectivity - docker_ok = await session_manager.docker_service.ping() - except Exception as e: - logger.warning(f"Docker health check failed: {e}") - docker_ok = False - - try: - # Check 
host IP detection - detected_host_ip = await async_get_host_ip() - host_ip_ok = True - except Exception as e: - logger.warning(f"Host IP detection failed: {e}") - host_ip_ok = False - - try: - # Check system resource status - resource_status = check_system_resources() - except Exception as e: - logger.warning("Resource monitoring failed", extra={"error": str(e)}) - resource_status = {"error": str(e)} - - try: - # Get HTTP connection pool statistics - http_pool_stats = await get_connection_pool_stats() - except Exception as e: - logger.warning("HTTP pool stats failed", extra={"error": str(e)}) - http_pool_stats = {"error": str(e)} - - # Check database status if using database storage - database_status = {} - if USE_DATABASE_STORAGE: - try: - database_status = await get_database_stats() - except Exception as e: - logger.warning("Database stats failed", extra={"error": str(e)}) - database_status = {"status": "error", "error": str(e)} - - # Get container health statistics - container_health_stats = {} - try: - container_health_stats = get_container_health_stats() - except Exception as e: - logger.warning("Container health stats failed", extra={"error": str(e)}) - container_health_stats = {"error": str(e)} - - # Determine overall health status - resource_alerts = ( - resource_status.get("alerts", []) if isinstance(resource_status, dict) else [] - ) - critical_alerts = [ - a - for a in resource_alerts - if isinstance(a, dict) and a.get("level") == "critical" - ] - - # Check HTTP pool health - http_healthy = ( - http_pool_stats.get("status") == "healthy" - if isinstance(http_pool_stats, dict) - else False - ) - - if critical_alerts or not (docker_ok and host_ip_ok and http_healthy): - status = "unhealthy" - elif resource_alerts: - status = "degraded" - else: - status = "healthy" - - health_data = { - "status": status, - "docker": docker_ok, - "docker_mode": "async" if USE_ASYNC_DOCKER else "sync", - "host_ip_detection": host_ip_ok, - "detected_host_ip": detected_host_ip, 
- "http_connection_pool": http_pool_stats, - "storage_backend": "database" if USE_DATABASE_STORAGE else "json_file", - "active_sessions": len( - [s for s in session_manager.sessions.values() if s.status == "running"] - ), - "resource_limits": { - "memory_limit": CONTAINER_MEMORY_LIMIT, - "cpu_quota": CONTAINER_CPU_QUOTA, - "cpu_period": CONTAINER_CPU_PERIOD, - "max_concurrent_sessions": MAX_CONCURRENT_SESSIONS, - }, - "system_resources": resource_status.get("system_resources", {}) - if isinstance(resource_status, dict) - else {}, - "resource_alerts": resource_alerts, - "timestamp": datetime.now().isoformat(), - } - - # Add database information if using database storage - if USE_DATABASE_STORAGE and database_status: - health_data["database"] = database_status - - # Add container health information - if container_health_stats: - health_data["container_health"] = container_health_stats - - return health_data +from app import app # noqa: F401 - exported for uvicorn if __name__ == "__main__": - uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) + uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) diff --git a/session-manager/models.py b/session-manager/models.py new file mode 100644 index 0000000..e58bbe4 --- /dev/null +++ b/session-manager/models.py @@ -0,0 +1,24 @@ +""" +Data models for Session Management Service + +Pydantic models for session data and API request/response schemas. 
+""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel + + +class SessionData(BaseModel): + """Represents a user session with its associated container""" + + session_id: str + container_name: str + container_id: Optional[str] = None + host_dir: str + port: Optional[int] = None + auth_token: Optional[str] = None # Authentication token for the session + created_at: datetime + last_accessed: datetime + status: str = "creating" # creating, running, stopped, error diff --git a/session-manager/session_manager.py b/session-manager/session_manager.py new file mode 100644 index 0000000..ed6d1c1 --- /dev/null +++ b/session-manager/session_manager.py @@ -0,0 +1,410 @@ +import os +import uuid +import json +import asyncio +import shutil +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, Optional, List + +from fastapi import HTTPException + +from config import ( + SESSIONS_DIR, + SESSIONS_FILE, + CONTAINER_IMAGE, + MAX_CONCURRENT_SESSIONS, + SESSION_TIMEOUT_MINUTES, + USE_ASYNC_DOCKER, + USE_DATABASE_STORAGE, +) +from models import SessionData +from docker_service import DockerService +from database import SessionModel +from resource_manager import get_resource_limits, should_throttle_sessions +from session_auth import generate_session_auth_token, cleanup_expired_auth_tokens +from logging_config import get_logger + +logger = get_logger(__name__) + + +class SessionManager: + def __init__(self, docker_service: Optional[DockerService] = None): + if docker_service: + self.docker_service = docker_service + else: + self.docker_service = DockerService(use_async=USE_ASYNC_DOCKER) + + if USE_DATABASE_STORAGE: + self.sessions: Dict[str, SessionData] = {} + logger.info("Session storage initialized", extra={"backend": "database"}) + else: + self.sessions: Dict[str, SessionData] = {} + self._load_sessions_from_file() + logger.info("Session storage initialized", extra={"backend": "json_file"}) + + from 
container_health import get_container_health_monitor + + self.health_monitor = get_container_health_monitor() + + logger.info( + "SessionManager initialized", + extra={ + "docker_service_type": type(self.docker_service).__name__, + "storage_backend": "database" if USE_DATABASE_STORAGE else "json_file", + }, + ) + + def _load_sessions_from_file(self): + if SESSIONS_FILE.exists(): + try: + with open(SESSIONS_FILE, "r") as f: + data = json.load(f) + for session_id, session_dict in data.items(): + session_dict["created_at"] = datetime.fromisoformat( + session_dict["created_at"] + ) + session_dict["last_accessed"] = datetime.fromisoformat( + session_dict["last_accessed"] + ) + self.sessions[session_id] = SessionData(**session_dict) + logger.info( + "Sessions loaded from JSON file", + extra={"count": len(self.sessions)}, + ) + except (json.JSONDecodeError, KeyError) as e: + logger.warning("Could not load sessions file", extra={"error": str(e)}) + self.sessions = {} + + async def _load_sessions_from_database(self): + try: + db_sessions = await SessionModel.get_sessions_by_status("running") + db_sessions.extend(await SessionModel.get_sessions_by_status("creating")) + + self.sessions = {} + for session_dict in db_sessions: + session_data = SessionData(**session_dict) + self.sessions[session_dict["session_id"]] = session_data + + logger.info( + "Sessions loaded from database", extra={"count": len(self.sessions)} + ) + except Exception as e: + logger.error( + "Failed to load sessions from database", extra={"error": str(e)} + ) + self.sessions = {} + + def _save_sessions(self): + SESSIONS_DIR.mkdir(exist_ok=True) + data = {} + for session_id, session in self.sessions.items(): + data[session_id] = session.dict() + + with open(SESSIONS_FILE, "w") as f: + json.dump(data, f, indent=2, default=str) + + def _generate_session_id(self) -> str: + return str(uuid.uuid4()).replace("-", "")[:16] + + def _get_available_port(self) -> int: + used_ports = {s.port for s in 
self.sessions.values() if s.port} + port = 8081 + while port in used_ports: + port += 1 + return port + + def _check_container_limits(self) -> bool: + active_sessions = sum( + 1 for s in self.sessions.values() if s.status in ["creating", "running"] + ) + return active_sessions < MAX_CONCURRENT_SESSIONS + + async def _async_check_container_limits(self) -> bool: + return self._check_container_limits() + + async def create_session(self) -> SessionData: + if USE_ASYNC_DOCKER: + limits_ok = await self._async_check_container_limits() + else: + limits_ok = self._check_container_limits() + + if not limits_ok: + raise HTTPException( + status_code=429, + detail=f"Maximum concurrent sessions ({MAX_CONCURRENT_SESSIONS}) reached", + ) + + should_throttle, reason = should_throttle_sessions() + if should_throttle: + raise HTTPException( + status_code=503, + detail=f"System resource constraints prevent new sessions: {reason}", + ) + + session_id = self._generate_session_id() + container_name = f"opencode-{session_id}" + host_dir = str(SESSIONS_DIR / session_id) + port = self._get_available_port() + + Path(host_dir).mkdir(parents=True, exist_ok=True) + + auth_token = generate_session_auth_token(session_id) + + session = SessionData( + session_id=session_id, + container_name=container_name, + host_dir=host_dir, + port=port, + auth_token=auth_token, + created_at=datetime.now(), + last_accessed=datetime.now(), + status="creating", + ) + + self.sessions[session_id] = session + + if USE_DATABASE_STORAGE: + try: + await SessionModel.create_session( + { + "session_id": session_id, + "container_name": container_name, + "host_dir": host_dir, + "port": port, + "auth_token": auth_token, + "status": "creating", + } + ) + logger.info( + "Session created in database", extra={"session_id": session_id} + ) + except Exception as e: + logger.error( + "Failed to create session in database", + extra={"session_id": session_id, "error": str(e)}, + ) + + if USE_ASYNC_DOCKER: + 
asyncio.create_task(self._start_container_async(session)) + else: + asyncio.create_task(self._start_container_sync(session)) + + return session + + async def _start_container_async(self, session: SessionData): + try: + resource_limits = get_resource_limits() + + logger.info( + f"Starting container {session.container_name} with resource limits: " + f"memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}" + ) + + container_info = await self.docker_service.create_container( + name=session.container_name, + image=CONTAINER_IMAGE, + volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}}, + ports={"8080": session.port}, + environment={ + "MCP_SERVER": os.getenv("MCP_SERVER", ""), + "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), + "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), + "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), + "SESSION_AUTH_TOKEN": session.auth_token or "", + "SESSION_ID": session.session_id, + }, + network_mode="bridge", + mem_limit=resource_limits.memory_limit, + cpu_quota=resource_limits.cpu_quota, + cpu_period=resource_limits.cpu_period, + tmpfs={ + "/tmp": "rw,noexec,nosuid,size=100m", + "/var/tmp": "rw,noexec,nosuid,size=50m", + }, + ) + + await self.docker_service.start_container(container_info.container_id) + + session.container_id = container_info.container_id + session.status = "running" + + self.sessions[session.session_id] = session + + if USE_DATABASE_STORAGE: + try: + await SessionModel.update_session( + session.session_id, + { + "container_id": container_info.container_id, + "status": "running", + }, + ) + except Exception as e: + logger.error( + "Failed to update session in database", + extra={"session_id": session.session_id, "error": str(e)}, + ) + + logger.info( + "Container started successfully", + extra={ + "session_id": session.session_id, + "container_name": session.container_name, + "container_id": container_info.container_id, + "port": session.port, + }, + ) + + except Exception 
as e: + session.status = "error" + self._save_sessions() + logger.error(f"Failed to start container {session.container_name}: {e}") + + async def _start_container_sync(self, session: SessionData): + try: + resource_limits = get_resource_limits() + + logger.info( + f"Starting container {session.container_name} with resource limits: " + f"memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}" + ) + + container_info = await self.docker_service.create_container( + name=session.container_name, + image=CONTAINER_IMAGE, + volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}}, + ports={"8080": session.port}, + environment={ + "MCP_SERVER": os.getenv("MCP_SERVER", ""), + "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), + "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), + "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), + "SESSION_AUTH_TOKEN": session.auth_token or "", + "SESSION_ID": session.session_id, + }, + network_mode="bridge", + mem_limit=resource_limits.memory_limit, + cpu_quota=resource_limits.cpu_quota, + cpu_period=resource_limits.cpu_period, + tmpfs={ + "/tmp": "rw,noexec,nosuid,size=100m", + "/var/tmp": "rw,noexec,nosuid,size=50m", + }, + ) + + session.container_id = container_info.container_id + session.status = "running" + + self.sessions[session.session_id] = session + + if USE_DATABASE_STORAGE: + try: + await SessionModel.update_session( + session.session_id, + { + "container_id": container_info.container_id, + "status": "running", + }, + ) + except Exception as e: + logger.error( + "Failed to update session in database", + extra={"session_id": session.session_id, "error": str(e)}, + ) + + logger.info( + "Container started successfully", + extra={ + "session_id": session.session_id, + "container_name": session.container_name, + "container_id": container_info.container_id, + "port": session.port, + }, + ) + + except Exception as e: + session.status = "error" + self._save_sessions() + logger.error(f"Failed to 
start container {session.container_name}: {e}") + + async def get_session(self, session_id: str) -> Optional[SessionData]: + session = self.sessions.get(session_id) + if session: + session.last_accessed = datetime.now() + if USE_DATABASE_STORAGE: + try: + await SessionModel.update_session( + session_id, {"last_accessed": datetime.now()} + ) + except Exception as e: + logger.warning( + "Failed to update session access time in database", + extra={"session_id": session_id, "error": str(e)}, + ) + return session + + if USE_DATABASE_STORAGE: + try: + db_session = await SessionModel.get_session(session_id) + if db_session: + session_data = SessionData(**db_session) + self.sessions[session_id] = session_data + logger.debug( + "Session loaded from database", extra={"session_id": session_id} + ) + return session_data + except Exception as e: + logger.error( + "Failed to load session from database", + extra={"session_id": session_id, "error": str(e)}, + ) + + return None + + async def list_sessions(self) -> List[SessionData]: + return list(self.sessions.values()) + + async def list_containers_async(self, all: bool = False) -> List: + return await self.docker_service.list_containers(all=all) + + async def cleanup_expired_sessions(self): + now = datetime.now() + expired_sessions = [] + + for session_id, session in self.sessions.items(): + if now - session.last_accessed > timedelta(minutes=SESSION_TIMEOUT_MINUTES): + expired_sessions.append(session_id) + + try: + await self.docker_service.stop_container( + session.container_name, timeout=10 + ) + await self.docker_service.remove_container(session.container_name) + logger.info(f"Cleaned up container {session.container_name}") + except Exception as e: + logger.error( + f"Error cleaning up container {session.container_name}: {e}" + ) + + try: + shutil.rmtree(session.host_dir) + logger.info(f"Removed session directory {session.host_dir}") + except OSError as e: + logger.error( + f"Error removing session directory 
{session.host_dir}: {e}" + ) + + for session_id in expired_sessions: + del self.sessions[session_id] + + if expired_sessions: + self._save_sessions() + logger.info(f"Cleaned up {len(expired_sessions)} expired sessions") + + expired_tokens = cleanup_expired_auth_tokens() + if expired_tokens > 0: + logger.info(f"Cleaned up {expired_tokens} expired authentication tokens") + + +session_manager = SessionManager()