Compare commits

..

2 Commits

Author SHA1 Message Date
05aa70c4af connected zen 2026-02-03 00:36:22 +01:00
9281c0e02a refactored the big main.py file 2026-02-03 00:17:26 +01:00
14 changed files with 1164 additions and 1288 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
__pycache__
.env

View File

@@ -11,7 +11,7 @@ RUN apt-get update && apt-get install -y \
RUN curl -fsSL https://opencode.ai/install | bash
# Copy OpenCode configuration
COPY ../config_opencode /root/.config/opencode
COPY config_opencode /root/.config/opencode
# Copy session manager
COPY . /app
@@ -19,8 +19,8 @@ COPY . /app
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Create working directory
RUN mkdir -p /app/somedir
# Create working directory and auth directory
RUN mkdir -p /app/somedir /root/.local/share/opencode
# Expose port
EXPOSE 8080
@@ -28,5 +28,5 @@ EXPOSE 8080
# Set environment variables
ENV PYTHONPATH=/app
# Start OpenCode server
# Start OpenCode server (OPENCODE_API_KEY passed via environment)
CMD ["/bin/bash", "-c", "source /root/.bashrc && opencode serve --hostname 0.0.0.0 --port 8080 --mdns"]

View File

@@ -2,6 +2,7 @@
"$schema": "https://opencode.ai/config.json",
"theme": "opencode",
"autoupdate": false,
"model": "opencode/kimi-k2.5-free",
"plugin": [],
"mcp": {
"sequential-thinking": {

View File

@@ -22,6 +22,7 @@ services:
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
- GOOGLE_API_KEY=${GOOGLE_API_KEY:-}
- ZEN_API_KEY=${ZEN_API_KEY:-}
# Certificate paths (configurable via environment)
- DOCKER_CA_CERT=${DOCKER_CA_CERT:-/etc/docker/certs/ca.pem}
- DOCKER_CLIENT_CERT=${DOCKER_CLIENT_CERT:-/etc/docker/certs/client-cert.pem}

96
session-manager/app.py Normal file
View File

@@ -0,0 +1,96 @@
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from config import USE_ASYNC_DOCKER, USE_DATABASE_STORAGE
from session_manager import session_manager
from async_docker_client import get_async_docker_client
from http_pool import init_http_pool, shutdown_http_pool
from database import init_database, shutdown_database, run_migrations
from container_health import (
start_container_health_monitoring,
stop_container_health_monitoring,
)
from logging_config import get_logger, init_logging
from routes import sessions_router, auth_router, health_router, proxy_router
init_logging()
logger = get_logger(__name__)
_use_database_storage = USE_DATABASE_STORAGE
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    Startup: init the HTTP pool, then the database (falling back to JSON
    file storage on failure), then container health monitoring, and finally
    a periodic expired-session cleanup task.
    Shutdown: cancel the cleanup task and tear everything down in reverse.
    """
    global _use_database_storage
    logger.info("Starting Session Management Service")
    await init_http_pool()
    if _use_database_storage:
        try:
            await init_database()
            await run_migrations()
            await session_manager._load_sessions_from_database()
            logger.info("Database initialized and sessions loaded")
        except Exception as e:
            logger.error("Database initialization failed", extra={"error": str(e)})
            if _use_database_storage:
                # Degrade gracefully rather than refusing to start.
                logger.warning("Falling back to JSON file storage")
                _use_database_storage = False
                session_manager._load_sessions_from_file()
    try:
        docker_client = None
        if USE_ASYNC_DOCKER:
            async with get_async_docker_client() as client:
                docker_client = client._docker if hasattr(client, "_docker") else None
        else:
            docker_client = session_manager.docker_service
        await start_container_health_monitoring(session_manager, docker_client)
        logger.info("Container health monitoring started")
    except Exception as e:
        logger.error(
            "Failed to start container health monitoring", extra={"error": str(e)}
        )

    async def cleanup_task():
        # Runs for the lifetime of the app; one pass every 5 minutes.
        while True:
            try:
                await session_manager.cleanup_expired_sessions()
            except Exception as e:
                # Bug fix: an exception here used to kill the task silently,
                # permanently disabling cleanup for the rest of the process.
                logger.error("Periodic cleanup failed", extra={"error": str(e)})
            await asyncio.sleep(300)

    cleanup_coro = asyncio.create_task(cleanup_task())
    yield
    logger.info("Shutting down Session Management Service")
    cleanup_coro.cancel()
    # Bug fix: await the cancelled task so cancellation completes cleanly
    # instead of leaving a pending task warning at interpreter shutdown.
    try:
        await cleanup_coro
    except asyncio.CancelledError:
        pass
    await shutdown_http_pool()
    try:
        await stop_container_health_monitoring()
        logger.info("Container health monitoring stopped")
    except Exception as e:
        logger.error(
            "Error stopping container health monitoring", extra={"error": str(e)}
        )
    if _use_database_storage:
        await shutdown_database()
# Application object; all HTTP endpoints come from the four routers
# (session CRUD, auth token management, health probes, reverse proxy).
app = FastAPI(
    title="Lovdata Chat Session Manager",
    description="Manages isolated OpenCode containers for Norwegian legal research sessions",
    version="1.0.0",
    lifespan=lifespan,
)
app.include_router(sessions_router)
app.include_router(auth_router)
app.include_router(health_router)
app.include_router(proxy_router)

47
session-manager/config.py Normal file
View File

@@ -0,0 +1,47 @@
"""
Configuration module for Session Management Service
Centralized configuration loading from environment variables with defaults.
"""
import os
from pathlib import Path
# Session storage configuration
SESSIONS_DIR = Path("/app/sessions")
SESSIONS_FILE = Path("/app/sessions/sessions.json")
# Container configuration
CONTAINER_IMAGE = os.getenv("CONTAINER_IMAGE", "lovdata-opencode:latest")
# Resource limits - configurable via environment variables with defaults
CONTAINER_MEMORY_LIMIT = os.getenv(
"CONTAINER_MEMORY_LIMIT", "4g"
) # Memory limit per container
CONTAINER_CPU_QUOTA = int(
os.getenv("CONTAINER_CPU_QUOTA", "100000")
) # CPU quota (100000 = 1 core)
CONTAINER_CPU_PERIOD = int(
os.getenv("CONTAINER_CPU_PERIOD", "100000")
) # CPU period (microseconds)
# Session management
MAX_CONCURRENT_SESSIONS = int(
os.getenv("MAX_CONCURRENT_SESSIONS", "3")
) # Max concurrent sessions
SESSION_TIMEOUT_MINUTES = int(
os.getenv("SESSION_TIMEOUT_MINUTES", "60")
) # Auto-cleanup timeout
# Resource monitoring thresholds
MEMORY_WARNING_THRESHOLD = float(
os.getenv("MEMORY_WARNING_THRESHOLD", "0.8")
) # 80% memory usage
CPU_WARNING_THRESHOLD = float(
os.getenv("CPU_WARNING_THRESHOLD", "0.9")
) # 90% CPU usage
# Feature flags
USE_ASYNC_DOCKER = os.getenv("USE_ASYNC_DOCKER", "true").lower() == "true"
USE_DATABASE_STORAGE = os.getenv("USE_DATABASE_STORAGE", "true").lower() == "true"

File diff suppressed because it is too large Load Diff

24
session-manager/models.py Normal file
View File

@@ -0,0 +1,24 @@
"""
Data models for Session Management Service
Pydantic models for session data and API request/response schemas.
"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class SessionData(BaseModel):
    """Represents a user session with its associated container.

    One instance tracks a single OpenCode container: identity, filesystem
    location, network port, auth token, and the timestamps that drive
    idle-expiry cleanup.
    """

    session_id: str  # 16-hex-char id; also used in proxy URLs and the session cookie
    container_name: str  # Docker container name ("opencode-<session_id>")
    container_id: Optional[str] = None  # filled in once Docker creates the container
    host_dir: str  # host directory bind-mounted into the container
    port: Optional[int] = None  # host port mapped to the container's 8080
    auth_token: Optional[str] = None  # Authentication token for the session
    created_at: datetime  # creation time
    last_accessed: datetime  # refreshed on access; drives idle expiry
    status: str = "creating"  # creating, running, stopped, error

View File

@@ -0,0 +1,6 @@
from .sessions import router as sessions_router
from .auth import router as auth_router
from .health import router as health_router
from .proxy import router as proxy_router
__all__ = ["sessions_router", "auth_router", "health_router", "proxy_router"]

View File

@@ -0,0 +1,56 @@
from fastapi import APIRouter, HTTPException
from session_manager import session_manager
from session_auth import (
get_session_auth_info as get_auth_info,
list_active_auth_sessions,
_session_token_manager,
)
router = APIRouter(tags=["auth"])
@router.get("/sessions/{session_id}/auth")
async def get_session_auth_info(session_id: str):
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
auth_info = get_auth_info(session_id)
if not auth_info:
raise HTTPException(status_code=404, detail="Authentication info not found")
return {
"session_id": session_id,
"auth_info": auth_info,
"has_token": session.auth_token is not None,
}
@router.post("/sessions/{session_id}/auth/rotate")
async def rotate_session_token(session_id: str):
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
new_token = _session_token_manager.rotate_session_token(session_id)
if not new_token:
raise HTTPException(status_code=404, detail="Failed to rotate token")
session.auth_token = new_token
session_manager._save_sessions()
return {
"session_id": session_id,
"new_token": new_token,
"message": "Token rotated successfully",
}
@router.get("/auth/sessions")
async def list_authenticated_sessions():
sessions = list_active_auth_sessions()
return {
"active_auth_sessions": len(sessions),
"sessions": sessions,
}

View File

@@ -0,0 +1,149 @@
from datetime import datetime
from fastapi import APIRouter, HTTPException
from config import (
CONTAINER_MEMORY_LIMIT,
CONTAINER_CPU_QUOTA,
CONTAINER_CPU_PERIOD,
MAX_CONCURRENT_SESSIONS,
USE_ASYNC_DOCKER,
USE_DATABASE_STORAGE,
)
from session_manager import session_manager
from host_ip_detector import async_get_host_ip
from resource_manager import check_system_resources
from http_pool import get_connection_pool_stats
from database import get_database_stats
from container_health import get_container_health_stats, get_container_health_history
from logging_config import get_logger
logger = get_logger(__name__)
router = APIRouter(tags=["health"])
@router.get("/health/container")
async def get_container_health():
stats = get_container_health_stats()
return stats
@router.get("/health/container/{session_id}")
async def get_session_container_health(session_id: str):
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
stats = get_container_health_stats(session_id)
history = get_container_health_history(session_id, limit=20)
return {
"session_id": session_id,
"container_id": session.container_id,
"stats": stats.get(f"session_{session_id}", {}),
"recent_history": history,
}
@router.get("/health")
async def health_check():
docker_ok = False
host_ip_ok = False
detected_host_ip = None
resource_status = {}
http_pool_stats = {}
try:
docker_ok = await session_manager.docker_service.ping()
except Exception as e:
logger.warning(f"Docker health check failed: {e}")
docker_ok = False
try:
detected_host_ip = await async_get_host_ip()
host_ip_ok = True
except Exception as e:
logger.warning(f"Host IP detection failed: {e}")
host_ip_ok = False
try:
resource_status = check_system_resources()
except Exception as e:
logger.warning("Resource monitoring failed", extra={"error": str(e)})
resource_status = {"error": str(e)}
try:
http_pool_stats = await get_connection_pool_stats()
except Exception as e:
logger.warning("HTTP pool stats failed", extra={"error": str(e)})
http_pool_stats = {"error": str(e)}
database_status = {}
if USE_DATABASE_STORAGE:
try:
database_status = await get_database_stats()
except Exception as e:
logger.warning("Database stats failed", extra={"error": str(e)})
database_status = {"status": "error", "error": str(e)}
container_health_stats = {}
try:
container_health_stats = get_container_health_stats()
except Exception as e:
logger.warning("Container health stats failed", extra={"error": str(e)})
container_health_stats = {"error": str(e)}
resource_alerts = (
resource_status.get("alerts", []) if isinstance(resource_status, dict) else []
)
critical_alerts = [
a
for a in resource_alerts
if isinstance(a, dict) and a.get("level") == "critical"
]
http_healthy = (
http_pool_stats.get("status") == "healthy"
if isinstance(http_pool_stats, dict)
else False
)
if critical_alerts or not (docker_ok and host_ip_ok and http_healthy):
status = "unhealthy"
elif resource_alerts:
status = "degraded"
else:
status = "healthy"
health_data = {
"status": status,
"docker": docker_ok,
"docker_mode": "async" if USE_ASYNC_DOCKER else "sync",
"host_ip_detection": host_ip_ok,
"detected_host_ip": detected_host_ip,
"http_connection_pool": http_pool_stats,
"storage_backend": "database" if USE_DATABASE_STORAGE else "json_file",
"active_sessions": len(
[s for s in session_manager.sessions.values() if s.status == "running"]
),
"resource_limits": {
"memory_limit": CONTAINER_MEMORY_LIMIT,
"cpu_quota": CONTAINER_CPU_QUOTA,
"cpu_period": CONTAINER_CPU_PERIOD,
"max_concurrent_sessions": MAX_CONCURRENT_SESSIONS,
},
"system_resources": resource_status.get("system_resources", {})
if isinstance(resource_status, dict)
else {},
"resource_alerts": resource_alerts,
"timestamp": datetime.now().isoformat(),
}
if USE_DATABASE_STORAGE and database_status:
health_data["database"] = database_status
if container_health_stats:
health_data["container_health"] = container_health_stats
return health_data

View File

@@ -0,0 +1,241 @@
import os
import re
import time
from urllib.parse import urlparse
from fastapi import APIRouter, HTTPException, Request, Response
import httpx
from session_manager import session_manager
from http_pool import make_http_request
from logging_config import (
RequestContext,
log_request,
log_session_operation,
log_security_event,
)
router = APIRouter(tags=["proxy"])
ALL_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
def get_session_from_cookie(request: Request) -> str:
    """Extract the session id from the ``lovdata_session`` cookie.

    Raises HTTP 400 when the cookie is absent or empty, since the proxy
    routes cannot resolve a target container without it.
    """
    sid = request.cookies.get("lovdata_session")
    if sid:
        return sid
    raise HTTPException(
        status_code=400,
        detail="No active session - please access via /session/{id}/ first",
    )
@router.api_route("/global/{path:path}", methods=ALL_METHODS)
async def proxy_global_to_session(request: Request, path: str):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, f"global/{path}")
@router.api_route("/assets/{path:path}", methods=ALL_METHODS)
async def proxy_assets_to_session(request: Request, path: str):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, f"assets/{path}")
@router.api_route("/provider/{path:path}", methods=ALL_METHODS)
async def proxy_provider_path_to_session(request: Request, path: str):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, f"provider/{path}")
@router.api_route("/provider", methods=ALL_METHODS)
async def proxy_provider_to_session(request: Request):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, "provider")
@router.api_route("/project", methods=ALL_METHODS)
async def proxy_project_to_session(request: Request):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, "project")
@router.api_route("/path", methods=ALL_METHODS)
async def proxy_path_to_session(request: Request):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, "path")
@router.api_route("/find/{path:path}", methods=ALL_METHODS)
async def proxy_find_to_session(request: Request, path: str):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, f"find/{path}")
@router.api_route("/file", methods=ALL_METHODS)
async def proxy_file_to_session(request: Request):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, "file")
@router.api_route("/file/{path:path}", methods=ALL_METHODS)
async def proxy_file_path_to_session(request: Request, path: str):
session_id = get_session_from_cookie(request)
return await proxy_to_session(request, session_id, f"file/{path}")
@router.api_route("/session/{session_id}/{path:path}", methods=ALL_METHODS)
async def proxy_to_session(request: Request, session_id: str, path: str):
start_time = time.time()
with RequestContext():
log_request(
request.method,
f"/session/{session_id}/{path}",
200,
0,
operation="proxy_start",
session_id=session_id,
)
session = await session_manager.get_session(session_id)
if not session or session.status != "running":
duration_ms = (time.time() - start_time) * 1000
log_request(
request.method,
f"/session/{session_id}/{path}",
404,
duration_ms,
session_id=session_id,
error="Session not found or not running",
)
raise HTTPException(
status_code=404, detail="Session not found or not running"
)
docker_host = os.getenv("DOCKER_HOST", "http://docker-daemon:2375")
parsed = urlparse(docker_host)
container_host = parsed.hostname or "docker-daemon"
container_url = f"http://{container_host}:{session.port}"
url = f"{container_url}/{path}"
if request.url.query:
url += f"?{request.url.query}"
body = await request.body()
headers = dict(request.headers)
headers.pop("host", None)
if session.auth_token:
headers["Authorization"] = f"Bearer {session.auth_token}"
headers["X-Session-Token"] = session.auth_token
headers["X-Session-ID"] = session.session_id
try:
log_session_operation(
session_id, "proxy_request", method=request.method, path=path
)
response = await make_http_request(
method=request.method,
url=url,
headers=headers,
content=body,
)
duration_ms = (time.time() - start_time) * 1000
log_request(
request.method,
f"/session/{session_id}/{path}",
response.status_code,
duration_ms,
session_id=session_id,
operation="proxy_complete",
)
log_security_event(
"proxy_access",
"info",
session_id=session_id,
method=request.method,
path=path,
status_code=response.status_code,
)
content = response.content
response_headers = dict(response.headers)
content_type = response.headers.get("content-type", "")
if "text/html" in content_type:
try:
html = content.decode("utf-8")
session_prefix = f"/session/{session_id}"
html = re.sub(r'src="/', f'src="{session_prefix}/', html)
html = re.sub(r'href="/', f'href="{session_prefix}/', html)
html = re.sub(r'content="/', f'content="{session_prefix}/', html)
content = html.encode("utf-8")
response_headers["content-length"] = str(len(content))
except UnicodeDecodeError:
pass
resp = Response(
content=content,
status_code=response.status_code,
headers=response_headers,
)
resp.set_cookie(
key="lovdata_session",
value=session_id,
httponly=True,
samesite="lax",
max_age=86400,
)
return resp
except httpx.TimeoutException as e:
duration_ms = (time.time() - start_time) * 1000
log_request(
request.method,
f"/session/{session_id}/{path}",
504,
duration_ms,
session_id=session_id,
error="timeout",
)
log_security_event(
"proxy_timeout",
"warning",
session_id=session_id,
method=request.method,
path=path,
error=str(e),
)
raise HTTPException(
status_code=504, detail="Request to session container timed out"
)
except httpx.RequestError as e:
duration_ms = (time.time() - start_time) * 1000
log_request(
request.method,
f"/session/{session_id}/{path}",
502,
duration_ms,
session_id=session_id,
error=str(e),
)
log_security_event(
"proxy_connection_error",
"error",
session_id=session_id,
method=request.method,
path=path,
error=str(e),
)
raise HTTPException(
status_code=502,
detail=f"Failed to connect to session container: {str(e)}",
)

View File

@@ -0,0 +1,121 @@
import time
from typing import List
from fastapi import APIRouter, HTTPException, BackgroundTasks, Request
from models import SessionData
from session_manager import session_manager
from session_auth import revoke_session_auth_token
from logging_config import (
RequestContext,
log_performance,
log_request,
log_session_operation,
)
router = APIRouter(tags=["sessions"])
@router.post("/sessions", response_model=SessionData)
async def create_session(request: Request):
start_time = time.time()
with RequestContext():
try:
log_request("POST", "/sessions", 200, 0, operation="create_session_start")
session = await session_manager.create_session()
duration_ms = (time.time() - start_time) * 1000
log_session_operation(
session.session_id, "created", duration_ms=duration_ms
)
log_performance(
"create_session", duration_ms, session_id=session.session_id
)
return session
except HTTPException as e:
duration_ms = (time.time() - start_time) * 1000
log_request(
"POST", "/sessions", e.status_code, duration_ms, error=str(e.detail)
)
raise
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
log_request("POST", "/sessions", 500, duration_ms, error=str(e))
raise HTTPException(
status_code=500, detail=f"Failed to create session: {str(e)}"
)
@router.get("/sessions/{session_id}", response_model=SessionData)
async def get_session(session_id: str, request: Request):
start_time = time.time()
with RequestContext():
try:
log_request(
"GET", f"/sessions/{session_id}", 200, 0, operation="get_session_start"
)
session = await session_manager.get_session(session_id)
if not session:
duration_ms = (time.time() - start_time) * 1000
log_request(
"GET",
f"/sessions/{session_id}",
404,
duration_ms,
session_id=session_id,
)
raise HTTPException(status_code=404, detail="Session not found")
duration_ms = (time.time() - start_time) * 1000
log_request(
"GET",
f"/sessions/{session_id}",
200,
duration_ms,
session_id=session_id,
)
log_session_operation(session_id, "accessed", duration_ms=duration_ms)
return session
except HTTPException:
raise
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
log_request(
"GET", f"/sessions/{session_id}", 500, duration_ms, error=str(e)
)
raise HTTPException(
status_code=500, detail=f"Failed to get session: {str(e)}"
)
@router.get("/sessions", response_model=List[SessionData])
async def list_sessions():
return await session_manager.list_sessions()
@router.delete("/sessions/{session_id}")
async def delete_session(session_id: str, background_tasks: BackgroundTasks):
session = await session_manager.get_session(session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
revoke_session_auth_token(session_id)
background_tasks.add_task(session_manager.cleanup_expired_sessions)
del session_manager.sessions[session_id]
session_manager._save_sessions()
return {"message": f"Session {session_id} scheduled for deletion"}
@router.post("/cleanup")
async def trigger_cleanup():
await session_manager.cleanup_expired_sessions()
return {"message": "Cleanup completed"}

View File

@@ -0,0 +1,412 @@
import os
import uuid
import json
import asyncio
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, List
from fastapi import HTTPException
from config import (
SESSIONS_DIR,
SESSIONS_FILE,
CONTAINER_IMAGE,
MAX_CONCURRENT_SESSIONS,
SESSION_TIMEOUT_MINUTES,
USE_ASYNC_DOCKER,
USE_DATABASE_STORAGE,
)
from models import SessionData
from docker_service import DockerService
from database import SessionModel
from resource_manager import get_resource_limits, should_throttle_sessions
from session_auth import generate_session_auth_token, cleanup_expired_auth_tokens
from logging_config import get_logger
logger = get_logger(__name__)
class SessionManager:
    """Tracks user sessions and drives their OpenCode containers.

    Sessions live in an in-memory dict, mirrored either to a JSON file or
    to the database depending on USE_DATABASE_STORAGE.
    """

    def __init__(self, docker_service: Optional[DockerService] = None):
        # Allow injection for tests; default to the configured docker mode.
        self.docker_service = docker_service or DockerService(use_async=USE_ASYNC_DOCKER)
        self.sessions: Dict[str, SessionData] = {}
        if USE_DATABASE_STORAGE:
            # DB sessions are loaded later via _load_sessions_from_database().
            logger.info("Session storage initialized", extra={"backend": "database"})
        else:
            self._load_sessions_from_file()
            logger.info("Session storage initialized", extra={"backend": "json_file"})
        # Imported here rather than at module top — presumably to avoid an
        # import cycle with container_health (TODO confirm).
        from container_health import get_container_health_monitor

        self.health_monitor = get_container_health_monitor()
        logger.info(
            "SessionManager initialized",
            extra={
                "docker_service_type": type(self.docker_service).__name__,
                "storage_backend": "database" if USE_DATABASE_STORAGE else "json_file",
            },
        )

    def _load_sessions_from_file(self):
        """Populate self.sessions from SESSIONS_FILE, tolerating a bad file."""
        if not SESSIONS_FILE.exists():
            return
        try:
            with open(SESSIONS_FILE, "r") as f:
                raw = json.load(f)
            for session_id, payload in raw.items():
                # Timestamps are stored as ISO strings (see _save_sessions).
                payload["created_at"] = datetime.fromisoformat(payload["created_at"])
                payload["last_accessed"] = datetime.fromisoformat(payload["last_accessed"])
                self.sessions[session_id] = SessionData(**payload)
            logger.info(
                "Sessions loaded from JSON file", extra={"count": len(self.sessions)}
            )
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning("Could not load sessions file", extra={"error": str(e)})
            self.sessions = {}

    async def _load_sessions_from_database(self):
        """Rehydrate running/creating sessions from the database on startup."""
        try:
            rows = await SessionModel.get_sessions_by_status("running")
            rows.extend(await SessionModel.get_sessions_by_status("creating"))
            self.sessions = {row["session_id"]: SessionData(**row) for row in rows}
            logger.info(
                "Sessions loaded from database", extra={"count": len(self.sessions)}
            )
        except Exception as e:
            logger.error(
                "Failed to load sessions from database", extra={"error": str(e)}
            )
            self.sessions = {}

    def _save_sessions(self):
        """Persist all sessions to the JSON file (the file-storage backend)."""
        SESSIONS_DIR.mkdir(exist_ok=True)
        data = {sid: s.dict() for sid, s in self.sessions.items()}
        with open(SESSIONS_FILE, "w") as f:
            # default=str serializes the datetime fields as ISO strings.
            json.dump(data, f, indent=2, default=str)

    def _generate_session_id(self) -> str:
        """Return 16 hex chars derived from a UUID4."""
        return str(uuid.uuid4()).replace("-", "")[:16]

    def _get_available_port(self) -> int:
        """Return the lowest port >= 8081 not claimed by an existing session."""
        used_ports = {s.port for s in self.sessions.values() if s.port}
        port = 8081
        while port in used_ports:
            port += 1
        return port

    def _check_container_limits(self) -> bool:
        """True while the creating/running session count is under the cap."""
        active = sum(
            1 for s in self.sessions.values() if s.status in ["creating", "running"]
        )
        return active < MAX_CONCURRENT_SESSIONS

    async def _async_check_container_limits(self) -> bool:
        """Async facade over the synchronous limit check."""
        return self._check_container_limits()

    async def create_session(self) -> SessionData:
        """Register a new session and start its container in the background.

        Raises HTTP 429 at the concurrency cap and HTTP 503 when the
        resource manager asks for throttling.
        """
        if USE_ASYNC_DOCKER:
            limits_ok = await self._async_check_container_limits()
        else:
            limits_ok = self._check_container_limits()
        if not limits_ok:
            raise HTTPException(
                status_code=429,
                detail=f"Maximum concurrent sessions ({MAX_CONCURRENT_SESSIONS}) reached",
            )
        should_throttle, reason = should_throttle_sessions()
        if should_throttle:
            raise HTTPException(
                status_code=503,
                detail=f"System resource constraints prevent new sessions: {reason}",
            )
        session_id = self._generate_session_id()
        container_name = f"opencode-{session_id}"
        host_dir = str(SESSIONS_DIR / session_id)
        port = self._get_available_port()
        Path(host_dir).mkdir(parents=True, exist_ok=True)
        auth_token = generate_session_auth_token(session_id)
        session = SessionData(
            session_id=session_id,
            container_name=container_name,
            host_dir=host_dir,
            port=port,
            auth_token=auth_token,
            created_at=datetime.now(),
            last_accessed=datetime.now(),
            status="creating",
        )
        self.sessions[session_id] = session
        if USE_DATABASE_STORAGE:
            try:
                await SessionModel.create_session(
                    {
                        "session_id": session_id,
                        "container_name": container_name,
                        "host_dir": host_dir,
                        "port": port,
                        "auth_token": auth_token,
                        "status": "creating",
                    }
                )
                logger.info(
                    "Session created in database", extra={"session_id": session_id}
                )
            except Exception as e:
                logger.error(
                    "Failed to create session in database",
                    extra={"session_id": session_id, "error": str(e)},
                )
        # Fire-and-forget: the session stays "creating" until the background
        # task flips it to "running" or "error".
        if USE_ASYNC_DOCKER:
            asyncio.create_task(self._start_container_async(session))
        else:
            asyncio.create_task(self._start_container_sync(session))
        return session

    def _container_environment(self, session: SessionData) -> dict:
        """Environment variables injected into the OpenCode container."""
        return {
            "MCP_SERVER": os.getenv("MCP_SERVER", ""),
            "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""),
            "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""),
            "GOOGLE_API_KEY": os.getenv("GOOGLE_API_KEY", ""),
            "OPENCODE_API_KEY": os.getenv("ZEN_API_KEY", ""),
            "SESSION_AUTH_TOKEN": session.auth_token or "",
            "SESSION_ID": session.session_id,
        }

    async def _launch_container(self, session: SessionData, explicit_start: bool):
        """Create (and optionally start) the container for *session*.

        Shared implementation for the async and sync paths, which were
        previously two ~60-line duplicates. ``explicit_start`` preserves the
        historical difference: the async path issued a separate
        start_container call, the sync path did not (TODO confirm the sync
        docker service starts containers on create).
        """
        try:
            resource_limits = get_resource_limits()
            logger.info(
                f"Starting container {session.container_name} with resource limits: "
                f"memory={resource_limits.memory_limit}, cpu_quota={resource_limits.cpu_quota}"
            )
            container_info = await self.docker_service.create_container(
                name=session.container_name,
                image=CONTAINER_IMAGE,
                volumes={session.host_dir: {"bind": "/app/somedir", "mode": "rw"}},
                ports={"8080": session.port},
                environment=self._container_environment(session),
                network_mode="bridge",
                mem_limit=resource_limits.memory_limit,
                cpu_quota=resource_limits.cpu_quota,
                cpu_period=resource_limits.cpu_period,
                tmpfs={
                    "/tmp": "rw,noexec,nosuid,size=100m",
                    "/var/tmp": "rw,noexec,nosuid,size=50m",
                },
            )
            if explicit_start:
                await self.docker_service.start_container(container_info.container_id)
            session.container_id = container_info.container_id
            session.status = "running"
            self.sessions[session.session_id] = session
            if not USE_DATABASE_STORAGE:
                # Bug fix: only the error path persisted to the JSON file, so
                # a successful "running" status was lost across restarts.
                self._save_sessions()
            if USE_DATABASE_STORAGE:
                try:
                    await SessionModel.update_session(
                        session.session_id,
                        {
                            "container_id": container_info.container_id,
                            "status": "running",
                        },
                    )
                except Exception as e:
                    logger.error(
                        "Failed to update session in database",
                        extra={"session_id": session.session_id, "error": str(e)},
                    )
            logger.info(
                "Container started successfully",
                extra={
                    "session_id": session.session_id,
                    "container_name": session.container_name,
                    "container_id": container_info.container_id,
                    "port": session.port,
                },
            )
        except Exception as e:
            session.status = "error"
            self._save_sessions()
            logger.error(f"Failed to start container {session.container_name}: {e}")

    async def _start_container_async(self, session: SessionData):
        """Async-Docker path: create, then explicitly start the container."""
        await self._launch_container(session, explicit_start=True)

    async def _start_container_sync(self, session: SessionData):
        """Sync-Docker path kept for USE_ASYNC_DOCKER=false."""
        await self._launch_container(session, explicit_start=False)

    async def get_session(self, session_id: str) -> Optional[SessionData]:
        """Return a session by id, refreshing its last-accessed timestamp.

        Falls back to the database when the session is not cached in memory;
        returns None when nothing is found.
        """
        session = self.sessions.get(session_id)
        if session:
            session.last_accessed = datetime.now()
            if USE_DATABASE_STORAGE:
                try:
                    await SessionModel.update_session(
                        session_id, {"last_accessed": datetime.now()}
                    )
                except Exception as e:
                    logger.warning(
                        "Failed to update session access time in database",
                        extra={"session_id": session_id, "error": str(e)},
                    )
            return session
        if USE_DATABASE_STORAGE:
            try:
                db_session = await SessionModel.get_session(session_id)
                if db_session:
                    session_data = SessionData(**db_session)
                    self.sessions[session_id] = session_data
                    logger.debug(
                        "Session loaded from database", extra={"session_id": session_id}
                    )
                    return session_data
            except Exception as e:
                logger.error(
                    "Failed to load session from database",
                    extra={"session_id": session_id, "error": str(e)},
                )
        return None

    async def list_sessions(self) -> List[SessionData]:
        """Return all tracked sessions, regardless of status."""
        return list(self.sessions.values())

    async def list_containers_async(self, all: bool = False) -> List:
        """Delegate to the docker service's container listing."""
        return await self.docker_service.list_containers(all=all)

    async def cleanup_expired_sessions(self):
        """Stop, remove and forget sessions idle past SESSION_TIMEOUT_MINUTES;
        also prunes expired auth tokens."""
        now = datetime.now()
        expired_sessions = []
        for session_id, session in self.sessions.items():
            if now - session.last_accessed <= timedelta(minutes=SESSION_TIMEOUT_MINUTES):
                continue
            expired_sessions.append(session_id)
            try:
                await self.docker_service.stop_container(
                    session.container_name, timeout=10
                )
                await self.docker_service.remove_container(session.container_name)
                logger.info(f"Cleaned up container {session.container_name}")
            except Exception as e:
                logger.error(
                    f"Error cleaning up container {session.container_name}: {e}"
                )
            try:
                shutil.rmtree(session.host_dir)
                logger.info(f"Removed session directory {session.host_dir}")
            except OSError as e:
                logger.error(
                    f"Error removing session directory {session.host_dir}: {e}"
                )
        # Deletion happens after iteration to avoid mutating the dict mid-loop.
        for session_id in expired_sessions:
            del self.sessions[session_id]
        if expired_sessions:
            self._save_sessions()
            logger.info(f"Cleaned up {len(expired_sessions)} expired sessions")
        expired_tokens = cleanup_expired_auth_tokens()
        if expired_tokens > 0:
            logger.info(f"Cleaned up {expired_tokens} expired authentication tokens")


# Module-level singleton used by the route modules and the app lifespan.
session_manager = SessionManager()