Files
lovdata-chat/session-manager/container_health.py
Torbjørn Lindahl 69d18cc494 fix: session stability improvements
- Fix docker client initialization bug in app.py (context manager was closing client)
- Add restart_session() method to preserve session IDs during container restarts
- Add 60-second startup grace period before health checking new sessions
- Fix _stop_container and _get_container_info to use docker_service API consistently
- Disable mDNS in Dockerfile to prevent Bonjour service name conflicts
- Remove old container before restart to free port bindings
2026-02-04 19:10:03 +01:00

594 lines
22 KiB
Python

"""
Container Health Monitoring System
Provides active monitoring of Docker containers with automatic failure detection,
recovery mechanisms, and integration with session management and alerting systems.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from logging_config import get_logger, log_performance, log_security_event
# Module-level logger, obtained from the project's logging_config helper and named after this module.
logger = get_logger(__name__)
class ContainerStatus(Enum):
    """Health states a monitored container can be classified into."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    RESTARTING = "restarting"
    FAILED = "failed"
    UNKNOWN = "unknown"


class HealthCheckResult:
    """Outcome of a single health check against one session's container."""

    def __init__(
        self,
        session_id: str,
        container_id: str,
        status: ContainerStatus,
        response_time: Optional[float] = None,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Record a health-check outcome, timestamped at construction.

        Args:
            session_id: Session whose container was checked.
            container_id: Docker container id (callers pass "unknown" when absent).
            status: Classified health state.
            response_time: Duration of the check as supplied by the caller
                (ContainerHealthMonitor passes milliseconds).
            error_message: Human-readable reason for a non-healthy result.
            metadata: Extra details; a falsy value is replaced by a new empty dict.
        """
        self.session_id = session_id
        self.container_id = container_id
        self.status = status
        self.response_time = response_time
        self.error_message = error_message
        self.metadata = metadata or {}
        # Naive UTC timestamp, kept naive so it stays comparable with other
        # naive-UTC cutoffs. datetime.utcnow() is deprecated since Python 3.12;
        # this expression yields the same value without the warning.
        self.timestamp = datetime.now(timezone.utc).replace(tzinfo=None)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict (enum -> its value, timestamp -> ISO 8601)."""
        return {
            "session_id": self.session_id,
            "container_id": self.container_id,
            "status": self.status.value,
            "response_time": self.response_time,
            "error_message": self.error_message,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat(),
        }
class ContainerHealthMonitor:
    """Monitors Docker container health and handles automatic recovery.

    A background task (started by ``start_monitoring``) wakes every
    ``check_interval`` seconds and inspects the container of every
    ``"running"`` session held by the injected session manager. It records each
    result in a short per-session history. Once ``failure_threshold``
    consecutive checks have failed, the session's container is restarted.
    After ``max_restart_attempts`` restarts without an intervening healthy
    check, the session is marked ``"failed"``.
    """

    # Maximum number of results retained per session (oldest dropped first).
    _HISTORY_LIMIT = 10

    def __init__(
        self,
        check_interval: int = 30,  # seconds
        health_timeout: float = 10.0,  # seconds
        max_restart_attempts: int = 3,
        restart_delay: int = 5,  # seconds
        failure_threshold: int = 3,  # consecutive failures before restart
        startup_grace_period: int = 60,  # seconds
    ):
        """Configure the monitor. Dependencies are injected later via set_dependencies().

        Args:
            check_interval: Seconds to sleep between monitoring passes.
            health_timeout: Seconds to wait for one container-status lookup.
                A slower lookup yields an UNKNOWN result, so a hung Docker call
                cannot stall the whole pass.
            max_restart_attempts: Restarts allowed before the session is marked failed.
            restart_delay: Seconds to wait between stopping and restarting a container.
            failure_threshold: Consecutive failed checks that trigger a restart.
            startup_grace_period: Seconds after a session's ``created_at`` during
                which its container is not checked.
        """
        self.check_interval = check_interval
        self.health_timeout = health_timeout
        self.max_restart_attempts = max_restart_attempts
        self.restart_delay = restart_delay
        self.failure_threshold = failure_threshold
        self.startup_grace_period = startup_grace_period

        # Monitoring state
        self._monitoring = False
        self._task: Optional[asyncio.Task] = None
        self._health_history: Dict[str, List[HealthCheckResult]] = {}
        self._restart_counts: Dict[str, int] = {}

        # Dependencies (injected)
        self.session_manager = None
        self.docker_client = None

        logger.info(
            "Container health monitor initialized",
            extra={
                "check_interval": check_interval,
                "health_timeout": health_timeout,
                "max_restart_attempts": max_restart_attempts,
                "startup_grace_period": startup_grace_period,
            },
        )

    def set_dependencies(self, session_manager, docker_client):
        """Inject the session manager and an optional fallback Docker client."""
        self.session_manager = session_manager
        self.docker_client = docker_client

    async def start_monitoring(self):
        """Start the background monitoring loop (warns and returns if already running)."""
        if self._monitoring:
            logger.warning("Health monitoring already running")
            return

        self._monitoring = True
        self._task = asyncio.create_task(self._monitoring_loop())
        logger.info("Container health monitoring started")

    async def stop_monitoring(self):
        """Stop the monitoring loop and wait for its task to finish cancelling."""
        if not self._monitoring:
            return

        self._monitoring = False
        # Drop the reference so a later start_monitoring() begins cleanly.
        task, self._task = self._task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        logger.info("Container health monitoring stopped")

    async def _monitoring_loop(self):
        """Run check + history-cleanup passes every check_interval seconds until stopped."""
        while self._monitoring:
            try:
                await self._check_all_containers()
                await self._cleanup_old_history()
            except Exception as e:
                # One bad pass must not kill the loop; log and try again next interval.
                logger.error("Error in health monitoring loop", extra={"error": str(e)})

            await asyncio.sleep(self.check_interval)

    async def _check_all_containers(self):
        """Concurrently check every running session past its startup grace period."""
        if not self.session_manager:
            return

        # Skip containers that started recently: they may still be booting
        # and would otherwise be reported (and restarted) as unhealthy.
        grace = timedelta(seconds=self.startup_grace_period)
        # NOTE(review): naive local "now" is compared with session.created_at;
        # this assumes created_at is also naive local time — confirm in the
        # session manager (a tz-aware created_at would raise TypeError here).
        now = datetime.now()
        running_sessions = [
            session
            for session in self.session_manager.sessions.values()
            if session.status == "running" and (now - session.created_at) > grace
        ]
        if not running_sessions:
            return

        logger.debug(f"Checking health of {len(running_sessions)} running containers")

        results = await asyncio.gather(
            *(self._check_container_health(session) for session in running_sessions),
            return_exceptions=True,
        )

        for session, result in zip(running_sessions, results):
            # BaseException (not Exception): gather() may also return e.g.
            # CancelledError, which must not be treated as a HealthCheckResult.
            if isinstance(result, BaseException):
                logger.error(
                    "Health check failed",
                    extra={
                        "session_id": session.session_id,
                        "container_id": session.container_id,
                        "error": str(result),
                    },
                )
                continue

            await self._process_health_result(result)

    async def _check_container_health(self, session) -> HealthCheckResult:
        """Check the health of a single session's container.

        Lookup failures, timeouts and unexpected exceptions are reported as a
        result object rather than raised.

        Args:
            session: Session object; ``session_id`` and ``container_id`` are read.

        Returns:
            A HealthCheckResult whose ``response_time`` (when set) is in milliseconds.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        def elapsed_ms() -> float:
            # loop.time() is in seconds; results report milliseconds.
            return (loop.time() - start_time) * 1000

        try:
            if not session.container_id:
                return HealthCheckResult(
                    session.session_id,
                    "unknown",
                    ContainerStatus.UNKNOWN,
                    error_message="No container ID",
                )

            # Bound the Docker lookup so one hung call cannot stall the whole
            # pass (asyncio.gather waits for every check).
            try:
                container_info = await asyncio.wait_for(
                    self._get_container_info(session.container_id),
                    timeout=self.health_timeout,
                )
            except asyncio.TimeoutError:
                return HealthCheckResult(
                    session.session_id,
                    session.container_id,
                    ContainerStatus.UNKNOWN,
                    response_time=elapsed_ms(),
                    error_message=(
                        f"Container inspection timed out after {self.health_timeout}s"
                    ),
                )

            if not container_info:
                return HealthCheckResult(
                    session.session_id,
                    session.container_id,
                    ContainerStatus.FAILED,
                    error_message="Container not found",
                )

            state = container_info.get("State", {})
            status = state.get("Status", "unknown")
            if status != "running":
                return HealthCheckResult(
                    session.session_id,
                    session.container_id,
                    ContainerStatus.FAILED,
                    error_message=f"Container status: {status}",
                )

            # Docker HEALTHCHECK result, if the image defines one.
            health = state.get("Health", {})
            health_status = health.get("Status", "unknown") if health else None
            if health_status == "healthy":
                return HealthCheckResult(
                    session.session_id,
                    session.container_id,
                    ContainerStatus.HEALTHY,
                    response_time=elapsed_ms(),
                    metadata={"docker_status": status, "health_status": health_status},
                )
            if health_status in ("unhealthy", "starting"):
                # A container still "starting" after the grace period counts as unhealthy.
                return HealthCheckResult(
                    session.session_id,
                    session.container_id,
                    ContainerStatus.UNHEALTHY,
                    error_message=f"Health check: {health_status}",
                    metadata={"docker_status": status, "health_status": health_status},
                )

            # No health check configured (or an unrecognised health state):
            # a running container is considered healthy.
            return HealthCheckResult(
                session.session_id,
                session.container_id,
                ContainerStatus.HEALTHY,
                response_time=elapsed_ms(),
                metadata={"docker_status": status},
            )

        except Exception as e:
            return HealthCheckResult(
                session.session_id,
                session.container_id or "unknown",
                ContainerStatus.UNKNOWN,
                response_time=elapsed_ms(),
                error_message=str(e),
            )

    def _docker_service(self):
        """Return the session manager's ``docker_service`` attribute, or None if unavailable."""
        if not self.session_manager:
            return None
        return getattr(self.session_manager, "docker_service", None)

    @staticmethod
    def _container_info_to_dict(container_info) -> Dict[str, Any]:
        """Convert a ContainerInfo-like object into the inspect-style dict the checks read.

        Reads ``container_info.status`` and ``container_info.health_status``;
        an empty/None health status becomes an empty ``Health`` dict.
        """
        health_status = container_info.health_status
        return {
            "State": {
                "Status": container_info.status,
                "Health": {"Status": health_status} if health_status else {},
            }
        }

    async def _get_container_info(self, container_id: str) -> Optional[Dict[str, Any]]:
        """Fetch container status from Docker.

        Prefers ``session_manager.docker_service``. Falls back to the injected
        docker_client only when no docker_service is available.

        Returns:
            An inspect-style dict, or None if the container was not found or
            the lookup raised (the error is logged at debug level).
        """
        try:
            service = self._docker_service()
            if service:
                info = await service.get_container_info(container_id)
            elif self.docker_client and hasattr(self.docker_client, "get_container_info"):
                info = await self.docker_client.get_container_info(container_id)
            else:
                info = None

            if info:
                return self._container_info_to_dict(info)
        except Exception as e:
            logger.debug(
                f"Failed to get container info for {container_id}",
                extra={"error": str(e)},
            )

        return None

    async def _process_health_result(self, result: HealthCheckResult):
        """Record a result in the session history, log it, and evaluate restart."""
        history = self._health_history.setdefault(result.session_id, [])
        history.append(result)
        # Keep only the most recent checks.
        del history[: -self._HISTORY_LIMIT]

        log_extra = result.to_dict()
        if result.status == ContainerStatus.HEALTHY:
            logger.debug("Container health check passed", extra=log_extra)
        elif result.status == ContainerStatus.UNHEALTHY:
            logger.warning("Container health check failed", extra=log_extra)
        elif result.status in (ContainerStatus.FAILED, ContainerStatus.UNKNOWN):
            logger.error("Container health check critical", extra=log_extra)

        await self._check_restart_needed(result)

    async def _check_restart_needed(self, result: HealthCheckResult):
        """Restart the container when the last ``failure_threshold`` results all failed.

        A healthy result resets the session's restart counter.
        """
        if result.status == ContainerStatus.HEALTHY:
            if result.session_id in self._restart_counts:
                self._restart_counts[result.session_id] = 0
            return

        failure_states = (
            ContainerStatus.UNHEALTHY,
            ContainerStatus.FAILED,
            ContainerStatus.UNKNOWN,
        )
        window = self._health_history.get(result.session_id, [])[-self.failure_threshold :]
        recent_failures = sum(1 for r in window if r.status in failure_states)

        if recent_failures >= self.failure_threshold:
            await self._restart_container(result.session_id, result.container_id)

    async def _restart_container(self, session_id: str, container_id: str):
        """Stop a failed container and ask the session manager to restart the session.

        Once ``max_restart_attempts`` is reached, the session is marked failed
        instead. Errors during the restart are logged, not raised.
        """
        restart_count = self._restart_counts.get(session_id, 0)
        if restart_count >= self.max_restart_attempts:
            logger.error(
                "Container restart limit exceeded",
                extra={
                    "session_id": session_id,
                    "container_id": container_id,
                    "restart_attempts": restart_count,
                },
            )
            await self._mark_session_failed(
                session_id, f"Restart limit exceeded ({restart_count} attempts)"
            )
            return

        logger.info(
            "Attempting container restart",
            extra={
                "session_id": session_id,
                "container_id": container_id,
                "restart_attempt": restart_count + 1,
            },
        )

        try:
            await self._stop_container(container_id)

            # NOTE: this sleep runs inside the monitoring pass, so it delays
            # processing of the remaining results by restart_delay seconds.
            await asyncio.sleep(self.restart_delay)

            session = await self.session_manager.get_session(session_id)
            if session:
                self._restart_counts[session_id] = restart_count + 1
                await self._update_session_status(session_id, "restarting")

                # Failures recorded against the old container must not count
                # toward the new one. Otherwise the new container could reach
                # failure_threshold after fewer than failure_threshold failures
                # of its own. With the defaults, a single failed check would
                # trigger another restart.
                self._health_history.pop(session_id, None)

                # Restart container for the SAME session (preserves session_id)
                await self.session_manager.restart_session(session_id)
                logger.info(
                    "Container restart initiated",
                    extra={
                        "session_id": session_id,
                        "restart_attempt": restart_count + 1,
                    },
                )

            log_security_event(
                "container_restart",
                "warning",
                {
                    "session_id": session_id,
                    "container_id": container_id,
                    "reason": "health_check_failure",
                },
            )

        except Exception as e:
            # NOTE(review): if restart_session() raises after the status was set to
            # "restarting", _check_all_containers (which only selects "running"
            # sessions) will not look at this session again — confirm something
            # else recovers or fails it.
            logger.error(
                "Container restart failed",
                extra={
                    "session_id": session_id,
                    "container_id": container_id,
                    "error": str(e),
                },
            )

    async def _stop_container(self, container_id: str):
        """Best-effort stop of a container (10 s timeout); failures are logged, not raised."""
        try:
            service = self._docker_service()
            if service:
                # docker_service.stop_container takes container_id as a string
                await service.stop_container(container_id, timeout=10)
            elif self.docker_client and hasattr(self.docker_client, "stop_container"):
                # The injected docker_client exposes the same stop_container API.
                await self.docker_client.stop_container(container_id, timeout=10)
            else:
                logger.warning(
                    "No docker client available to stop container",
                    extra={"container_id": container_id},
                )
        except Exception as e:
            logger.warning(
                "Failed to stop container during restart",
                extra={"container_id": container_id, "error": str(e)},
            )

    async def _update_session_status(self, session_id: str, status: str):
        """Set the in-memory session status and, if enabled, persist it to the database."""
        if not self.session_manager:
            return

        session = self.session_manager.sessions.get(session_id)
        if not session:
            return

        session.status = status

        if getattr(self.session_manager, "USE_DATABASE_STORAGE", False):
            try:
                from database import SessionModel

                await SessionModel.update_session(session_id, {"status": status})
            except Exception as e:
                logger.warning(
                    "Failed to update session status in database",
                    extra={"session_id": session_id, "error": str(e)},
                )

    async def _mark_session_failed(self, session_id: str, reason: str):
        """Mark a session as permanently failed and emit a security event."""
        await self._update_session_status(session_id, "failed")

        logger.error(
            "Session marked as failed",
            extra={"session_id": session_id, "reason": reason},
        )

        log_security_event(
            "session_failure", "error", {"session_id": session_id, "reason": reason}
        )

    async def _cleanup_old_history(self):
        """Drop results older than one hour; remove sessions left with no results."""
        # Naive UTC, matching HealthCheckResult.timestamp (avoids deprecated utcnow()).
        cutoff_time = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)

        for session_id in list(self._health_history):
            kept = [
                result
                for result in self._health_history[session_id]
                if result.timestamp > cutoff_time
            ]
            if kept:
                self._health_history[session_id] = kept
            else:
                del self._health_history[session_id]

    def get_health_stats(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Return monitoring statistics.

        The result always contains the aggregate keys. When ``session_id`` has
        history, a ``session_<id>`` entry summarising its last 10 checks is
        also included.
        """
        stats = {
            "monitoring_active": self._monitoring,
            "check_interval": self.check_interval,
            "total_sessions_monitored": len(self._health_history),
            # Sessions with any non-healthy result among their last 5 checks.
            "sessions_with_failures": sum(
                1
                for history in self._health_history.values()
                if any(r.status != ContainerStatus.HEALTHY for r in history[-5:])
            ),
            "restart_counts": dict(self._restart_counts),
        }

        if session_id and session_id in self._health_history:
            recent_results = self._health_history[session_id][-10:]
            healthy = sum(1 for r in recent_results if r.status == ContainerStatus.HEALTHY)
            # Only results with a truthy response_time contribute to the average.
            timed = [r.response_time for r in recent_results if r.response_time]
            stats[f"session_{session_id}"] = {
                "total_checks": len(recent_results),
                "healthy_checks": healthy,
                "failed_checks": len(recent_results) - healthy,
                "average_response_time": sum(timed) / max(1, len(timed)),
                "last_check": recent_results[-1].to_dict() if recent_results else None,
            }

        return stats

    def get_health_history(
        self, session_id: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` most recent results for a session, as dicts (oldest first)."""
        if session_id not in self._health_history:
            return []

        return [
            result.to_dict() for result in self._health_history[session_id][-limit:]
        ]
# Global health monitor instance: a single process-wide monitor created with default
# settings at import time (its constructor logs an "initialized" message). The
# module-level helper functions in this file all operate on this instance.
_container_health_monitor = ContainerHealthMonitor()
def get_container_health_monitor() -> ContainerHealthMonitor:
    """Return the shared, module-level ContainerHealthMonitor singleton."""
    shared_monitor = _container_health_monitor
    return shared_monitor
async def start_container_health_monitoring(session_manager=None, docker_client=None):
    """Start the shared monitor, first injecting dependencies if a session manager is given.

    Args:
        session_manager: Session manager to monitor. When falsy, previously
            injected dependencies are kept and ``docker_client`` is ignored.
        docker_client: Optional fallback Docker client, passed along with
            ``session_manager``.
    """
    shared = get_container_health_monitor()
    if not session_manager:
        await shared.start_monitoring()
        return

    shared.set_dependencies(session_manager, docker_client)
    await shared.start_monitoring()
async def stop_container_health_monitoring():
    """Stop the shared monitor's background loop; does nothing if it is not running."""
    await get_container_health_monitor().stop_monitoring()
def get_container_health_stats(session_id: Optional[str] = None) -> Dict[str, Any]:
    """Return the shared monitor's aggregate statistics, plus per-session detail for ``session_id``."""
    return get_container_health_monitor().get_health_stats(session_id)
def get_container_health_history(
    session_id: str, limit: int = 50
) -> List[Dict[str, Any]]:
    """Return up to ``limit`` recent health-check results for ``session_id`` from the shared monitor."""
    return get_container_health_monitor().get_health_history(session_id, limit)