Files
lovdata-chat/session-manager/main.py

436 lines
14 KiB
Python

"""
Session Management Service for Lovdata Chat
This service manages the lifecycle of OpenCode containers for individual user sessions.
Each session gets its own isolated container with a dedicated working directory.
"""
import os
import uuid
import json
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, List
from contextlib import asynccontextmanager
import docker
from docker.errors import DockerException, NotFound
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import uvicorn
import httpx
import asyncio
# Configuration
# Root directory that holds one sub-directory per session plus the index file.
SESSIONS_DIR = Path("/app/sessions")
# JSON index of all known sessions, kept inside the sessions root.
SESSIONS_FILE = SESSIONS_DIR / "sessions.json"
# Image used for every per-session OpenCode container.
CONTAINER_IMAGE = "lovdata-opencode:latest"
# Workstation limit on simultaneously active sessions.
MAX_CONCURRENT_SESSIONS = 3
# Idle time after which a session is eligible for auto-cleanup (1 hour).
SESSION_TIMEOUT_MINUTES = 60
# Per-container resource caps.
CONTAINER_MEMORY_LIMIT = "4g"
CONTAINER_CPU_QUOTA = 100_000  # intended as 1 CPU core
class SessionData(BaseModel):
    """Persistent record describing one user session and its container."""

    # Identity of the session and of the container that serves it.
    session_id: str
    container_name: str
    container_id: Optional[str] = None  # unset until a container has been started

    # Where the session lives on the host and which port it is reachable on.
    host_dir: str
    port: Optional[int] = None

    # Timestamps used for bookkeeping and expiry.
    created_at: datetime
    last_accessed: datetime

    # Lifecycle state: creating, running, stopped, error
    status: str = "creating"
class SimpleDockerClient:
    """Simple Docker client using direct HTTP requests to Unix socket.

    Every method is a coroutine that issues one request against the Docker
    Engine API and returns the decoded response body (or ``True`` for the
    start/stop/remove actions, which carry no body).
    """

    def __init__(self, socket_path="/var/run/docker.sock"):
        """Remember the socket path; no connection is opened here."""
        self.socket_path = socket_path
        # Docker socket uses HTTP over Unix domain socket; the host part of
        # the URL is only a placeholder.
        self.base_url = "http://localhost"

    @staticmethod
    def _parse_body(response):
        """Decode a Docker API response body.

        Returns ``None`` for an empty body, the parsed JSON document when the
        response is declared as JSON, and the raw text otherwise. Not every
        endpoint answers with JSON, so decoding unconditionally would raise
        on perfectly successful calls.
        """
        if not response.content:
            return None
        content_type = response.headers.get("content-type", "")
        if "json" in content_type.lower():
            return response.json()
        return response.text

    async def _request(self, method, path, json_data=None, params=None):
        """Make HTTP request to Docker socket.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            path: API path, e.g. ``"/containers/create"``.
            json_data: Optional JSON-serialisable request body.
            params: Optional mapping of query-string parameters.

        Returns:
            The decoded response body (see ``_parse_body``).

        Raises:
            httpx.HTTPStatusError: the daemon answered with an error status.
            Exception: any other failure is logged and re-raised unchanged.
        """
        import httpx

        # Create Unix socket connector
        transport = httpx.AsyncHTTPTransport(uds=self.socket_path)
        async with httpx.AsyncClient(
            transport=transport, base_url=self.base_url
        ) as client:
            try:
                response = await client.request(
                    method, path, json=json_data, params=params
                )
                response.raise_for_status()
                return self._parse_body(response)
            except httpx.HTTPStatusError as e:
                print(f"Docker API error: {e.response.status_code} - {e.response.text}")
                raise
            except Exception as e:
                print(f"Docker request failed: {e}")
                raise

    async def ping(self):
        """Test Docker connectivity and return the daemon's reply."""
        return await self._request("GET", "/_ping")

    async def create_container(self, image, name, **kwargs):
        """Create a container.

        Args:
            image: Image reference to create the container from.
            name: Name to assign to the container.
            **kwargs: Extra fields merged into the create request body.

        Returns:
            The decoded response of the create call.
        """
        # The Engine API takes the container name as the ``name`` query
        # parameter; it is not a field of the request body.
        body = {"Image": image, **kwargs}
        return await self._request(
            "POST", "/containers/create", json_data=body, params={"name": name}
        )

    async def start_container(self, container_id):
        """Start a container; returns ``True`` when the call succeeds."""
        await self._request("POST", f"/containers/{container_id}/start")
        return True

    async def stop_container(self, container_id):
        """Stop a container; returns ``True`` when the call succeeds."""
        await self._request("POST", f"/containers/{container_id}/stop")
        return True

    async def remove_container(self, container_id):
        """Remove a container; returns ``True`` when the call succeeds."""
        await self._request("DELETE", f"/containers/{container_id}")
        return True

    async def inspect_container(self, container_id):
        """Inspect a container and return the decoded description."""
        return await self._request("GET", f"/containers/{container_id}/json")
class SessionManager:
    """Track user sessions and the container bookkeeping for each of them.

    Session records are held in ``self.sessions`` (keyed by session id) and
    mirrored to ``SESSIONS_FILE`` whenever they change, so they can be
    reloaded the next time the manager is constructed.
    """

    def __init__(self):
        """Connect to Docker and load any previously saved sessions."""
        # Try Docker library first, fall back to httpx if it fails
        self.docker_client = None
        try:
            # Set DOCKER_HOST to the mounted socket
            os.environ["DOCKER_HOST"] = "unix:///var/run/docker.sock"
            import docker

            self.docker_client = docker.from_env()
            # Test the connection
            self.docker_client.ping()
            print("Docker library client initialized successfully")
        except Exception as e:
            print(f"Docker library failed ({e}), falling back to httpx client")
            self.docker_client = SimpleDockerClient()
        self.sessions: Dict[str, SessionData] = {}
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set = set()
        self._load_sessions()

    def _load_sessions(self):
        """Load session data from persistent storage.

        A missing file leaves the session table untouched. A file that
        cannot be decoded into session records is reported and results in an
        empty session table rather than a partially loaded one.
        """
        if not SESSIONS_FILE.exists():
            return
        loaded: Dict[str, SessionData] = {}
        try:
            with open(SESSIONS_FILE, "r") as f:
                data = json.load(f)
            for session_id, session_dict in data.items():
                # Convert datetime strings back to datetime objects
                session_dict["created_at"] = datetime.fromisoformat(
                    session_dict["created_at"]
                )
                session_dict["last_accessed"] = datetime.fromisoformat(
                    session_dict["last_accessed"]
                )
                loaded[session_id] = SessionData(**session_dict)
        except (ValueError, KeyError, TypeError, AttributeError) as e:
            # ValueError covers malformed JSON and malformed timestamps;
            # the others cover a file whose structure is not what we wrote.
            print(f"Warning: Could not load sessions file: {e}")
            loaded = {}
        self.sessions = loaded

    def _save_sessions(self):
        """Save session data to persistent storage.

        The data is written to a sibling temporary file first and then moved
        over ``SESSIONS_FILE``, so an interrupted write cannot leave a
        truncated index behind.
        """
        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
        data = {
            session_id: session.dict()
            for session_id, session in self.sessions.items()
        }
        tmp_file = SESSIONS_FILE.with_name(SESSIONS_FILE.name + ".tmp")
        with open(tmp_file, "w") as f:
            # default=str serialises the datetime fields.
            json.dump(data, f, indent=2, default=str)
        os.replace(tmp_file, SESSIONS_FILE)

    def _generate_session_id(self) -> str:
        """Generate a unique session ID (16 hexadecimal characters)."""
        return uuid.uuid4().hex[:16]

    def _get_available_port(self) -> int:
        """Find an available port for the container.

        Returns the lowest port at or above 8080 that no known session has
        been assigned. Only ports recorded in ``self.sessions`` are
        considered.
        """
        used_ports = {s.port for s in self.sessions.values() if s.port}
        port = 8080
        while port in used_ports:
            port += 1
        return port

    def _check_container_limits(self) -> bool:
        """Check if we're within concurrent session limits.

        Returns ``False`` when no Docker client is available or when the
        number of sessions in state ``creating``/``running`` has reached
        ``MAX_CONCURRENT_SESSIONS``.
        """
        if not self.docker_client:
            return False
        active_sessions = sum(
            1 for s in self.sessions.values() if s.status in ("creating", "running")
        )
        return active_sessions < MAX_CONCURRENT_SESSIONS

    async def create_session(self) -> SessionData:
        """Create a new OpenCode session with dedicated container.

        The session record and its host directory are created immediately;
        the container start runs as a background task, so the returned
        record is still in state ``creating``.

        Raises:
            HTTPException: status 429 when the session limit is reached.
        """
        if not self._check_container_limits():
            raise HTTPException(
                status_code=429,
                detail=f"Maximum concurrent sessions ({MAX_CONCURRENT_SESSIONS}) reached",
            )
        session_id = self._generate_session_id()
        container_name = f"opencode-{session_id}"
        host_dir = str(SESSIONS_DIR / session_id)
        port = self._get_available_port()
        # Create host directory
        Path(host_dir).mkdir(parents=True, exist_ok=True)
        now = datetime.now()
        session = SessionData(
            session_id=session_id,
            container_name=container_name,
            host_dir=host_dir,
            port=port,
            created_at=now,
            last_accessed=now,
            status="creating",
        )
        self.sessions[session_id] = session
        self._save_sessions()
        # Start container in background, keeping a reference until it is done
        task = asyncio.create_task(self._start_container(session))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return session

    async def _start_container(self, session: SessionData):
        """Start the OpenCode container for a session.

        Updates ``session.status`` to ``running`` or ``error`` and persists
        the change. Container creation is currently mocked.
        """
        if not self.docker_client:
            session.status = "error"
            self._save_sessions()
            print("Docker client not available")
            return
        try:
            # Mock container creation for development
            session.container_id = f"mock-{session.session_id}"
            session.status = "running"
            self._save_sessions()
            print(f"Container {session.container_name} ready on port {session.port}")
        except Exception as e:
            session.status = "error"
            self._save_sessions()
            print(f"Failed to start container {session.container_name}: {e}")

    async def get_session(self, session_id: str) -> Optional[SessionData]:
        """Get session information.

        A successful lookup refreshes ``last_accessed`` and persists it.
        Returns ``None`` for an unknown id.
        """
        session = self.sessions.get(session_id)
        if session:
            session.last_accessed = datetime.now()
            self._save_sessions()
        return session

    async def list_sessions(self) -> List[SessionData]:
        """List all sessions."""
        return list(self.sessions.values())

    async def cleanup_expired_sessions(self):
        """Clean up expired sessions and their containers.

        A session is expired when it has not been accessed for more than
        ``SESSION_TIMEOUT_MINUTES``. For each expired session the container
        is cleaned up (currently mocked), the host directory is removed and
        the record is dropped; the index is rewritten if anything changed.
        """
        import shutil

        now = datetime.now()
        timeout = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
        expired_sessions = [
            session_id
            for session_id, session in self.sessions.items()
            if now - session.last_accessed > timeout
        ]
        for session_id in expired_sessions:
            session = self.sessions[session_id]
            # Stop and remove container. Without a Docker client only this
            # step is skipped; the directory below is still removed so the
            # dropped record does not leave an orphaned directory behind.
            if self.docker_client:
                try:
                    # Mock container cleanup for development
                    print(f"Cleaned up container {session.container_name}")
                except Exception as e:
                    print(f"Error cleaning up container {session.container_name}: {e}")
            # Remove session directory
            try:
                shutil.rmtree(session.host_dir)
                print(f"Removed session directory {session.host_dir}")
            except OSError as e:
                print(f"Error removing session directory {session.host_dir}: {e}")
            del self.sessions[session_id]
        if expired_sessions:
            self._save_sessions()
            print(f"Cleaned up {len(expired_sessions)} expired sessions")
# Global session manager instance, shared by the lifespan task and every
# route handler below. Constructing it at import time sets up the Docker
# client and loads any saved sessions from SESSIONS_FILE.
session_manager = SessionManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    On startup a background task is launched that cleans up expired
    sessions every 5 minutes; on shutdown that task is cancelled and
    awaited so it is fully finished before the application exits.
    """
    # Startup
    print("Starting Session Management Service")

    async def cleanup_task():
        while True:
            try:
                await session_manager.cleanup_expired_sessions()
            except Exception as e:
                # One failed pass must not end periodic cleanup for the rest
                # of the process lifetime. Cancellation is not an Exception
                # subclass, so shutdown still interrupts the loop.
                print(f"Session cleanup failed: {e}")
            await asyncio.sleep(300)  # Run every 5 minutes

    cleanup_coro = asyncio.create_task(cleanup_task())
    try:
        yield
    finally:
        # Shutdown
        print("Shutting down Session Management Service")
        cleanup_coro.cancel()
        try:
            # Wait for the cancellation to be delivered.
            await cleanup_coro
        except asyncio.CancelledError:
            pass
# The FastAPI application. `lifespan` is passed in so the periodic session
# cleanup task is started with the app and cancelled when it shuts down.
app = FastAPI(
    title="Lovdata Chat Session Manager",
    description="Manages isolated OpenCode containers for Norwegian legal research sessions",
    version="1.0.0",
    lifespan=lifespan,
)
@app.post("/sessions", response_model=SessionData)
async def create_session():
    """Create a session through the manager and return its record.

    HTTP errors raised by the manager are passed through unchanged; any
    other failure is reported as a 500 response.
    """
    try:
        return await session_manager.create_session()
    except HTTPException:
        raise
    except Exception as exc:
        failure = f"Failed to create session: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure)
@app.get("/sessions/{session_id}", response_model=SessionData)
async def get_session(session_id: str):
    """Return the record of one session, or respond 404 if it is unknown."""
    found = await session_manager.get_session(session_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Session not found")
@app.get("/sessions", response_model=List[SessionData])
async def list_sessions():
    """Return every session record the manager currently holds."""
    all_sessions = await session_manager.list_sessions()
    return all_sessions
def _remove_session_resources(session):
    """Release what a deleted session held: its container and host directory."""
    import shutil

    if session_manager.docker_client:
        # Mock container cleanup for development
        print(f"Cleaned up container {session.container_name}")
    try:
        shutil.rmtree(session.host_dir)
        print(f"Removed session directory {session.host_dir}")
    except OSError as e:
        print(f"Error removing session directory {session.host_dir}: {e}")


@app.delete("/sessions/{session_id}")
async def delete_session(session_id: str, background_tasks: BackgroundTasks):
    """Delete a session and its container.

    The record is removed from the manager and the index is rewritten
    before the response is sent; removing the session's resources is
    scheduled as a background task.

    Raises:
        HTTPException: status 404 when the session id is unknown.
    """
    # Take the record out of the table first so the session stops being
    # served, but keep the object: the background task needs it. Scheduling
    # the expiry sweep instead would never touch this session, because the
    # sweep only looks at records that are still in the table and expired.
    session = session_manager.sessions.pop(session_id, None)
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    session_manager._save_sessions()
    # Schedule cleanup
    background_tasks.add_task(_remove_session_resources, session)
    return {"message": f"Session {session_id} scheduled for deletion"}
@app.post("/cleanup")
async def trigger_cleanup():
    """Run one expired-session cleanup pass now and confirm completion."""
    await session_manager.cleanup_expired_sessions()
    confirmation = {"message": "Cleanup completed"}
    return confirmation
# Headers that describe a single connection and must not be passed on by an
# intermediary.
_HOP_BY_HOP_HEADERS = frozenset(
    {
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    }
)


@app.api_route(
    "/session/{session_id}/{path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
)
async def proxy_to_session(request: Request, session_id: str, path: str):
    """Proxy requests to session containers based on session ID in URL.

    The request (method, query string, body and most headers) is replayed
    against ``http://localhost:<session port>/<path>`` and the container's
    response is returned to the caller.

    Raises:
        HTTPException: status 404 when the session is unknown or not in
            state ``running``; status 502 when the container cannot be
            reached.
    """
    session = await session_manager.get_session(session_id)
    if not session or session.status != "running":
        raise HTTPException(status_code=404, detail="Session not found or not running")

    # Prepare the request
    url = f"http://localhost:{session.port}/{path}"
    if request.url.query:
        url += f"?{request.url.query}"

    # Drop the caller's Accept-Encoding so the HTTP client advertises only
    # encodings it can decode itself; the body returned below is then always
    # the decoded payload.
    skipped_request_headers = _HOP_BY_HOP_HEADERS | {
        "host",
        "x-session-id",
        "accept-encoding",
    }
    forwarded_headers = {
        k: v
        for k, v in request.headers.items()
        if k.lower() not in skipped_request_headers
    }

    # Get request body
    body = await request.body()
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.request(
                method=request.method,
                url=url,
                headers=forwarded_headers,
                content=body,
                follow_redirects=False,
            )
        except httpx.RequestError as e:
            raise HTTPException(
                status_code=502, detail=f"Container proxy error: {str(e)}"
            ) from e

    skipped_response_headers = set(_HOP_BY_HOP_HEADERS)
    if "content-encoding" in response.headers:
        # response.content is already decoded, so the upstream encoding and
        # the length of the encoded body no longer describe what we send.
        skipped_response_headers.update({"content-encoding", "content-length"})

    # Return the proxied response
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers={
            k: v
            for k, v in response.headers.items()
            if k.lower() not in skipped_response_headers
        },
    )
@app.get("/health")
async def health_check():
    """Report service status, the running-session count and a timestamp."""
    # Docker connectivity assumed for development
    docker_ok = True
    running_count = sum(
        1 for s in session_manager.sessions.values() if s.status == "running"
    )
    if docker_ok:
        overall = "healthy"
    else:
        overall = "unhealthy"
    return {
        "status": overall,
        "docker": docker_ok,
        "active_sessions": running_count,
        "timestamp": datetime.now().isoformat(),
    }
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces, port 8000, with
    # auto-reload switched on.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )