""" Session Management Service for Lovdata Chat This service manages the lifecycle of OpenCode containers for individual user sessions. Each session gets its own isolated container with a dedicated working directory. """ import os import uuid import json import asyncio from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Optional, List from contextlib import asynccontextmanager import docker from docker.errors import DockerException, NotFound from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Response from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel import uvicorn import httpx import asyncio # Configuration SESSIONS_DIR = Path("/app/sessions") SESSIONS_FILE = Path("/app/sessions/sessions.json") CONTAINER_IMAGE = "lovdata-opencode:latest" MAX_CONCURRENT_SESSIONS = 3 # Workstation limit SESSION_TIMEOUT_MINUTES = 60 # Auto-cleanup after 1 hour CONTAINER_MEMORY_LIMIT = "4g" CONTAINER_CPU_QUOTA = 100000 # 1 CPU core class SessionData(BaseModel): session_id: str container_name: str container_id: Optional[str] = None host_dir: str port: Optional[int] = None created_at: datetime last_accessed: datetime status: str = "creating" # creating, running, stopped, error class SessionManager: def __init__(self): # Use TLS certificates for secure Docker communication tls_config = docker.tls.TLSConfig( client_cert=("/certs/client/cert.pem", "/certs/client/key.pem"), ca_cert="/certs/client/ca.pem", verify=True, ) self.docker_client = docker.DockerClient( base_url=os.getenv("DOCKER_HOST", "tcp://docker-daemon:2376"), tls=tls_config, ) self.sessions: Dict[str, SessionData] = {} self._load_sessions() def _load_sessions(self): """Load session data from persistent storage""" if SESSIONS_FILE.exists(): try: with open(SESSIONS_FILE, "r") as f: data = json.load(f) for session_id, session_dict in data.items(): # Convert datetime strings back to datetime objects session_dict["created_at"] = datetime.fromisoformat( session_dict["created_at"] ) session_dict["last_accessed"] = datetime.fromisoformat( session_dict["last_accessed"] ) self.sessions[session_id] = SessionData(**session_dict) except (json.JSONDecodeError, KeyError) as e: print(f"Warning: Could not load sessions file: {e}") self.sessions = {} def _save_sessions(self): """Save session data to persistent storage""" SESSIONS_DIR.mkdir(exist_ok=True) data = {} for session_id, session in self.sessions.items(): data[session_id] = session.dict() with open(SESSIONS_FILE, "w") as f: json.dump(data, f, indent=2, default=str) def _generate_session_id(self) -> str: """Generate a unique session ID""" return str(uuid.uuid4()).replace("-", "")[:16] def _get_available_port(self) -> int: """Find an available port for the container""" used_ports = {s.port for s in self.sessions.values() if s.port} port = 8080 while port in used_ports: port += 1 return port def _check_container_limits(self) -> bool: """Check if we're within concurrent session limits""" active_sessions = sum( 1 for s in self.sessions.values() if s.status in ["creating", "running"] ) return active_sessions < MAX_CONCURRENT_SESSIONS async def create_session(self) -> SessionData: """Create a new OpenCode session with dedicated container""" if not self._check_container_limits(): raise HTTPException( status_code=429, detail=f"Maximum concurrent sessions ({MAX_CONCURRENT_SESSIONS}) reached", ) session_id = self._generate_session_id() container_name = f"opencode-{session_id}" host_dir = str(SESSIONS_DIR / session_id) port = self._get_available_port() # Create host directory Path(host_dir).mkdir(parents=True, exist_ok=True) session = SessionData( session_id=session_id, container_name=container_name, host_dir=host_dir, port=port, created_at=datetime.now(), last_accessed=datetime.now(), status="creating", ) self.sessions[session_id] = session self._save_sessions() # Start container in background asyncio.create_task(self._start_container(session)) return session async def _start_container(self, session: SessionData): """Start the OpenCode container for a session""" try: # Check if container already exists try: existing = self.docker_client.containers.get(session.container_name) if existing.status == "running": session.status = "running" session.container_id = existing.id self._save_sessions() return else: existing.remove() except NotFound: pass # Create and start new container container = self.docker_client.containers.run( CONTAINER_IMAGE, name=session.container_name, volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}}, ports={f"8080/tcp": session.port}, detach=True, mem_limit=CONTAINER_MEMORY_LIMIT, cpu_quota=CONTAINER_CPU_QUOTA, environment={ "MCP_SERVER": os.getenv( "MCP_SERVER", "http://host.docker.internal:8001" ), "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""), }, network_mode="bridge", ) session.container_id = container.id session.status = "running" self._save_sessions() print(f"Started container {session.container_name} on port {session.port}") except DockerException as e: session.status = "error" self._save_sessions() print(f"Failed to start container {session.container_name}: {e}") async def get_session(self, session_id: str) -> Optional[SessionData]: """Get session information""" session = self.sessions.get(session_id) if session: session.last_accessed = datetime.now() self._save_sessions() return session async def list_sessions(self) -> List[SessionData]: """List all sessions""" return list(self.sessions.values()) async def cleanup_expired_sessions(self): """Clean up expired sessions and their containers""" now = datetime.now() expired_sessions = [] for session_id, session in self.sessions.items(): # Check if session has expired if now - session.last_accessed > timedelta(minutes=SESSION_TIMEOUT_MINUTES): expired_sessions.append(session_id) # Stop and remove container try: container = self.docker_client.containers.get( session.container_name ) container.stop(timeout=10) container.remove() print(f"Cleaned up container {session.container_name}") except NotFound: pass except DockerException as e: print(f"Error cleaning up container {session.container_name}: {e}") # Remove session directory try: import shutil shutil.rmtree(session.host_dir) print(f"Removed session directory {session.host_dir}") except OSError as e: print(f"Error removing session directory {session.host_dir}: {e}") for session_id in expired_sessions: del self.sessions[session_id] if expired_sessions: self._save_sessions() print(f"Cleaned up {len(expired_sessions)} expired sessions") # Global session manager instance session_manager = SessionManager() @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager""" # Startup print("Starting Session Management Service") # Start cleanup task async def cleanup_task(): while True: await session_manager.cleanup_expired_sessions() await asyncio.sleep(300) # Run every 5 minutes cleanup_coro = asyncio.create_task(cleanup_task()) yield # Shutdown print("Shutting down Session Management Service") cleanup_coro.cancel() app = FastAPI( title="Lovdata Chat Session Manager", description="Manages isolated OpenCode containers for Norwegian legal research sessions", version="1.0.0", lifespan=lifespan, ) @app.post("/sessions", response_model=SessionData) async def create_session(): """Create a new session with dedicated container""" try: session = await session_manager.create_session() return session except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to create session: {str(e)}" ) @app.get("/sessions/{session_id}", response_model=SessionData) async def get_session(session_id: str): """Get session information""" session = await session_manager.get_session(session_id) if not session: raise HTTPException(status_code=404, detail="Session not found") return session @app.get("/sessions", response_model=List[SessionData]) async def list_sessions(): """List all active sessions""" return await session_manager.list_sessions() @app.delete("/sessions/{session_id}") async def delete_session(session_id: str, background_tasks: BackgroundTasks): """Delete a session and its container""" session = await session_manager.get_session(session_id) if not session: raise HTTPException(status_code=404, detail="Session not found") # Schedule cleanup background_tasks.add_task(session_manager.cleanup_expired_sessions) # Remove from sessions immediately del session_manager.sessions[session_id] session_manager._save_sessions() return {"message": f"Session {session_id} scheduled for deletion"} @app.post("/cleanup") async def trigger_cleanup(): """Manually trigger cleanup of expired sessions""" await session_manager.cleanup_expired_sessions() return {"message": "Cleanup completed"} @app.api_route( "/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"] ) async def proxy_to_session(request: Request, path: str): """Proxy requests to session containers based on X-Session-ID header""" session_id = request.headers.get("X-Session-ID") if not session_id: raise HTTPException(status_code=400, detail="Missing X-Session-ID header") session = await session_manager.get_session(session_id) if not session or session.status != "running": raise HTTPException(status_code=404, detail="Session not found or not running") # Proxy the request to the container container_url = f"http://localhost:{session.port}" # Prepare the request url = f"{container_url}/{path}" if request.url.query: url += f"?{request.url.query}" # Get request body body = await request.body() async with httpx.AsyncClient(timeout=30.0) as client: try: response = await client.request( method=request.method, url=url, headers={ k: v for k, v in request.headers.items() if k.lower() not in ["host", "x-session-id"] }, content=body, follow_redirects=False, ) # Return the proxied response return Response( content=response.content, status_code=response.status_code, headers=dict(response.headers), ) except httpx.RequestError as e: raise HTTPException( status_code=502, detail=f"Container proxy error: {str(e)}" ) @app.get("/health") async def health_check(): """Health check endpoint""" try: # Check Docker connectivity session_manager.docker_client.ping() docker_ok = True except: docker_ok = False return { "status": "healthy" if docker_ok else "unhealthy", "docker": docker_ok, "active_sessions": len( [s for s in session_manager.sessions.values() if s.status == "running"] ), "timestamp": datetime.now().isoformat(), } if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)