"""
Container Health Monitoring System

Provides active monitoring of Docker containers with automatic failure
detection, recovery mechanisms, and integration with session management and
alerting systems.
"""

import asyncio
import logging
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime, timedelta, timezone
from enum import Enum

from logging_config import get_logger, log_performance, log_security_event

logger = get_logger(__name__)


def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for ``datetime.utcnow()`` (deprecated since Python
    3.12) producing the same naive value, so ``isoformat()`` output and
    comparisons between stored timestamps are unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class ContainerStatus(Enum):
    """Container health status enumeration."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    RESTARTING = "restarting"
    FAILED = "failed"
    UNKNOWN = "unknown"


# Statuses that count towards the consecutive-failure restart threshold.
_FAILURE_STATUSES = frozenset(
    {ContainerStatus.UNHEALTHY, ContainerStatus.FAILED, ContainerStatus.UNKNOWN}
)


class HealthCheckResult:
    """Result of a single container health check.

    Args:
        session_id: Session the checked container belongs to.
        container_id: Checked container ID (``"unknown"`` when none is known).
        status: Outcome of the check.
        response_time: Duration of the check in milliseconds, when measured.
        error_message: Reason for a non-healthy outcome, if any.
        metadata: Extra details such as the Docker/health status; defaults
            to a fresh empty dict.

    ``timestamp`` is the (naive, UTC) creation time of the result.
    """

    def __init__(
        self,
        session_id: str,
        container_id: str,
        status: ContainerStatus,
        response_time: Optional[float] = None,
        error_message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        self.session_id = session_id
        self.container_id = container_id
        self.status = status
        self.response_time = response_time
        self.error_message = error_message
        self.metadata = metadata or {}
        self.timestamp = _utc_now()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for logging/serialization."""
        return {
            "session_id": self.session_id,
            "container_id": self.container_id,
            "status": self.status.value,
            "response_time": self.response_time,
            "error_message": self.error_message,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat(),
        }


class ContainerHealthMonitor:
    """Monitors Docker container health and handles automatic recovery.

    Every ``check_interval`` seconds, each session whose status is
    ``"running"`` and that is older than ``startup_grace_period`` is checked.
    After ``failure_threshold`` consecutive failed checks the session's
    container is stopped and restarted through the session manager. After
    ``max_restart_attempts`` restarts the session is marked ``"failed"``
    instead. A healthy check resets both the failure streak and the restart
    count.

    Args:
        check_interval: Seconds between monitoring cycles.
        health_timeout: Seconds allowed for one container lookup before the
            check is reported as ``UNKNOWN``.
        max_restart_attempts: Restarts allowed before the session is failed.
        restart_delay: Seconds to wait between stopping and restarting.
        failure_threshold: Consecutive failed checks that trigger a restart.
        startup_grace_period: Seconds after ``session.created_at`` during
            which a session is not checked.
        history_size: Maximum number of results kept per session.
        history_retention: Seconds after which results are discarded.
    """

    def __init__(
        self,
        check_interval: int = 30,  # seconds
        health_timeout: float = 10.0,  # seconds
        max_restart_attempts: int = 3,
        restart_delay: int = 5,  # seconds
        failure_threshold: int = 3,  # consecutive failures before restart
        startup_grace_period: float = 60.0,  # seconds
        history_size: int = 10,  # results kept per session
        history_retention: float = 3600.0,  # seconds
    ):
        self.check_interval = check_interval
        self.health_timeout = health_timeout
        self.max_restart_attempts = max_restart_attempts
        self.restart_delay = restart_delay
        self.failure_threshold = failure_threshold
        self.startup_grace_period = startup_grace_period
        self.history_size = history_size
        self.history_retention = history_retention

        # Monitoring state
        self._monitoring = False
        self._task: Optional[asyncio.Task] = None
        self._health_history: Dict[str, List[HealthCheckResult]] = {}
        self._restart_counts: Dict[str, int] = {}
        # Current run of failed checks per session; reset on a healthy check
        # and whenever a restart is initiated, so stale failures recorded
        # before a restart cannot trigger another one.
        self._consecutive_failures: Dict[str, int] = {}

        # Dependencies (injected)
        self.session_manager = None
        self.docker_client = None

        logger.info(
            "Container health monitor initialized",
            extra={
                "check_interval": check_interval,
                "health_timeout": health_timeout,
                "max_restart_attempts": max_restart_attempts,
            },
        )

    def set_dependencies(self, session_manager, docker_client):
        """Set dependencies for health monitoring."""
        self.session_manager = session_manager
        self.docker_client = docker_client

    async def start_monitoring(self):
        """Start the health monitoring loop as a background task."""
        if self._monitoring:
            logger.warning("Health monitoring already running")
            return

        self._monitoring = True
        self._task = asyncio.create_task(self._monitoring_loop())
        logger.info("Container health monitoring started")

    async def stop_monitoring(self):
        """Stop the health monitoring loop and wait for it to finish."""
        if not self._monitoring:
            return

        self._monitoring = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

        logger.info("Container health monitoring stopped")

    async def _monitoring_loop(self):
        """Run check + cleanup cycles until monitoring is stopped."""
        while self._monitoring:
            try:
                await self._check_all_containers()
                await self._cleanup_old_history()
            except Exception as e:
                # Keep the loop alive; one bad cycle must not end monitoring.
                logger.error(
                    "Error in health monitoring loop",
                    extra={"error": str(e)},
                    exc_info=True,
                )

            await asyncio.sleep(self.check_interval)

    async def _check_all_containers(self):
        """Check every running session that is past the startup grace period."""
        if not self.session_manager:
            return

        grace = timedelta(seconds=self.startup_grace_period)
        local_now = datetime.now()

        def past_grace_period(session) -> bool:
            created_at = session.created_at
            # Aware timestamps get an aware "now" (naive minus aware raises
            # TypeError). NOTE(review): naive created_at is compared against
            # local time as before; confirm the session manager stores local
            # rather than UTC naive timestamps.
            if created_at.tzinfo is not None:
                now = datetime.now(created_at.tzinfo)
            else:
                now = local_now
            return now - created_at > grace

        running_sessions = [
            session
            for session in self.session_manager.sessions.values()
            if session.status == "running" and past_grace_period(session)
        ]
        if not running_sessions:
            return

        logger.debug(f"Checking health of {len(running_sessions)} running containers")

        # Run all checks concurrently; each is bounded by health_timeout.
        results = await asyncio.gather(
            *(self._check_container_health(session) for session in running_sessions),
            return_exceptions=True,
        )

        for session, result in zip(running_sessions, results):
            if isinstance(result, BaseException):
                logger.error(
                    "Health check failed",
                    extra={
                        "session_id": session.session_id,
                        "container_id": session.container_id,
                        "error": str(result),
                    },
                )
                continue
            await self._process_health_result(result)

    async def _check_container_health(self, session) -> HealthCheckResult:
        """Check the health of a single session's container.

        Lookup errors, timeouts and unexpected exceptions are reported as
        ``UNKNOWN`` results rather than raised.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        def elapsed_ms() -> float:
            return (loop.time() - start_time) * 1000

        try:
            container_id = session.container_id
            if not container_id:
                return HealthCheckResult(
                    session.session_id,
                    container_id or "unknown",
                    ContainerStatus.UNKNOWN,
                    error_message="No container ID",
                )

            # Bound the Docker lookup so one hung call cannot stall the cycle.
            try:
                container_info = await asyncio.wait_for(
                    self._get_container_info(container_id),
                    timeout=self.health_timeout,
                )
            except asyncio.TimeoutError:
                return HealthCheckResult(
                    session.session_id,
                    container_id,
                    ContainerStatus.UNKNOWN,
                    response_time=elapsed_ms(),
                    error_message=f"Health check timed out after {self.health_timeout}s",
                )

            if not container_info:
                return HealthCheckResult(
                    session.session_id,
                    container_id,
                    ContainerStatus.FAILED,
                    error_message="Container not found",
                )

            state = container_info.get("State", {})
            status = state.get("Status", "unknown")
            if status != "running":
                return HealthCheckResult(
                    session.session_id,
                    container_id,
                    ContainerStatus.FAILED,
                    error_message=f"Container status: {status}",
                )

            # Docker health check, if the container defines one.
            health = state.get("Health", {})
            health_status = health.get("Status", "unknown") if health else None
            if health_status in ("unhealthy", "starting"):
                return HealthCheckResult(
                    session.session_id,
                    container_id,
                    ContainerStatus.UNHEALTHY,
                    error_message=f"Health check: {health_status}",
                    metadata={
                        "docker_status": status,
                        "health_status": health_status,
                    },
                )

            # Running and either explicitly healthy or without a usable
            # health check: consider it healthy.
            metadata = {"docker_status": status}
            if health_status == "healthy":
                metadata["health_status"] = health_status
            return HealthCheckResult(
                session.session_id,
                container_id,
                ContainerStatus.HEALTHY,
                response_time=elapsed_ms(),
                metadata=metadata,
            )

        except Exception as e:
            return HealthCheckResult(
                session.session_id,
                session.container_id or "unknown",
                ContainerStatus.UNKNOWN,
                response_time=elapsed_ms(),
                error_message=str(e),
            )

    def _docker_method(self, name: str):
        """Return method ``name`` of the preferred Docker backend, or None.

        ``session_manager.docker_service`` is preferred whenever it is set; it
        is then used unconditionally, so a missing method raises
        AttributeError to the caller. Otherwise ``docker_client`` is used if
        it has ``name``.
        """
        service = getattr(self.session_manager, "docker_service", None)
        if service:
            return getattr(service, name)
        if self.docker_client and hasattr(self.docker_client, name):
            return getattr(self.docker_client, name)
        return None

    @staticmethod
    def _to_inspect_dict(container_info) -> Dict[str, Any]:
        """Convert a ContainerInfo-like object to the inspect-style dict the checks read."""
        health_status = container_info.health_status
        return {
            "State": {
                "Status": container_info.status,
                "Health": {"Status": health_status} if health_status else {},
            }
        }

    async def _get_container_info(self, container_id: str) -> Optional[Dict[str, Any]]:
        """Get container information from Docker.

        Returns None when no backend is available, the container is unknown,
        or the lookup raised (logged at debug level).
        """
        try:
            get_info = self._docker_method("get_container_info")
            if get_info is None:
                return None
            container_info = await get_info(container_id)
            if container_info:
                return self._to_inspect_dict(container_info)
        except Exception as e:
            logger.debug(
                f"Failed to get container info for {container_id}",
                extra={"error": str(e)},
            )

        return None

    async def _process_health_result(self, result: HealthCheckResult):
        """Record a health check result, log it, and restart if warranted."""
        history = self._health_history.setdefault(result.session_id, [])
        history.append(result)

        # Keep only the most recent history_size checks.
        excess = len(history) - self.history_size
        if excess > 0:
            del history[:excess]

        log_extra = result.to_dict()
        if result.status == ContainerStatus.HEALTHY:
            logger.debug("Container health check passed", extra=log_extra)
        elif result.status == ContainerStatus.UNHEALTHY:
            logger.warning("Container health check failed", extra=log_extra)
        elif result.status in (ContainerStatus.FAILED, ContainerStatus.UNKNOWN):
            logger.error("Container health check critical", extra=log_extra)

        await self._check_restart_needed(result)

    async def _check_restart_needed(self, result: HealthCheckResult):
        """Restart the container after failure_threshold consecutive failures."""
        session_id = result.session_id

        if result.status == ContainerStatus.HEALTHY:
            # Reset restart count and failure streak on a successful check.
            if session_id in self._restart_counts:
                self._restart_counts[session_id] = 0
            self._consecutive_failures.pop(session_id, None)
            return

        if result.status not in _FAILURE_STATUSES:
            return

        failures = self._consecutive_failures.get(session_id, 0) + 1
        if failures >= self.failure_threshold:
            # Start a fresh streak so failures from before this restart do not
            # immediately trigger another one.
            self._consecutive_failures[session_id] = 0
            await self._restart_container(session_id, result.container_id)
        else:
            self._consecutive_failures[session_id] = failures

    async def _restart_container(self, session_id: str, container_id: str):
        """Stop and restart a failed container, or fail the session at the limit."""
        restart_count = self._restart_counts.get(session_id, 0)
        if restart_count >= self.max_restart_attempts:
            logger.error(
                "Container restart limit exceeded",
                extra={
                    "session_id": session_id,
                    "container_id": container_id,
                    "restart_attempts": restart_count,
                },
            )
            await self._mark_session_failed(
                session_id, f"Restart limit exceeded ({restart_count} attempts)"
            )
            return

        logger.info(
            "Attempting container restart",
            extra={
                "session_id": session_id,
                "container_id": container_id,
                "restart_attempt": restart_count + 1,
            },
        )

        try:
            await self._stop_container(container_id)
            await asyncio.sleep(self.restart_delay)

            session = await self.session_manager.get_session(session_id)
            if not session:
                # Session disappeared while we were stopping its container;
                # there is nothing left to restart.
                return

            self._restart_counts[session_id] = restart_count + 1
            await self._update_session_status(session_id, "restarting")

            # Restart container for the SAME session (preserves session_id).
            # NOTE(review): if this raises, the status stays "restarting" and
            # the session is no longer picked up by health checks.
            await self.session_manager.restart_session(session_id)

            logger.info(
                "Container restart initiated",
                extra={
                    "session_id": session_id,
                    "restart_attempt": restart_count + 1,
                },
            )
            log_security_event(
                "container_restart",
                "warning",
                {
                    "session_id": session_id,
                    "container_id": container_id,
                    "reason": "health_check_failure",
                },
            )

        except Exception as e:
            logger.error(
                "Container restart failed",
                extra={
                    "session_id": session_id,
                    "container_id": container_id,
                    "error": str(e),
                },
            )

    async def _stop_container(self, container_id: str):
        """Stop a container (best effort; failures are logged, not raised)."""
        try:
            stop = self._docker_method("stop_container")
            if stop is None:
                logger.warning(
                    "No docker client available to stop container",
                    extra={"container_id": container_id},
                )
                return
            await stop(container_id, timeout=10)
        except Exception as e:
            logger.warning(
                "Failed to stop container during restart",
                extra={"container_id": container_id, "error": str(e)},
            )

    async def _update_session_status(self, session_id: str, status: str):
        """Set the in-memory session status and, if enabled, persist it."""
        if not self.session_manager:
            return
        session = self.session_manager.sessions.get(session_id)
        if not session:
            return

        session.status = status

        if getattr(self.session_manager, "USE_DATABASE_STORAGE", False):
            try:
                from database import SessionModel

                await SessionModel.update_session(session_id, {"status": status})
            except Exception as e:
                logger.warning(
                    "Failed to update session status in database",
                    extra={"session_id": session_id, "error": str(e)},
                )

    async def _mark_session_failed(self, session_id: str, reason: str):
        """Mark a session as permanently failed and emit a security event."""
        await self._update_session_status(session_id, "failed")

        logger.error(
            "Session marked as failed",
            extra={"session_id": session_id, "reason": reason},
        )
        log_security_event(
            "session_failure", "error", {"session_id": session_id, "reason": reason}
        )

    async def _cleanup_old_history(self):
        """Drop results older than history_retention and prune stale counters."""
        cutoff_time = _utc_now() - timedelta(seconds=self.history_retention)

        for session_id in list(self._health_history):
            kept = [
                result
                for result in self._health_history[session_id]
                if result.timestamp > cutoff_time
            ]
            if kept:
                self._health_history[session_id] = kept
            else:
                del self._health_history[session_id]

        self._prune_counters()

    def _prune_counters(self):
        """Forget restart/failure counters of sessions no longer tracked by the session manager."""
        sessions = getattr(self.session_manager, "sessions", None)
        if sessions is None:
            return
        for counters in (self._restart_counts, self._consecutive_failures):
            for session_id in [sid for sid in counters if sid not in sessions]:
                del counters[session_id]

    def get_health_stats(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Get health monitoring statistics, optionally detailed for one session."""
        stats = {
            "monitoring_active": self._monitoring,
            "check_interval": self.check_interval,
            "total_sessions_monitored": len(self._health_history),
            "sessions_with_failures": sum(
                1
                for history in self._health_history.values()
                # Any non-healthy result among the last 5 checks.
                if any(r.status != ContainerStatus.HEALTHY for r in history[-5:])
            ),
            "restart_counts": dict(self._restart_counts),
        }

        if session_id and session_id in self._health_history:
            recent_results = self._health_history[session_id][-10:]  # Last 10 checks
            healthy = sum(
                1 for r in recent_results if r.status == ContainerStatus.HEALTHY
            )
            timed = [r.response_time for r in recent_results if r.response_time]
            stats[f"session_{session_id}"] = {
                "total_checks": len(recent_results),
                "healthy_checks": healthy,
                "failed_checks": len(recent_results) - healthy,
                "average_response_time": sum(timed) / max(1, len(timed)),
                "last_check": recent_results[-1].to_dict() if recent_results else None,
            }

        return stats

    def get_health_history(
        self, session_id: str, limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` most recent results for a session, oldest first.

        A non-positive ``limit`` yields an empty list (``history[-0:]`` would
        otherwise return the whole history).
        """
        history = self._health_history.get(session_id)
        if not history or limit <= 0:
            return []
        return [result.to_dict() for result in history[-limit:]]


# Global health monitor instance
_container_health_monitor = ContainerHealthMonitor()


def get_container_health_monitor() -> ContainerHealthMonitor:
    """Get the global container health monitor instance."""
    return _container_health_monitor


async def start_container_health_monitoring(session_manager=None, docker_client=None):
    """Start container health monitoring, injecting dependencies if given."""
    monitor = get_container_health_monitor()
    if session_manager:
        monitor.set_dependencies(session_manager, docker_client)
    await monitor.start_monitoring()


async def stop_container_health_monitoring():
    """Stop container health monitoring."""
    monitor = get_container_health_monitor()
    await monitor.stop_monitoring()


def get_container_health_stats(session_id: Optional[str] = None) -> Dict[str, Any]:
    """Get container health statistics."""
    monitor = get_container_health_monitor()
    return monitor.get_health_stats(session_id)


def get_container_health_history(
    session_id: str, limit: int = 50
) -> List[Dict[str, Any]]:
    """Get container health check history."""
    monitor = get_container_health_monitor()
    return monitor.get_health_history(session_id, limit)