636 lines
21 KiB
Python
636 lines
21 KiB
Python
"""
|
|
Docker Service Layer
|
|
|
|
Provides a clean abstraction for Docker operations, separating container management
|
|
from business logic. Enables easy testing, mocking, and future Docker client changes.
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from datetime import datetime
|
|
|
|
from logging_config import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class ContainerInfo:
|
|
"""Container information data structure."""
|
|
|
|
def __init__(
|
|
self,
|
|
container_id: str,
|
|
name: str,
|
|
image: str,
|
|
status: str,
|
|
ports: Optional[Dict[str, int]] = None,
|
|
created_at: Optional[datetime] = None,
|
|
health_status: Optional[str] = None,
|
|
):
|
|
self.container_id = container_id
|
|
self.name = name
|
|
self.image = image
|
|
self.status = status
|
|
self.ports = ports or {}
|
|
self.created_at = created_at or datetime.utcnow()
|
|
self.health_status = health_status
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert to dictionary."""
|
|
return {
|
|
"container_id": self.container_id,
|
|
"name": self.name,
|
|
"image": self.image,
|
|
"status": self.status,
|
|
"ports": self.ports,
|
|
"created_at": self.created_at.isoformat() if self.created_at else None,
|
|
"health_status": self.health_status,
|
|
}
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: Dict[str, Any]) -> "ContainerInfo":
|
|
"""Create from dictionary."""
|
|
return cls(
|
|
container_id=data["container_id"],
|
|
name=data["name"],
|
|
image=data["image"],
|
|
status=data["status"],
|
|
ports=data.get("ports", {}),
|
|
created_at=datetime.fromisoformat(data["created_at"])
|
|
if data.get("created_at")
|
|
else None,
|
|
health_status=data.get("health_status"),
|
|
)
|
|
|
|
|
|
class DockerOperationError(Exception):
|
|
"""Docker operation error."""
|
|
|
|
def __init__(
|
|
self, operation: str, container_id: Optional[str] = None, message: str = ""
|
|
):
|
|
self.operation = operation
|
|
self.container_id = container_id
|
|
self.message = message
|
|
super().__init__(f"Docker {operation} failed: {message}")
|
|
|
|
|
|
class DockerService:
|
|
"""
|
|
Docker service abstraction layer.
|
|
|
|
Provides a clean interface for container operations,
|
|
enabling easy testing and future Docker client changes.
|
|
"""
|
|
|
|
def __init__(self, use_async: bool = True):
|
|
"""
|
|
Initialize Docker service.
|
|
|
|
Args:
|
|
use_async: Whether to use async Docker operations
|
|
"""
|
|
self.use_async = use_async
|
|
self._docker_client = None
|
|
self._initialized = False
|
|
|
|
logger.info("Docker service initialized", extra={"async_mode": use_async})
|
|
|
|
async def initialize(self) -> None:
|
|
"""Initialize the Docker client connection."""
|
|
if self._initialized:
|
|
return
|
|
|
|
try:
|
|
if self.use_async:
|
|
# Initialize async Docker client
|
|
from async_docker_client import AsyncDockerClient
|
|
|
|
self._docker_client = AsyncDockerClient()
|
|
await self._docker_client.connect()
|
|
else:
|
|
# Initialize sync Docker client
|
|
import docker
|
|
|
|
tls_config = docker.tls.TLSConfig(
|
|
ca_cert=os.getenv("DOCKER_CA_CERT", "/etc/docker/certs/ca.pem"),
|
|
client_cert=(
|
|
os.getenv(
|
|
"DOCKER_CLIENT_CERT", "/etc/docker/certs/client-cert.pem"
|
|
),
|
|
os.getenv(
|
|
"DOCKER_CLIENT_KEY", "/etc/docker/certs/client-key.pem"
|
|
),
|
|
),
|
|
verify=True,
|
|
)
|
|
docker_host = os.getenv(
|
|
"DOCKER_HOST", "tcp://host.docker.internal:2376"
|
|
)
|
|
self._docker_client = docker.from_env()
|
|
self._docker_client.api = docker.APIClient(
|
|
base_url=docker_host, tls=tls_config, version="auto"
|
|
)
|
|
# Test connection
|
|
self._docker_client.ping()
|
|
|
|
self._initialized = True
|
|
logger.info("Docker service connection established")
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to initialize Docker service", extra={"error": str(e)})
|
|
raise DockerOperationError("initialization", message=str(e))
|
|
|
|
async def shutdown(self) -> None:
|
|
"""Shutdown the Docker client connection."""
|
|
if not self._initialized:
|
|
return
|
|
|
|
try:
|
|
if self.use_async and self._docker_client:
|
|
await self._docker_client.disconnect()
|
|
# Sync client doesn't need explicit shutdown
|
|
|
|
self._initialized = False
|
|
logger.info("Docker service connection closed")
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Error during Docker service shutdown", extra={"error": str(e)}
|
|
)
|
|
|
|
async def ping(self) -> bool:
|
|
"""Test Docker daemon connectivity."""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
if self.use_async:
|
|
return await self._docker_client.ping()
|
|
else:
|
|
self._docker_client.ping()
|
|
return True
|
|
except Exception as e:
|
|
logger.warning("Docker ping failed", extra={"error": str(e)})
|
|
return False
|
|
|
|
async def create_container(
|
|
self,
|
|
name: str,
|
|
image: str,
|
|
volumes: Optional[Dict[str, Dict[str, str]]] = None,
|
|
ports: Optional[Dict[str, int]] = None,
|
|
environment: Optional[Dict[str, str]] = None,
|
|
network_mode: str = "bridge",
|
|
mem_limit: Optional[str] = None,
|
|
cpu_quota: Optional[int] = None,
|
|
cpu_period: Optional[int] = None,
|
|
tmpfs: Optional[Dict[str, str]] = None,
|
|
**kwargs,
|
|
) -> ContainerInfo:
|
|
"""
|
|
Create a Docker container.
|
|
|
|
Args:
|
|
name: Container name
|
|
image: Container image
|
|
volumes: Volume mounts
|
|
ports: Port mappings
|
|
environment: Environment variables
|
|
network_mode: Network mode
|
|
mem_limit: Memory limit
|
|
cpu_quota: CPU quota
|
|
cpu_period: CPU period
|
|
tmpfs: tmpfs mounts
|
|
**kwargs: Additional options
|
|
|
|
Returns:
|
|
ContainerInfo: Information about created container
|
|
|
|
Raises:
|
|
DockerOperationError: If container creation fails
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
logger.info(
|
|
"Creating container",
|
|
extra={
|
|
"container_name": name,
|
|
"image": image,
|
|
"memory_limit": mem_limit,
|
|
"cpu_quota": cpu_quota,
|
|
},
|
|
)
|
|
|
|
if self.use_async:
|
|
container = await self._docker_client.create_container(
|
|
image=image,
|
|
name=name,
|
|
volumes=volumes,
|
|
ports=ports,
|
|
environment=environment,
|
|
network_mode=network_mode,
|
|
mem_limit=mem_limit,
|
|
cpu_quota=cpu_quota,
|
|
cpu_period=cpu_period,
|
|
tmpfs=tmpfs,
|
|
**kwargs,
|
|
)
|
|
|
|
return ContainerInfo(
|
|
container_id=container.id,
|
|
name=name,
|
|
image=image,
|
|
status="created",
|
|
ports=ports,
|
|
)
|
|
else:
|
|
container = self._docker_client.containers.run(
|
|
image,
|
|
name=name,
|
|
volumes=volumes,
|
|
ports={f"{port}/tcp": port for port in ports.values()}
|
|
if ports
|
|
else None,
|
|
environment=environment,
|
|
network_mode=network_mode,
|
|
mem_limit=mem_limit,
|
|
cpu_quota=cpu_quota,
|
|
cpu_period=cpu_period,
|
|
tmpfs=tmpfs,
|
|
detach=True,
|
|
**kwargs,
|
|
)
|
|
|
|
return ContainerInfo(
|
|
container_id=container.id,
|
|
name=name,
|
|
image=image,
|
|
status="running",
|
|
ports=ports,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Container creation failed",
|
|
extra={"container_name": name, "image": image, "error": str(e)},
|
|
)
|
|
raise DockerOperationError("create_container", name, str(e))
|
|
|
|
async def start_container(self, container_id: str) -> None:
|
|
"""
|
|
Start a Docker container.
|
|
|
|
Args:
|
|
container_id: Container ID
|
|
|
|
Raises:
|
|
DockerOperationError: If container start fails
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
logger.info("Starting container", extra={"container_id": container_id})
|
|
|
|
if self.use_async:
|
|
container = await self._docker_client.get_container(container_id)
|
|
await self._docker_client.start_container(container)
|
|
else:
|
|
container = self._docker_client.containers.get(container_id)
|
|
container.start()
|
|
|
|
logger.info(
|
|
"Container started successfully", extra={"container_id": container_id}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Container start failed",
|
|
extra={"container_id": container_id, "error": str(e)},
|
|
)
|
|
raise DockerOperationError("start_container", container_id, str(e))
|
|
|
|
async def stop_container(self, container_id: str, timeout: int = 10) -> None:
|
|
"""
|
|
Stop a Docker container.
|
|
|
|
Args:
|
|
container_id: Container ID
|
|
timeout: Stop timeout in seconds
|
|
|
|
Raises:
|
|
DockerOperationError: If container stop fails
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
logger.info(
|
|
"Stopping container",
|
|
extra={"container_id": container_id, "timeout": timeout},
|
|
)
|
|
|
|
if self.use_async:
|
|
container = await self._docker_client.get_container(container_id)
|
|
await self._docker_client.stop_container(container, timeout)
|
|
else:
|
|
container = self._docker_client.containers.get(container_id)
|
|
container.stop(timeout=timeout)
|
|
|
|
logger.info(
|
|
"Container stopped successfully", extra={"container_id": container_id}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Container stop failed",
|
|
extra={"container_id": container_id, "error": str(e)},
|
|
)
|
|
raise DockerOperationError("stop_container", container_id, str(e))
|
|
|
|
async def remove_container(self, container_id: str, force: bool = False) -> None:
|
|
"""
|
|
Remove a Docker container.
|
|
|
|
Args:
|
|
container_id: Container ID
|
|
force: Force removal if running
|
|
|
|
Raises:
|
|
DockerOperationError: If container removal fails
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
logger.info(
|
|
"Removing container",
|
|
extra={"container_id": container_id, "force": force},
|
|
)
|
|
|
|
if self.use_async:
|
|
container = await self._docker_client.get_container(container_id)
|
|
await self._docker_client.remove_container(container, force)
|
|
else:
|
|
container = self._docker_client.containers.get(container_id)
|
|
container.remove(force=force)
|
|
|
|
logger.info(
|
|
"Container removed successfully", extra={"container_id": container_id}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
"Container removal failed",
|
|
extra={"container_id": container_id, "error": str(e)},
|
|
)
|
|
raise DockerOperationError("remove_container", container_id, str(e))
|
|
|
|
async def get_container_info(self, container_id: str) -> Optional[ContainerInfo]:
|
|
"""
|
|
Get information about a container.
|
|
|
|
Args:
|
|
container_id: Container ID
|
|
|
|
Returns:
|
|
ContainerInfo or None: Container information
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
if self.use_async:
|
|
container_info = await self._docker_client._get_container_info(
|
|
container_id
|
|
)
|
|
if container_info:
|
|
state = container_info.get("State", {})
|
|
config = container_info.get("Config", {})
|
|
return ContainerInfo(
|
|
container_id=container_id,
|
|
name=config.get("Name", "").lstrip("/"),
|
|
image=config.get("Image", ""),
|
|
status=state.get("Status", "unknown"),
|
|
health_status=state.get("Health", {}).get("Status"),
|
|
)
|
|
else:
|
|
container = self._docker_client.containers.get(container_id)
|
|
return ContainerInfo(
|
|
container_id=container.id,
|
|
name=container.name,
|
|
image=container.image.tags[0]
|
|
if container.image.tags
|
|
else container.image.id,
|
|
status=container.status,
|
|
)
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.debug(
|
|
"Container info retrieval failed",
|
|
extra={"container_id": container_id, "error": str(e)},
|
|
)
|
|
return None
|
|
|
|
async def list_containers(
|
|
self, all: bool = False, filters: Optional[Dict[str, Any]] = None
|
|
) -> List[ContainerInfo]:
|
|
"""
|
|
List Docker containers.
|
|
|
|
Args:
|
|
all: Include stopped containers
|
|
filters: Container filters
|
|
|
|
Returns:
|
|
List[ContainerInfo]: List of container information
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
if self.use_async:
|
|
containers = await self._docker_client.list_containers(
|
|
all=all, filters=filters
|
|
)
|
|
result = []
|
|
for container in containers:
|
|
container_info = await self._docker_client._get_container_info(
|
|
container.id
|
|
)
|
|
if container_info:
|
|
state = container_info.get("State", {})
|
|
config = container_info.get("Config", {})
|
|
result.append(
|
|
ContainerInfo(
|
|
container_id=container.id,
|
|
name=config.get("Name", "").lstrip("/"),
|
|
image=config.get("Image", ""),
|
|
status=state.get("Status", "unknown"),
|
|
health_status=state.get("Health", {}).get("Status"),
|
|
)
|
|
)
|
|
return result
|
|
else:
|
|
containers = self._docker_client.containers.list(
|
|
all=all, filters=filters
|
|
)
|
|
result = []
|
|
for container in containers:
|
|
result.append(
|
|
ContainerInfo(
|
|
container_id=container.id,
|
|
name=container.name,
|
|
image=container.image.tags[0]
|
|
if container.image.tags
|
|
else container.image.id,
|
|
status=container.status,
|
|
)
|
|
)
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error("Container listing failed", extra={"error": str(e)})
|
|
return []
|
|
|
|
async def get_container_logs(self, container_id: str, tail: int = 100) -> str:
|
|
"""
|
|
Get container logs.
|
|
|
|
Args:
|
|
container_id: Container ID
|
|
tail: Number of log lines to retrieve
|
|
|
|
Returns:
|
|
str: Container logs
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
if self.use_async:
|
|
container = await self._docker_client.get_container(container_id)
|
|
logs = await container.log(stdout=True, stderr=True, tail=tail)
|
|
return "\n".join(logs)
|
|
else:
|
|
container = self._docker_client.containers.get(container_id)
|
|
logs = container.logs(tail=tail).decode("utf-8")
|
|
return logs
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Container log retrieval failed",
|
|
extra={"container_id": container_id, "error": str(e)},
|
|
)
|
|
return ""
|
|
|
|
async def get_system_info(self) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get Docker system information.
|
|
|
|
Returns:
|
|
Dict or None: System information
|
|
"""
|
|
if not self._initialized:
|
|
await self.initialize()
|
|
|
|
try:
|
|
if self.use_async:
|
|
return await self._docker_client.get_system_info()
|
|
else:
|
|
return self._docker_client.info()
|
|
except Exception as e:
|
|
logger.warning("System info retrieval failed", extra={"error": str(e)})
|
|
return None
|
|
|
|
# Context manager support
|
|
async def __aenter__(self):
|
|
await self.initialize()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
await self.shutdown()
|
|
|
|
|
|
class MockDockerService(DockerService):
|
|
"""
|
|
Mock Docker service for testing without actual Docker.
|
|
|
|
Provides the same interface but with in-memory operations.
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__(use_async=False)
|
|
self._containers: Dict[str, ContainerInfo] = {}
|
|
self._next_id = 1
|
|
|
|
async def initialize(self) -> None:
|
|
"""Mock initialization - always succeeds."""
|
|
self._initialized = True
|
|
logger.info("Mock Docker service initialized")
|
|
|
|
async def shutdown(self) -> None:
|
|
"""Mock shutdown."""
|
|
self._containers.clear()
|
|
self._initialized = False
|
|
logger.info("Mock Docker service shutdown")
|
|
|
|
async def ping(self) -> bool:
|
|
"""Mock ping - always succeeds."""
|
|
return True
|
|
|
|
async def create_container(self, name: str, image: str, **kwargs) -> ContainerInfo:
|
|
"""Mock container creation."""
|
|
container_id = f"mock-{self._next_id}"
|
|
self._next_id += 1
|
|
|
|
container = ContainerInfo(
|
|
container_id=container_id, name=name, image=image, status="created"
|
|
)
|
|
|
|
self._containers[container_id] = container
|
|
logger.info(
|
|
"Mock container created",
|
|
extra={
|
|
"container_id": container_id,
|
|
"container_name": name,
|
|
"image": image,
|
|
},
|
|
)
|
|
|
|
return container
|
|
|
|
async def start_container(self, container_id: str) -> None:
|
|
"""Mock container start."""
|
|
if container_id in self._containers:
|
|
self._containers[container_id].status = "running"
|
|
logger.info("Mock container started", extra={"container_id": container_id})
|
|
|
|
async def stop_container(self, container_id: str, timeout: int = 10) -> None:
|
|
"""Mock container stop."""
|
|
if container_id in self._containers:
|
|
self._containers[container_id].status = "exited"
|
|
logger.info("Mock container stopped", extra={"container_id": container_id})
|
|
|
|
async def remove_container(self, container_id: str, force: bool = False) -> None:
|
|
"""Mock container removal."""
|
|
if container_id in self._containers:
|
|
del self._containers[container_id]
|
|
logger.info("Mock container removed", extra={"container_id": container_id})
|
|
|
|
async def get_container_info(self, container_id: str) -> Optional[ContainerInfo]:
|
|
"""Mock container info retrieval."""
|
|
return self._containers.get(container_id)
|
|
|
|
async def list_containers(
|
|
self, all: bool = False, filters: Optional[Dict[str, Any]] = None
|
|
) -> List[ContainerInfo]:
|
|
"""Mock container listing."""
|
|
return list(self._containers.values())
|