Files
lovdata-chat/docker/scripts/test-http-connection-pool.py
2026-01-18 23:29:04 +01:00

326 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
HTTP Connection Pooling Test Script
Tests the HTTP connection pool implementation to verify performance improvements
and proper connection reuse for proxy operations.
"""
import os
import sys
import asyncio
import time
import statistics
from pathlib import Path
# Add session-manager to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from http_pool import HTTPConnectionPool, make_http_request, get_connection_pool_stats
# Set up logging
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def test_connection_pool_initialization():
"""Test HTTP connection pool initialization."""
print("🧪 Testing HTTP Connection Pool Initialization")
print("=" * 60)
pool = HTTPConnectionPool()
# Test client creation
print("1⃣ Testing pool initialization...")
try:
await pool.ensure_client()
print("✅ HTTP connection pool initialized successfully")
except Exception as e:
print(f"❌ Pool initialization failed: {e}")
return False
# Test pool stats
print("\n2⃣ Testing pool statistics...")
try:
stats = await pool.get_pool_stats()
print(f"✅ Pool stats retrieved: status={stats.get('status')}")
print(
f" Max keepalive connections: {stats.get('config', {}).get('max_keepalive_connections')}"
)
print(f" Max connections: {stats.get('config', {}).get('max_connections')}")
except Exception as e:
print(f"❌ Pool stats failed: {e}")
return False
return True
async def test_connection_reuse():
"""Test connection reuse vs creating new clients."""
print("\n🔄 Testing Connection Reuse Performance")
print("=" * 60)
# Test with connection pool
print("1⃣ Testing with connection pool...")
pool_times = []
pool = HTTPConnectionPool()
# Make multiple requests using the pool
for i in range(10):
start_time = time.time()
try:
# Use a simple HTTP endpoint for testing (we'll use httpbin.org)
response = await pool.request("GET", "https://httpbin.org/get")
if response.status_code == 200:
pool_times.append(time.time() - start_time)
else:
print(f"❌ Request {i + 1} failed with status {response.status_code}")
pool_times.append(float("inf"))
except Exception as e:
print(f"❌ Request {i + 1} failed: {e}")
pool_times.append(float("inf"))
# Test with new client each time (inefficient way)
print("\n2⃣ Testing with new client each request (inefficient)...")
new_client_times = []
for i in range(10):
start_time = time.time()
try:
import httpx
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get("https://httpbin.org/get")
if response.status_code == 200:
new_client_times.append(time.time() - start_time)
else:
print(
f"❌ Request {i + 1} failed with status {response.status_code}"
)
new_client_times.append(float("inf"))
except Exception as e:
print(f"❌ Request {i + 1} failed: {e}")
new_client_times.append(float("inf"))
# Filter out failed requests
pool_times = [t for t in pool_times if t != float("inf")]
new_client_times = [t for t in new_client_times if t != float("inf")]
if pool_times and new_client_times:
pool_avg = statistics.mean(pool_times)
new_client_avg = statistics.mean(new_client_times)
improvement = ((new_client_avg - pool_avg) / new_client_avg) * 100
print(f"\n📊 Performance Comparison:")
print(f" Connection pool average: {pool_avg:.3f}s")
print(f" New client average: {new_client_avg:.3f}s")
print(f" Performance improvement: {improvement:.1f}%")
if improvement > 10: # Expect at least 10% improvement
print("✅ Connection pooling provides significant performance improvement")
return True
else:
print("⚠️ Performance improvement below expected threshold")
return True # Still works, just not as much improvement
else:
print("❌ Insufficient successful requests for comparison")
return False
async def test_concurrent_requests():
"""Test handling multiple concurrent requests."""
print("\n⚡ Testing Concurrent Request Handling")
print("=" * 60)
async def make_concurrent_request(request_id: int):
"""Make a request and return timing info."""
start_time = time.time()
try:
response = await make_http_request("GET", "https://httpbin.org/get")
duration = time.time() - start_time
return {
"id": request_id,
"success": True,
"duration": duration,
"status": response.status_code,
}
except Exception as e:
duration = time.time() - start_time
return {
"id": request_id,
"success": False,
"duration": duration,
"error": str(e),
}
print("1⃣ Testing 20 concurrent requests...")
start_time = time.time()
# Launch 20 concurrent requests
tasks = [make_concurrent_request(i) for i in range(20)]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start_time
# Analyze results
successful = 0
failed = 0
durations = []
for result in results:
if isinstance(result, dict):
if result.get("success"):
successful += 1
durations.append(result.get("duration", 0))
else:
failed += 1
print(f"✅ Concurrent requests completed in {total_time:.2f}s")
print(f" Successful: {successful}/20")
print(f" Failed: {failed}/20")
if durations:
avg_duration = statistics.mean(durations)
max_duration = max(durations)
print(f" Average request time: {avg_duration:.3f}s")
print(f" Max request time: {max_duration:.3f}s")
# Check if requests were reasonably concurrent (not serialized)
if total_time < (max_duration * 1.5): # Allow some overhead
print("✅ Requests handled concurrently (not serialized)")
else:
print("⚠️ Requests may have been serialized")
if successful >= 15: # At least 75% success rate
print("✅ Concurrent request handling successful")
return True
else:
print("❌ Concurrent request handling failed")
return False
async def test_connection_pool_limits():
"""Test connection pool limits and behavior."""
print("\n🎛️ Testing Connection Pool Limits")
print("=" * 60)
pool = HTTPConnectionPool()
print("1⃣ Testing pool configuration...")
stats = await pool.get_pool_stats()
if isinstance(stats, dict):
config = stats.get("config", {})
max_keepalive = config.get("max_keepalive_connections")
max_connections = config.get("max_connections")
print(f"✅ Pool configured with:")
print(f" Max keepalive connections: {max_keepalive}")
print(f" Max total connections: {max_connections}")
print(f" Keepalive expiry: {config.get('keepalive_expiry')}s")
# Verify reasonable limits
if max_keepalive >= 10 and max_connections >= 50:
print("✅ Pool limits are reasonably configured")
return True
else:
print("⚠️ Pool limits may be too restrictive")
return True # Still functional
else:
print("❌ Could not retrieve pool configuration")
return False
async def test_error_handling():
"""Test error handling and recovery."""
print("\n🚨 Testing Error Handling and Recovery")
print("=" * 60)
pool = HTTPConnectionPool()
print("1⃣ Testing invalid URL handling...")
try:
response = await pool.request(
"GET", "http://invalid-domain-that-does-not-exist.com"
)
print("❌ Expected error but request succeeded")
return False
except Exception as e:
print(f"✅ Invalid URL properly handled: {type(e).__name__}")
print("\n2⃣ Testing timeout handling...")
try:
# Use a very short timeout to force timeout
response = await pool.request(
"GET", "https://httpbin.org/delay/10", timeout=1.0
)
print("❌ Expected timeout but request succeeded")
return False
except Exception as e:
print(f"✅ Timeout properly handled: {type(e).__name__}")
print("\n3⃣ Testing pool recovery after errors...")
try:
# Make a successful request after errors
response = await pool.request("GET", "https://httpbin.org/get")
if response.status_code == 200:
print("✅ Pool recovered successfully after errors")
return True
else:
print(f"❌ Pool recovery failed with status {response.status_code}")
return False
except Exception as e:
print(f"❌ Pool recovery failed: {e}")
return False
async def run_all_http_pool_tests():
"""Run all HTTP connection pool tests."""
print("🌐 HTTP Connection Pooling Test Suite")
print("=" * 70)
tests = [
("Connection Pool Initialization", test_connection_pool_initialization),
("Connection Reuse Performance", test_connection_reuse),
("Concurrent Request Handling", test_concurrent_requests),
("Connection Pool Limits", test_connection_pool_limits),
("Error Handling and Recovery", test_error_handling),
]
results = []
for test_name, test_func in tests:
print(f"\n{'=' * 25} {test_name} {'=' * 25}")
try:
result = await test_func()
results.append(result)
status = "✅ PASSED" if result else "❌ FAILED"
print(f"\n{status}: {test_name}")
except Exception as e:
print(f"\n❌ ERROR in {test_name}: {e}")
results.append(False)
# Summary
print(f"\n{'=' * 70}")
passed = sum(results)
total = len(results)
print(f"📊 Test Results: {passed}/{total} tests passed")
if passed == total:
print("🎉 All HTTP connection pooling tests completed successfully!")
print(
"🔗 Connection pooling is working correctly for improved proxy performance."
)
else:
print("⚠️ Some tests failed. Check the output above for details.")
print(
"💡 Ensure internet connectivity and httpbin.org is accessible for testing."
)
return passed == total
if __name__ == "__main__":
asyncio.run(run_all_http_pool_tests())