249 lines
8.4 KiB
Python
249 lines
8.4 KiB
Python
"""
|
|
Resource Management and Monitoring Utilities
|
|
|
|
Provides validation, enforcement, and monitoring of container resource limits
|
|
to prevent resource exhaustion attacks and ensure fair resource allocation.
|
|
"""
|
|
|
|
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

import psutil
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class ResourceLimits:
    """Container resource limits configuration.

    Holds the three values that ``to_docker_limits`` exposes under the keys
    ``mem_limit``, ``cpu_quota`` and ``cpu_period``.
    """

    memory_limit: str  # e.g., "4g", "512m"
    cpu_quota: int  # CPU quota in microseconds
    cpu_period: int  # CPU period in microseconds

    def validate(self) -> Tuple[bool, str]:
        """Validate resource limits configuration.

        The memory limit must be a string made of ASCII digits optionally
        followed by one unit suffix (``g``, ``m`` or ``k``, any case).
        Surrounding whitespace is ignored. The numeric part must be greater
        than zero. The CPU quota and period must both be positive and the
        quota may not exceed the period.

        Returns:
            ``(True, "Valid")`` when every check passes, otherwise
            ``(False, reason)`` describing the first failed check.
        """
        memory_error = (
            f"Invalid memory limit format: {self.memory_limit}. "
            "Use format like '4g', '512m', '256k'"
        )

        # A non-string value has no .lower()/.endswith(); report it as a
        # format error instead of letting AttributeError escape.
        if not isinstance(self.memory_limit, str):
            return False, memory_error

        memory_limit = self.memory_limit.strip().lower()
        has_unit = memory_limit.endswith(("g", "m", "k"))
        digits = memory_limit[:-1] if has_unit else memory_limit

        # Checking only the suffix would accept values such as "g" or "abcg";
        # the part before the unit must be a plain decimal number.
        if not (digits.isascii() and digits.isdigit()):
            return False, memory_error
        if int(digits) <= 0:
            return False, f"Memory limit must be positive, got {self.memory_limit}"

        # Validate CPU quota and period
        if self.cpu_quota <= 0:
            return False, f"CPU quota must be positive, got {self.cpu_quota}"
        if self.cpu_period <= 0:
            return False, f"CPU period must be positive, got {self.cpu_period}"
        # NOTE(review): this caps a container at one full CPU per period.
        # Docker itself appears to allow quota > period for multi-CPU
        # allocations -- confirm this restriction is the intended policy.
        if self.cpu_quota > self.cpu_period:
            return (
                False,
                f"CPU quota ({self.cpu_quota}) cannot exceed CPU period ({self.cpu_period})",
            )

        return True, "Valid"

    def to_docker_limits(self) -> Dict[str, Any]:
        """Convert to Docker container limits format.

        Returns:
            A dict with the keys ``mem_limit``, ``cpu_quota`` and
            ``cpu_period``. A string memory limit is emitted without the
            surrounding whitespace that ``validate`` tolerates; any other
            value is passed through unchanged.
        """
        memory_limit = self.memory_limit
        if isinstance(memory_limit, str):
            memory_limit = memory_limit.strip()
        return {
            "mem_limit": memory_limit,
            "cpu_quota": self.cpu_quota,
            "cpu_period": self.cpu_period,
        }
|
|
|
|
|
|
class ResourceMonitor:
    """Monitor system and container resource usage."""

    # Usage fraction at or above which an alert is reported as "critical"
    # rather than "warning".
    DEFAULT_CRITICAL_THRESHOLD = 0.95

    def __init__(self):
        # Neither attribute is read by any method of this class as written.
        self._last_check = datetime.now()
        self._alerts_sent = set()  # Track alerts to prevent spam

    def get_system_resources(self) -> Dict[str, Any]:
        """Get current system resource usage.

        Returns:
            A dict with ``memory_percent`` and ``cpu_percent`` as fractions
            (the psutil percentages divided by 100), ``memory_used_gb`` and
            ``memory_total_gb`` in GiB (bytes / 1024**3), and ``cpu_count``.
            An empty dict is returned when any psutil call fails; the failure
            is logged as a warning.
        """
        try:
            memory = psutil.virtual_memory()
            # With interval=1 psutil samples CPU usage over one second, so
            # this call blocks for roughly that long.
            cpu = psutil.cpu_percent(interval=1)

            return {
                "memory_percent": memory.percent / 100.0,
                "memory_used_gb": memory.used / (1024**3),
                "memory_total_gb": memory.total / (1024**3),
                "cpu_percent": cpu / 100.0,
                "cpu_count": psutil.cpu_count(),
            }
        except Exception as e:
            # Deliberately best-effort: monitoring failures must not break
            # the caller, so report and fall back to "no data".
            logger.warning("Failed to get system resources: %s", e)
            return {}

    @staticmethod
    def _build_alert(
        kind: str,
        label: str,
        usage: float,
        threshold: float,
        critical_threshold: float,
    ) -> Dict[str, Any]:
        """Build one alert record for a resource whose usage reached its threshold."""
        return {
            "type": kind,
            "level": "warning" if usage < critical_threshold else "critical",
            "message": f"System {label} usage at {usage:.1%}",
            "current": usage,
            "threshold": threshold,
        }

    def check_resource_limits(
        self,
        limits: "ResourceLimits",
        warning_thresholds: Optional[Dict[str, float]] = None,
        critical_threshold: float = DEFAULT_CRITICAL_THRESHOLD,
    ) -> Dict[str, Any]:
        """Check if system resources are approaching limits.

        Args:
            limits: Configured container limits. Not consulted by the checks
                below; accepted so existing callers keep working.
            warning_thresholds: Usage fractions keyed by ``"memory"`` and
                ``"cpu"`` at which an alert is raised. Missing keys (or
                ``None``) fall back to 0.8 for memory and 0.9 for CPU.
            critical_threshold: Usage fraction at or above which an alert's
                level is ``"critical"`` instead of ``"warning"``.

        Returns:
            A dict with ``system_resources`` (the raw snapshot), ``alerts``
            (memory alert first, then CPU) and ``timestamp``
            (``datetime.now()``). When the snapshot is empty, usage is
            treated as 0, so no alert is produced with positive thresholds.
        """
        system_resources = self.get_system_resources()
        thresholds = warning_thresholds or {}
        alerts = []

        # (alert type / threshold key, label used in the message,
        #  snapshot key, default warning threshold)
        checks = (
            ("memory", "memory", "memory_percent", 0.8),
            ("cpu", "CPU", "cpu_percent", 0.9),
        )
        for kind, label, resource_key, default_threshold in checks:
            usage = system_resources.get(resource_key, 0)
            threshold = thresholds.get(kind, default_threshold)
            if usage >= threshold:
                alerts.append(
                    self._build_alert(kind, label, usage, threshold, critical_threshold)
                )

        return {
            "system_resources": system_resources,
            "alerts": alerts,
            "timestamp": datetime.now(),
        }

    def should_throttle_sessions(self, resource_check: Dict) -> Tuple[bool, str]:
        """Determine if new sessions should be throttled based on resource usage.

        Args:
            resource_check: A result dict shaped like the return value of
                ``check_resource_limits``; only its ``"alerts"`` list is read.

        Returns:
            ``(True, reason)`` when at least one alert is critical or at
            least two alerts are warnings, otherwise
            ``(False, "Resources OK")``.
        """
        alerts = resource_check.get("alerts", [])

        # .get() keeps an alert without a "level" key from raising KeyError;
        # such an alert simply counts as neither critical nor warning.
        # Critical alerts always throttle
        critical_messages = [
            alert.get("message") for alert in alerts if alert.get("level") == "critical"
        ]
        if critical_messages:
            return True, f"Critical resource usage: {critical_messages}"

        # Multiple warnings also throttle
        warning_messages = [
            alert.get("message") for alert in alerts if alert.get("level") == "warning"
        ]
        if len(warning_messages) >= 2:
            return True, f"Multiple resource warnings: {warning_messages}"

        return False, "Resources OK"
|
|
|
|
|
|
class ResourceValidator:
    """Validate and parse resource limit configurations."""

    # Unit suffix -> (multiplier in bytes, unit label returned to the caller).
    _MEMORY_UNITS = {
        "g": (1024**3, "GB"),
        "m": (1024**2, "MB"),
        "k": (1024, "KB"),
    }

    @staticmethod
    def parse_memory_limit(memory_str: str) -> Tuple[int, str]:
        """Parse a memory limit string into a byte count and a unit label.

        Args:
            memory_str: An integer optionally followed by ``g``, ``m`` or
                ``k`` (any case, surrounding whitespace ignored). A value
                without a suffix is taken as bytes.

        Returns:
            ``(bytes_val, unit)`` where ``unit`` is ``"GB"``, ``"MB"``,
            ``"KB"`` or ``"bytes"``.

        Raises:
            ValueError: If the value is empty, is not an integer with an
                optional unit suffix, or is not positive.
            TypeError: If a non-empty value is not a string.
        """
        if not memory_str:
            raise ValueError("Memory limit cannot be empty")
        if not isinstance(memory_str, str):
            raise TypeError(
                f"Memory limit must be a string, got {type(memory_str).__name__}"
            )

        normalized = memory_str.lower().strip()
        # Whitespace-only input is empty too; catch it here so the caller
        # gets the same message instead of an int() parsing error.
        if not normalized:
            raise ValueError("Memory limit cannot be empty")

        # Handle different units; assume bytes if no unit
        suffix = normalized[-1]
        if suffix in ResourceValidator._MEMORY_UNITS:
            multiplier, unit = ResourceValidator._MEMORY_UNITS[suffix]
            number_part = normalized[:-1]
        else:
            multiplier, unit = 1, "bytes"
            number_part = normalized

        try:
            bytes_val = int(number_part) * multiplier
        except ValueError:
            raise ValueError(
                f"Invalid memory limit format: {memory_str!r}. "
                "Use format like '4g', '512m', '256k'"
            ) from None

        if bytes_val <= 0:
            raise ValueError(f"Memory limit must be positive, got {bytes_val}")

        # Reasonable limits check
        if bytes_val > 32 * (1024**3):  # 32GB
            logger.warning("Very high memory limit: %s bytes", bytes_val)

        return bytes_val, unit

    @staticmethod
    def validate_resource_config(
        config: Dict[str, Any],
    ) -> Tuple[bool, str, Optional["ResourceLimits"]]:
        """Validate complete resource configuration.

        Args:
            config: Mapping that may contain ``memory_limit``, ``cpu_quota``
                and ``cpu_period``. Missing keys default to ``"4g"``,
                ``100000`` and ``100000``.

        Returns:
            ``(True, "Configuration valid", limits)`` on success, otherwise
            ``(False, reason, None)``. A memory limit below 128 MiB is
            accepted but logged as a warning.
        """
        try:
            limits = ResourceLimits(
                memory_limit=config.get("memory_limit", "4g"),
                cpu_quota=config.get("cpu_quota", 100000),
                cpu_period=config.get("cpu_period", 100000),
            )

            valid, message = limits.validate()
            if not valid:
                return False, message, None

            # Additional validation
            memory_bytes, _ = ResourceValidator.parse_memory_limit(limits.memory_limit)

            # Warn about potentially problematic configurations
            if memory_bytes < 128 * (1024**2):  # Less than 128MB
                logger.warning("Very low memory limit may cause container instability")

            return True, "Configuration valid", limits

        # AttributeError covers wrongly typed values (e.g. a non-string
        # memory limit or a non-mapping config) so they are reported as an
        # invalid configuration instead of escaping to the caller.
        except (ValueError, TypeError, AttributeError) as e:
            return False, f"Invalid configuration: {e}", None
|
|
|
|
|
|
# Global instances
# Shared module-level monitor; check_system_resources() and
# should_throttle_sessions() in this module delegate to it.
resource_monitor = ResourceMonitor()
|
|
|
|
|
|
def _read_int_env(name: str, default: str) -> int:
    """Read environment variable *name* as an integer, using *default* when unset."""
    raw = os.getenv(name, default)
    try:
        return int(raw)
    except ValueError:
        # Name the offending variable; the bare int() error only shows the value.
        raise ValueError(
            f"Resource configuration error: {name} must be an integer, got {raw!r}"
        ) from None


def get_resource_limits() -> "ResourceLimits":
    """Get validated resource limits from environment.

    Reads ``CONTAINER_MEMORY_LIMIT`` (default ``"4g"``),
    ``CONTAINER_CPU_QUOTA`` (default ``100000``) and
    ``CONTAINER_CPU_PERIOD`` (default ``100000``), validates them with
    ``ResourceValidator.validate_resource_config`` and logs the limits in use.

    Returns:
        The validated ``ResourceLimits`` instance.

    Raises:
        ValueError: If a CPU variable is not an integer or the resulting
            configuration fails validation.
    """
    config = {
        "memory_limit": os.getenv("CONTAINER_MEMORY_LIMIT", "4g"),
        "cpu_quota": _read_int_env("CONTAINER_CPU_QUOTA", "100000"),
        "cpu_period": _read_int_env("CONTAINER_CPU_PERIOD", "100000"),
    }

    valid, message, limits = ResourceValidator.validate_resource_config(config)
    if not valid or limits is None:
        raise ValueError(f"Resource configuration error: {message}")

    logger.info(
        "Using resource limits: memory=%s, cpu_quota=%s",
        limits.memory_limit,
        limits.cpu_quota,
    )
    return limits
|
|
|
|
|
|
def check_system_resources() -> Dict[str, any]:
    """Run a resource check using limits and thresholds taken from the environment.

    The limits come from ``get_resource_limits()``. The warning thresholds are
    read from ``MEMORY_WARNING_THRESHOLD`` (default ``0.8``) and
    ``CPU_WARNING_THRESHOLD`` (default ``0.9``) and converted to floats. The
    result is whatever ``resource_monitor.check_resource_limits`` returns.
    """
    active_limits = get_resource_limits()

    memory_threshold = float(os.getenv("MEMORY_WARNING_THRESHOLD", "0.8"))
    cpu_threshold = float(os.getenv("CPU_WARNING_THRESHOLD", "0.9"))

    return resource_monitor.check_resource_limits(
        active_limits,
        {"memory": memory_threshold, "cpu": cpu_threshold},
    )
|
|
|
|
|
|
def should_throttle_sessions() -> Tuple[bool, str]:
    """Report whether new sessions should be throttled right now.

    Takes a fresh snapshot via ``check_system_resources()`` and hands it to
    ``resource_monitor.should_throttle_sessions``, returning that method's
    ``(throttle, reason)`` pair unchanged.
    """
    return resource_monitor.should_throttle_sessions(check_system_resources())
|