#!/usr/bin/env python3
"""
Async Docker Operations Test Script

Tests the async Docker client implementation to ensure non-blocking operations
and improved concurrency in FastAPI async contexts.

When run as a script, the process exits with status 0 if every test passed
and 1 otherwise, so the result can be consumed by CI or shell pipelines.
"""

import os
import sys
import asyncio
import time
import logging
from pathlib import Path

# Add session-manager to path for imports.
# This must run before the project-local import below.
sys.path.insert(0, str(Path(__file__).parent))

from async_docker_client import (
    AsyncDockerClient,
    get_async_docker_client,
    async_docker_ping,
    async_create_container,
    async_start_container,
    async_stop_container,
    async_remove_container,
    async_list_containers,
    async_get_container,
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def test_async_docker_client() -> bool:
    """Test the async Docker client functionality.

    Runs three checks in order: client initialization plus ping, container
    listing, and system info retrieval.

    Returns:
        True if no check failed. An empty system-info result only prints a
        warning; it does not fail the test.
    """
    print("๐Ÿงช Testing Async Docker Client")
    print("=" * 50)

    # Test 1: Basic client initialization and ping
    print("1๏ธโƒฃ Testing client initialization and ping...")
    try:
        async with get_async_docker_client() as client:
            ping_result = await client.ping()
            if ping_result:
                print("โœ… Async Docker client ping successful")
            else:
                print("โŒ Async Docker client ping failed")
                return False
    except Exception as e:
        print(f"โŒ Async client initialization failed: {e}")
        return False

    # Test 2: Container listing
    print("\n2๏ธโƒฃ Testing container listing...")
    try:
        containers = await async_list_containers(all=True)
        print(f"โœ… Successfully listed {len(containers)} containers")
    except Exception as e:
        print(f"โŒ Container listing failed: {e}")
        return False

    # Test 3: System info retrieval
    print("\n3๏ธโƒฃ Testing system info retrieval...")
    try:
        async with get_async_docker_client() as client:
            system_info = await client.get_system_info()
            if system_info:
                server_version = system_info.get("ServerVersion", "Unknown")
                print(
                    f"โœ… Docker system info retrieved: ServerVersion={server_version}"
                )
            else:
                print("โš ๏ธ System info retrieval returned None")
    except Exception as e:
        print(f"โŒ System info retrieval failed: {e}")
        return False

    return True


async def test_concurrent_operations() -> bool:
    """Test concurrent async Docker operations.

    Launches a fixed number of container-listing tasks with
    ``asyncio.gather`` and counts how many complete without error.

    Returns:
        True if at least 8 of the 10 tasks succeeded.
    """
    print("\nโšก Testing Concurrent Operations")
    print("=" * 50)

    task_count = 10
    min_successful = 8  # Allow some tolerance

    async def concurrent_task(task_id: int) -> bool:
        """Simulate one concurrent Docker operation; return True on success."""
        try:
            # Small delay to simulate processing
            await asyncio.sleep(0.1)
            # Perform a container listing operation
            await async_list_containers(all=False)
            return True
        except Exception as e:
            logger.warning("Task %s: Failed - %s", task_id, e)
            return False

    # Test concurrent execution
    print("1๏ธโƒฃ Testing concurrent container listings...")
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    # Launch multiple concurrent operations
    tasks = [concurrent_task(i) for i in range(task_count)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    duration = time.perf_counter() - start_time

    # Analyze results. With return_exceptions=True an entry may be an
    # exception object instead of a bool, so only a literal True counts.
    successful = sum(1 for r in results if r is True)
    failed = len(results) - successful

    print(f"โœ… Concurrent operations completed in {duration:.2f}s")
    print(f" Successful: {successful}/{task_count}")
    print(f" Failed: {failed}/{task_count}")

    if successful >= min_successful:
        print("โœ… Concurrent operations test passed")
        return True

    print("โŒ Concurrent operations test failed")
    return False


async def _cleanup_test_container(container_name: str) -> None:
    """Best-effort stop and removal of a leftover test container.

    Never raises: each step is attempted independently and failures are
    logged, so a container that cannot be stopped is still given a removal
    attempt.

    Args:
        container_name: Name of the container created by the lifecycle test.
    """
    try:
        container = await async_get_container(container_name)
    except Exception as e:
        logger.warning(
            "Cleanup: could not look up container %s: %s", container_name, e
        )
        return

    if not container:
        return

    try:
        await async_stop_container(container, timeout=5)
    except Exception as e:
        # The container may never have been started; still try to remove it.
        logger.warning(
            "Cleanup: could not stop container %s: %s", container_name, e
        )

    try:
        await async_remove_container(container)
    except Exception as e:
        logger.warning(
            "Cleanup: could not remove container %s: %s", container_name, e
        )
        return

    print("๐Ÿงน Cleaned up failed test container")


async def test_container_lifecycle() -> bool:
    """Test full container lifecycle with async operations.

    Creates, starts, retrieves, stops and removes an ``alpine:latest``
    container with a timestamp-based name. On any failure, including a
    retrieval mismatch, the container is cleaned up before returning so that
    no test container is left behind.

    Returns:
        True if every lifecycle step succeeded, False otherwise.
    """
    print("\n๐Ÿณ Testing Container Lifecycle")
    print("=" * 50)

    container_name = f"test-async-container-{int(time.time())}"

    try:
        # Test container creation
        print("1๏ธโƒฃ Creating test container...")
        container = await async_create_container(
            image="alpine:latest",
            name=container_name,
            environment={"TEST": "async"},
            command=["sleep", "30"],
        )
        print(f"โœ… Container created: {container.id}")

        # Test container start
        print("\n2๏ธโƒฃ Starting container...")
        await async_start_container(container)
        print("โœ… Container started")

        # Small delay to let container start
        await asyncio.sleep(2)

        # Test container retrieval
        print("\n3๏ธโƒฃ Retrieving container info...")
        retrieved = await async_get_container(container_name)
        if retrieved and retrieved.id == container.id:
            print("โœ… Container retrieval successful")
        else:
            print("โŒ Container retrieval failed")
            # The container is running at this point; returning without
            # cleanup would leak it.
            await _cleanup_test_container(container_name)
            return False

        # Test container stop
        print("\n4๏ธโƒฃ Stopping container...")
        await async_stop_container(container, timeout=5)
        print("โœ… Container stopped")

        # Test container removal
        print("\n5๏ธโƒฃ Removing container...")
        await async_remove_container(container)
        print("โœ… Container removed")

        return True

    except Exception as e:
        print(f"โŒ Container lifecycle test failed: {e}")
        # Cleanup on failure
        await _cleanup_test_container(container_name)
        return False


async def test_performance_comparison() -> bool:
    """Compare performance between sync and async operations.

    Only the async side is measured: five container listings and five pings
    are gathered concurrently and timed.

    Returns:
        True if none of the gathered operations produced an exception.
    """
    print("\n๐Ÿ“Š Performance Comparison")
    print("=" * 50)

    # Test async performance
    print("1๏ธโƒฃ Testing async operation performance...")
    async_start = time.perf_counter()

    # Perform multiple async operations
    tasks = []
    for _ in range(5):
        tasks.append(async_list_containers(all=False))
        tasks.append(async_docker_ping())

    results = await asyncio.gather(*tasks, return_exceptions=True)
    async_duration = time.perf_counter() - async_start

    successful_async = sum(1 for r in results if not isinstance(r, Exception))
    print(
        f"โœ… Async operations: {successful_async}/{len(tasks)} successful in {async_duration:.3f}s"
    )

    # Note: We can't easily test sync operations in the same process due to blocking
    print("โ„น๏ธ Note: Sync operations would block and cannot be tested concurrently")

    return successful_async == len(tasks)


async def run_all_async_tests() -> bool:
    """Run all async Docker operation tests.

    Executes each test sequentially, treating an uncaught exception from a
    test as a failure, then prints a summary.

    Returns:
        True if every test passed, False otherwise.
    """
    print("๐Ÿš€ Async Docker Operations Test Suite")
    print("=" * 60)

    tests = [
        ("Async Docker Client", test_async_docker_client),
        ("Concurrent Operations", test_concurrent_operations),
        ("Container Lifecycle", test_container_lifecycle),
        ("Performance Comparison", test_performance_comparison),
    ]

    results = []
    for test_name, test_func in tests:
        print(f"\n{'=' * 20} {test_name} {'=' * 20}")
        try:
            result = await test_func()
            results.append(result)
            status = "โœ… PASSED" if result else "โŒ FAILED"
            print(f"\n{status}: {test_name}")
        except Exception as e:
            print(f"\nโŒ ERROR in {test_name}: {e}")
            results.append(False)

    # Summary
    print(f"\n{'=' * 60}")
    passed = sum(results)
    total = len(results)
    print(f"๐Ÿ“Š Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("๐ŸŽ‰ All async Docker operation tests completed successfully!")
        print("โšก Async operations are working correctly for improved concurrency.")
    else:
        print("โš ๏ธ Some tests failed. Check the output above for details.")
        print("๐Ÿ’ก Ensure Docker daemon is running and accessible.")

    return passed == total


if __name__ == "__main__":
    # Propagate the suite result as the process exit status.
    sys.exit(0 if asyncio.run(run_all_async_tests()) else 1)