#!/usr/bin/env python3 """ Communication Simulator Test for Gemini MCP Server This script provides comprehensive end-to-end testing of the Gemini MCP server by simulating real Claude CLI communications and validating conversation continuity, file handling, deduplication features, and clarification scenarios. Test Flow: 1. Setup fresh Docker environment with clean containers 2. Simulate Claude CLI tool calls via docker exec 3. Test conversation threading with file handling 4. Validate file deduplication in conversation history 5. Test requires_clarification scenarios and continuation flows 6. Validate edge cases like partial file provision and clarification loops 7. Check Docker logs for proper behavior 8. Cleanup and report results New Clarification Testing Features: - Debug tool clarification scenarios - Analyze tool clarification flows - Clarification with file deduplication across turns - Multiple round clarification loops - Partial file provision edge cases - Real clarification flows with ambiguous prompts Usage: python communication_simulator_test.py [--verbose] [--keep-logs] [--tests TEST_NAME...] [--individual TEST_NAME] [--skip-docker] --tests: Run specific tests only (space-separated) --list-tests: List all available tests --individual: Run a single test individually --skip-docker: Skip Docker setup (assumes containers are already running) Available tests: basic_conversation - Basic conversation flow with chat tool per_tool_deduplication - File deduplication for individual tools cross_tool_continuation - Cross-tool conversation continuation scenarios state_isolation - State isolation and contamination detection conversation_boundaries - Conversation boundaries and reset behavior clarification_scenarios - Requires clarification scenarios content_validation - Content validation and duplicate detection logs_validation - Docker logs validation redis_validation - Redis conversation memory validation Examples: # Run all tests python communication_simulator_test.py # Run only basic conversation and content validation tests python communication_simulator_test.py --tests basic_conversation content_validation # Run a single test individually (with full Docker setup) python communication_simulator_test.py --individual content_validation # Run a single test individually (assuming Docker is already running) python communication_simulator_test.py --individual content_validation --skip-docker # List available tests python communication_simulator_test.py --list-tests """ import argparse import json import logging import os import shutil import subprocess import sys import tempfile import time from typing import Optional class CommunicationSimulator: """Simulates real-world Claude CLI communication with MCP Gemini server""" def __init__(self, verbose: bool = False, keep_logs: bool = False, selected_tests: list[str] = None): self.verbose = verbose self.keep_logs = keep_logs self.selected_tests = selected_tests or [] self.temp_dir = None self.container_name = "gemini-mcp-server" self.redis_container = "gemini-mcp-redis" # Import test registry from simulator_tests import TEST_REGISTRY self.test_registry = TEST_REGISTRY # Available test methods mapping self.available_tests = { name: self._create_test_runner(test_class) for name, test_class in self.test_registry.items() } # Test result tracking self.test_results = { "basic_conversation": False, "per_tool_tests": {}, "cross_tool_scenarios": {}, "clarification_scenarios": {}, "content_validation": {}, "logs_validation": False, "redis_validation": False, } # Configure logging log_level = logging.DEBUG if verbose else logging.INFO logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s") self.logger = logging.getLogger(__name__) def _create_test_runner(self, test_class): """Create a test runner function for a test class""" def run_test(): test_instance = test_class(verbose=self.verbose) return test_instance.run_test() return run_test def setup_test_environment(self) -> bool: """Setup fresh Docker environment and test files""" try: self.logger.info("πŸš€ Setting up test environment...") # Create temporary directory for test files self.temp_dir = tempfile.mkdtemp(prefix="mcp_test_") self.logger.debug(f"Created temp directory: {self.temp_dir}") # Create test files self._create_test_files() # Setup Docker environment return self._setup_docker() except Exception as e: self.logger.error(f"Failed to setup test environment: {e}") return False def _create_test_files(self): """Create test files for the simulation in a location accessible by Docker""" # Test Python file python_content = '''""" Sample Python module for testing MCP conversation continuity """ def fibonacci(n): """Calculate fibonacci number recursively""" if n <= 1: return n return fibonacci(n-1) + fibonacci(n-2) def factorial(n): """Calculate factorial iteratively""" result = 1 for i in range(1, n + 1): result *= i return result class Calculator: """Simple calculator class""" def __init__(self): self.history = [] def add(self, a, b): result = a + b self.history.append(f"{a} + {b} = {result}") return result def multiply(self, a, b): result = a * b self.history.append(f"{a} * {b} = {result}") return result ''' # Test configuration file config_content = """{ "database": { "host": "localhost", "port": 5432, "name": "testdb", "ssl": true }, "cache": { "redis_url": "redis://localhost:6379", "ttl": 3600 }, "logging": { "level": "INFO", "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" } }""" # Create files in the current project directory so they're accessible to MCP tools # MCP tools can access files with absolute paths within the project current_dir = os.getcwd() test_dir = os.path.join(current_dir, "test_simulation_files") os.makedirs(test_dir, exist_ok=True) test_py = os.path.join(test_dir, "test_module.py") test_config = os.path.join(test_dir, "config.json") with open(test_py, "w") as f: f.write(python_content) with open(test_config, "w") as f: f.write(config_content) self.test_files = {"python": test_py, "config": test_config} # Store test directory for cleanup self.test_dir = test_dir self.logger.debug(f"Created test files: {list(self.test_files.values())}") def _setup_docker(self) -> bool: """Setup fresh Docker environment""" try: self.logger.info("🐳 Setting up Docker environment...") # Stop and remove existing containers self._run_command(["docker", "compose", "down", "--remove-orphans"], check=False, capture_output=True) # Clean up any old containers/images old_containers = [self.container_name, self.redis_container] for container in old_containers: self._run_command(["docker", "stop", container], check=False, capture_output=True) self._run_command(["docker", "rm", container], check=False, capture_output=True) # Build and start services self.logger.info("πŸ“¦ Building Docker images...") result = self._run_command(["docker", "compose", "build", "--no-cache"], capture_output=True) if result.returncode != 0: self.logger.error(f"Docker build failed: {result.stderr}") return False self.logger.info("πŸš€ Starting Docker services...") result = self._run_command(["docker", "compose", "up", "-d"], capture_output=True) if result.returncode != 0: self.logger.error(f"Docker startup failed: {result.stderr}") return False # Wait for services to be ready self.logger.info("⏳ Waiting for services to be ready...") time.sleep(10) # Give services time to initialize # Verify containers are running if not self._verify_containers(): return False self.logger.info("βœ… Docker environment ready") return True except Exception as e: self.logger.error(f"Docker setup failed: {e}") return False def _verify_containers(self) -> bool: """Verify that required containers are running""" try: result = self._run_command(["docker", "ps", "--format", "{{.Names}}"], capture_output=True) running_containers = result.stdout.decode().strip().split("\n") required = [self.container_name, self.redis_container] for container in required: if container not in running_containers: self.logger.error(f"Container not running: {container}") return False self.logger.debug(f"Verified containers running: {required}") return True except Exception as e: self.logger.error(f"Container verification failed: {e}") return False def simulate_claude_cli_session(self) -> bool: """Simulate a complete Claude CLI session with conversation continuity""" try: self.logger.info("πŸ€– Starting Claude CLI simulation...") # If specific tests are selected, run only those if self.selected_tests: return self._run_selected_tests() # Otherwise run all tests in order test_sequence = [ "basic_conversation", "per_tool_deduplication", "cross_tool_continuation", "state_isolation", "conversation_boundaries", "clarification_scenarios", "content_validation" ] for test_name in test_sequence: if not self._run_single_test(test_name): return False self.logger.info("βœ… All conversation continuity, clarification, and content validation tests passed") return True except Exception as e: self.logger.error(f"Claude CLI simulation failed: {e}") return False def _run_selected_tests(self) -> bool: """Run only the selected tests""" try: self.logger.info(f"🎯 Running selected tests: {', '.join(self.selected_tests)}") for test_name in self.selected_tests: if not self._run_single_test(test_name): return False self.logger.info("βœ… All selected tests passed") return True except Exception as e: self.logger.error(f"Selected tests failed: {e}") return False def _run_single_test(self, test_name: str) -> bool: """Run a single test by name""" try: if test_name not in self.available_tests: self.logger.error(f"Unknown test: {test_name}") self.logger.info(f"Available tests: {', '.join(self.available_tests.keys())}") return False self.logger.info(f"πŸ§ͺ Running test: {test_name}") test_function = self.available_tests[test_name] result = test_function() if result: self.logger.info(f"βœ… Test {test_name} passed") else: self.logger.error(f"❌ Test {test_name} failed") return result except Exception as e: self.logger.error(f"Test {test_name} failed with exception: {e}") return False def get_available_tests(self) -> dict[str, str]: """Get available tests with descriptions""" descriptions = {} for name, test_class in self.test_registry.items(): # Create temporary instance to get description temp_instance = test_class(verbose=False) descriptions[name] = temp_instance.test_description return descriptions def _test_basic_conversation_flow(self) -> bool: """Test basic conversation flow with chat tool""" try: self.logger.info("πŸ“ Test 1: Basic conversation flow") # Initial chat tool call with file self.logger.info(" 1.1: Initial chat with file analysis") response1, continuation_id = self._call_mcp_tool( "chat", {"prompt": "Analyze this Python code and explain what it does", "files": [self.test_files["python"]]}, ) if not response1 or not continuation_id: self.logger.error("Failed to get initial response with continuation_id") return False self.logger.info(f" βœ… Got continuation_id: {continuation_id}") # Continue conversation with same file (should be deduplicated) self.logger.info(" 1.2: Continue conversation with same file") response2, _ = self._call_mcp_tool( "chat", { "prompt": "Now focus on the Calculator class specifically. Are there any improvements you'd suggest?", "files": [self.test_files["python"]], # Same file - should be deduplicated "continuation_id": continuation_id, }, ) if not response2: self.logger.error("Failed to continue conversation") return False # Continue with additional file self.logger.info(" 1.3: Continue conversation with additional file") response3, _ = self._call_mcp_tool( "chat", { "prompt": "Now also analyze this configuration file and see how it might relate to the Python code", "files": [self.test_files["python"], self.test_files["config"]], "continuation_id": continuation_id, }, ) if not response3: self.logger.error("Failed to continue with additional file") return False self.logger.info(" βœ… Basic conversation flow working") self.test_results["basic_conversation"] = True return True except Exception as e: self.logger.error(f"Basic conversation flow test failed: {e}") return False def _test_per_tool_file_deduplication(self) -> bool: """Test file deduplication for each individual tool""" try: self.logger.info("πŸ“„ Test 2: Per-tool file deduplication") tools_to_test = [ ( "thinkdeep", { "prompt": "Think deeply about this Python code and identify potential architectural improvements", "files": [self.test_files["python"]], }, ), ("analyze", {"files": [self.test_files["python"]], "analysis_type": "architecture"}), ( "debug", { "files": [self.test_files["python"]], "issue_description": "The fibonacci function seems slow for large numbers", }, ), ( "codereview", { "files": [self.test_files["python"]], "context": "General code review for quality and best practices", }, ), ] for tool_name, initial_params in tools_to_test: self.logger.info(f" 2.{tool_name}: Testing {tool_name} tool file deduplication") # Initial call response1, continuation_id = self._call_mcp_tool(tool_name, initial_params) if not response1: self.logger.warning(f" ⚠️ {tool_name} tool initial call failed, skipping") continue if not continuation_id: self.logger.warning(f" ⚠️ {tool_name} tool didn't provide continuation_id, skipping") continue # Continue with same file - should be deduplicated continue_params = initial_params.copy() continue_params["continuation_id"] = continuation_id if tool_name == "thinkdeep": continue_params["prompt"] = "Now focus specifically on the recursive fibonacci implementation" elif tool_name == "analyze": continue_params["analysis_type"] = "performance" elif tool_name == "debug": continue_params["issue_description"] = "How can we optimize the fibonacci function?" elif tool_name == "codereview": continue_params["context"] = "Focus on the Calculator class implementation" response2, _ = self._call_mcp_tool(tool_name, continue_params) if response2: self.logger.info(f" βœ… {tool_name} tool file deduplication working") self.test_results["per_tool_tests"][tool_name] = True else: self.logger.warning(f" ⚠️ {tool_name} tool continuation failed") self.test_results["per_tool_tests"][tool_name] = False self.logger.info(" βœ… Per-tool file deduplication tests completed") return True except Exception as e: self.logger.error(f"Per-tool file deduplication test failed: {e}") return False def _test_cross_tool_continuation(self) -> bool: """Test comprehensive cross-tool continuation scenarios""" try: self.logger.info("πŸ”§ Test 3: Cross-tool continuation scenarios") # Scenario 1: chat -> thinkdeep -> codereview self.logger.info(" 3.1: Testing chat -> thinkdeep -> codereview") # Start with chat chat_response, chat_id = self._call_mcp_tool( "chat", { "prompt": "Look at this Python code and tell me what you think about it", "files": [self.test_files["python"]], }, ) if not chat_response or not chat_id: self.logger.error("Failed to start chat conversation") return False # Continue with thinkdeep thinkdeep_response, _ = self._call_mcp_tool( "thinkdeep", { "prompt": "Think deeply about potential performance issues in this code", "files": [self.test_files["python"]], # Same file should be deduplicated "continuation_id": chat_id, }, ) if not thinkdeep_response: self.logger.error("Failed chat -> thinkdeep continuation") return False # Continue with codereview codereview_response, _ = self._call_mcp_tool( "codereview", { "files": [self.test_files["python"]], # Same file should be deduplicated "context": "Building on our previous analysis, provide a comprehensive code review", "continuation_id": chat_id, }, ) if not codereview_response: self.logger.error("Failed thinkdeep -> codereview continuation") return False self.logger.info(" βœ… chat -> thinkdeep -> codereview working") self.test_results["cross_tool_scenarios"]["chat_thinkdeep_codereview"] = True # Scenario 2: analyze -> debug -> thinkdeep self.logger.info(" 3.2: Testing analyze -> debug -> thinkdeep") # Start with analyze analyze_response, analyze_id = self._call_mcp_tool( "analyze", {"files": [self.test_files["python"]], "analysis_type": "code_quality"} ) if not analyze_response or not analyze_id: self.logger.warning("Failed to start analyze conversation, skipping scenario 2") else: # Continue with debug debug_response, _ = self._call_mcp_tool( "debug", { "files": [self.test_files["python"]], # Same file should be deduplicated "issue_description": "Based on our analysis, help debug the performance issue in fibonacci", "continuation_id": analyze_id, }, ) if debug_response: # Continue with thinkdeep final_response, _ = self._call_mcp_tool( "thinkdeep", { "prompt": "Think deeply about the architectural implications of the issues we've found", "files": [self.test_files["python"]], # Same file should be deduplicated "continuation_id": analyze_id, }, ) if final_response: self.logger.info(" βœ… analyze -> debug -> thinkdeep working") self.test_results["cross_tool_scenarios"]["analyze_debug_thinkdeep"] = True else: self.logger.warning(" ⚠️ debug -> thinkdeep continuation failed") self.test_results["cross_tool_scenarios"]["analyze_debug_thinkdeep"] = False else: self.logger.warning(" ⚠️ analyze -> debug continuation failed") self.test_results["cross_tool_scenarios"]["analyze_debug_thinkdeep"] = False # Scenario 3: Multi-file cross-tool continuation self.logger.info(" 3.3: Testing multi-file cross-tool continuation") # Start with both files multi_response, multi_id = self._call_mcp_tool( "chat", { "prompt": "Analyze both the Python code and configuration file", "files": [self.test_files["python"], self.test_files["config"]], }, ) if not multi_response or not multi_id: self.logger.warning("Failed to start multi-file conversation, skipping scenario 3") else: # Switch to codereview with same files (should use conversation history) multi_review, _ = self._call_mcp_tool( "codereview", { "files": [self.test_files["python"], self.test_files["config"]], # Same files "context": "Review both files in the context of our previous discussion", "continuation_id": multi_id, }, ) if multi_review: self.logger.info(" βœ… Multi-file cross-tool continuation working") self.test_results["cross_tool_scenarios"]["multi_file_continuation"] = True else: self.logger.warning(" ⚠️ Multi-file cross-tool continuation failed") self.test_results["cross_tool_scenarios"]["multi_file_continuation"] = False self.logger.info(" βœ… Cross-tool continuation scenarios completed") return True except Exception as e: self.logger.error(f"Cross-tool continuation test failed: {e}") return False def _test_state_isolation(self) -> bool: """Test that different conversation threads don't contaminate each other""" try: self.logger.info("πŸ”’ Test 4: State isolation and contamination detection") # Create a test file specifically for this test isolation_content = '''""" Test file for state isolation testing """ def isolated_function(): """This function should only appear in isolation tests""" return "ISOLATION_TEST_MARKER" class IsolationTestClass: """Class that should not leak between conversations""" def __init__(self): self.marker = "ISOLATION_BOUNDARY" ''' isolation_file = os.path.join(self.test_dir, "isolation_test.py") with open(isolation_file, "w") as f: f.write(isolation_content) # Test 1: Start two separate conversation threads self.logger.info(" 4.1: Creating separate conversation threads") # Thread A: Chat about original Python file response_a1, thread_a = self._call_mcp_tool( "chat", {"prompt": "Analyze this Python module", "files": [self.test_files["python"]]} ) if not response_a1 or not thread_a: self.logger.error("Failed to create thread A") return False # Thread B: Chat about isolation test file response_b1, thread_b = self._call_mcp_tool( "chat", {"prompt": "Analyze this isolation test file", "files": [isolation_file]} ) if not response_b1 or not thread_b: self.logger.error("Failed to create thread B") return False # Verify threads are different if thread_a == thread_b: self.logger.error("Threads are not isolated - same continuation_id returned") return False self.logger.info(f" βœ… Created isolated threads: {thread_a[:8]}... and {thread_b[:8]}...") # Test 2: Continue both threads and check for contamination self.logger.info(" 4.2: Testing cross-thread contamination") # Continue thread A - should only know about original Python file response_a2, _ = self._call_mcp_tool( "chat", {"prompt": "What functions did we discuss in the previous file?", "continuation_id": thread_a} ) # Continue thread B - should only know about isolation file response_b2, _ = self._call_mcp_tool( "chat", {"prompt": "What functions did we discuss in the previous file?", "continuation_id": thread_b} ) if not response_a2 or not response_b2: self.logger.error("Failed to continue isolated threads") return False # Parse responses to check for contamination response_a2_data = json.loads(response_a2) response_b2_data = json.loads(response_b2) content_a = response_a2_data.get("content", "") content_b = response_b2_data.get("content", "") # Thread A should mention fibonacci/factorial, not isolation functions # Thread B should mention isolation functions, not fibonacci/factorial contamination_detected = False if "isolated_function" in content_a or "IsolationTestClass" in content_a: self.logger.error("Thread A contaminated with Thread B content") contamination_detected = True if "fibonacci" in content_b or "factorial" in content_b or "Calculator" in content_b: self.logger.error("Thread B contaminated with Thread A content") contamination_detected = True if contamination_detected: self.test_results["cross_tool_scenarios"]["state_isolation"] = False return False self.logger.info(" βœ… No cross-thread contamination detected") # Test 3: Cross-tool switching with isolation self.logger.info(" 4.3: Testing cross-tool state isolation") # Switch thread A to codereview response_a3, _ = self._call_mcp_tool( "codereview", { "files": [self.test_files["python"]], "context": "Review the code we discussed", "continuation_id": thread_a, }, ) # Switch thread B to codereview response_b3, _ = self._call_mcp_tool( "codereview", {"files": [isolation_file], "context": "Review the isolation test code", "continuation_id": thread_b}, ) if response_a3 and response_b3: self.logger.info(" βœ… Cross-tool isolation maintained") self.test_results["cross_tool_scenarios"]["state_isolation"] = True else: self.logger.warning(" ⚠️ Cross-tool isolation test incomplete") self.test_results["cross_tool_scenarios"]["state_isolation"] = False # Cleanup isolation test file os.remove(isolation_file) self.logger.info(" βœ… State isolation tests completed") return True except Exception as e: self.logger.error(f"State isolation test failed: {e}") return False def _test_conversation_boundaries(self) -> bool: """Test conversation boundaries and proper reset behavior""" try: self.logger.info("🚧 Test 5: Conversation boundaries and reset behavior") # Test 1: Tool-to-tool-to-tool with fresh start self.logger.info(" 5.1: Testing A->B->A pattern with fresh conversations") # Start with chat response1, thread1 = self._call_mcp_tool( "chat", {"prompt": "Analyze the fibonacci function in this code", "files": [self.test_files["python"]]} ) if not response1 or not thread1: self.logger.warning("Failed to start boundary test, skipping") return True # Switch to codereview (continue conversation) response2, _ = self._call_mcp_tool( "codereview", { "files": [self.test_files["python"]], "context": "Building on our fibonacci discussion", "continuation_id": thread1, }, ) if not response2: self.logger.warning("Failed codereview continuation") return True # Switch back to chat but start FRESH conversation (no continuation_id) self.logger.info(" 5.2: Testing fresh conversation after previous context") response3, thread3 = self._call_mcp_tool( "chat", { "prompt": "Tell me about the Calculator class in this file", # Different focus "files": [self.test_files["python"]], # Same file but fresh context }, ) if not response3 or not thread3: self.logger.warning("Failed fresh conversation test") return True # Verify it's a truly fresh conversation if thread1 == thread3: self.logger.error("Fresh conversation got same thread ID - boundary violation!") self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False return False self.logger.info(f" βœ… Fresh conversation created: {thread3[:8]}... (vs {thread1[:8]}...)") # Test 2: Verify fresh conversation doesn't have stale context self.logger.info(" 5.3: Testing stale context isolation") # Continue the fresh conversation - should not reference fibonacci discussion response4, _ = self._call_mcp_tool( "chat", {"prompt": "What did we just discuss about this code?", "continuation_id": thread3} ) if response4: response4_data = json.loads(response4) content4 = response4_data.get("content", "") # Should reference Calculator class, not fibonacci from previous thread if "fibonacci" in content4.lower() and "calculator" not in content4.lower(): self.logger.error("Fresh conversation contaminated with stale context!") self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False return False else: self.logger.info(" βœ… Fresh conversation properly isolated from previous context") # Test 3: File access without continuation should work self.logger.info(" 5.4: Testing file access in fresh conversations") # New conversation with same files - should read files fresh response5, thread5 = self._call_mcp_tool( "chat", {"prompt": "What's the purpose of this configuration file?", "files": [self.test_files["config"]]}, ) if response5 and thread5: # Verify it can access the file content response5_data = json.loads(response5) content5 = response5_data.get("content", "") if "database" in content5.lower() or "redis" in content5.lower(): self.logger.info(" βœ… Fresh conversation can access files correctly") self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = True else: self.logger.warning(" ⚠️ Fresh conversation may not be reading files properly") self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False else: self.logger.warning(" ⚠️ Fresh conversation with config file failed") self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False self.logger.info(" βœ… Conversation boundary tests completed") return True except Exception as e: self.logger.error(f"Conversation boundary test failed: {e}") return False def _test_content_validation(self) -> bool: """Test that tools don't duplicate file content in their responses""" try: self.logger.info("πŸ“„ Test 7: Content validation and duplicate detection") # Create a test file with distinctive content for validation validation_content = '''""" Configuration file for content validation testing This content should appear only ONCE in any tool response """ # Configuration constants MAX_CONTENT_TOKENS = 800_000 # This line should appear exactly once TEMPERATURE_ANALYTICAL = 0.2 # This should also appear exactly once UNIQUE_VALIDATION_MARKER = "CONTENT_VALIDATION_TEST_12345" # Database settings DATABASE_CONFIG = { "host": "localhost", "port": 5432, "name": "validation_test_db" } ''' validation_file = os.path.join(self.test_dir, "validation_config.py") with open(validation_file, "w") as f: f.write(validation_content) # Test 1: Precommit tool with files parameter (where the bug occurred) self.logger.info(" 7.1: Testing precommit tool content duplication") # Call precommit tool with the validation file response1, thread_id = self._call_mcp_tool( "precommit", { "path": os.getcwd(), "files": [validation_file], "original_request": "Test for content duplication in precommit tool" } ) if response1: # Parse response and check for content duplication try: response_data = json.loads(response1) content = response_data.get("content", "") # Count occurrences of distinctive markers max_content_count = content.count("MAX_CONTENT_TOKENS = 800_000") temp_analytical_count = content.count("TEMPERATURE_ANALYTICAL = 0.2") unique_marker_count = content.count("UNIQUE_VALIDATION_MARKER") # Validate no duplication duplication_detected = False issues = [] if max_content_count > 1: issues.append(f"MAX_CONTENT_TOKENS appears {max_content_count} times") duplication_detected = True if temp_analytical_count > 1: issues.append(f"TEMPERATURE_ANALYTICAL appears {temp_analytical_count} times") duplication_detected = True if unique_marker_count > 1: issues.append(f"UNIQUE_VALIDATION_MARKER appears {unique_marker_count} times") duplication_detected = True if duplication_detected: self.logger.error(f" ❌ Content duplication detected in precommit tool: {'; '.join(issues)}") self.test_results["content_validation"]["precommit_duplication"] = False return False else: self.logger.info(" βœ… No content duplication in precommit tool") self.test_results["content_validation"]["precommit_duplication"] = True except json.JSONDecodeError: self.logger.warning(" ⚠️ Could not parse precommit response as JSON") else: self.logger.warning(" ⚠️ Precommit tool failed to respond") # Test 2: Other tools that use files parameter tools_to_test = [ ("chat", {"prompt": "Analyze this config file", "files": [validation_file]}), ("codereview", {"files": [validation_file], "context": "Review this configuration"}), ("analyze", {"files": [validation_file], "analysis_type": "code_quality"}) ] for tool_name, params in tools_to_test: self.logger.info(f" 7.{tool_name}: Testing {tool_name} tool content duplication") response, _ = self._call_mcp_tool(tool_name, params) if response: try: response_data = json.loads(response) content = response_data.get("content", "") # Check for duplication marker_count = content.count("UNIQUE_VALIDATION_MARKER") if marker_count > 1: self.logger.error(f" ❌ Content duplication in {tool_name}: marker appears {marker_count} times") self.test_results["content_validation"][f"{tool_name}_duplication"] = False else: self.logger.info(f" βœ… No content duplication in {tool_name}") self.test_results["content_validation"][f"{tool_name}_duplication"] = True except json.JSONDecodeError: self.logger.warning(f" ⚠️ Could not parse {tool_name} response") else: self.logger.warning(f" ⚠️ {tool_name} tool failed to respond") # Test 3: Cross-tool content validation with file deduplication self.logger.info(" 7.cross: Testing cross-tool content consistency") if thread_id: # Continue conversation with same file - content should be deduplicated in conversation history response2, _ = self._call_mcp_tool( "chat", { "prompt": "Continue analyzing this configuration file", "files": [validation_file], # Same file should be deduplicated "continuation_id": thread_id, }, ) if response2: try: response_data = json.loads(response2) content = response_data.get("content", "") # In continuation, the file content shouldn't be duplicated either marker_count = content.count("UNIQUE_VALIDATION_MARKER") if marker_count > 1: self.logger.error(f" ❌ Content duplication in cross-tool continuation: marker appears {marker_count} times") self.test_results["content_validation"]["cross_tool_duplication"] = False else: self.logger.info(" βœ… No content duplication in cross-tool continuation") self.test_results["content_validation"]["cross_tool_duplication"] = True except json.JSONDecodeError: self.logger.warning(" ⚠️ Could not parse continuation response") # Cleanup os.remove(validation_file) # Check if all content validation tests passed validation_results = self.test_results["content_validation"] all_passed = all(result for result in validation_results.values() if isinstance(result, bool)) if all_passed: self.logger.info(" βœ… All content validation tests passed") else: self.logger.error(" ❌ Some content validation tests failed") return False return True except Exception as e: self.logger.error(f"Content validation test failed: {e}") return False def _test_clarification_scenarios(self) -> bool: """Test requires_clarification scenarios and continuation with additional files""" try: self.logger.info("πŸ” Test 6: Requires clarification scenarios") # Test 1: Debug tool asking for missing files if not self._test_debug_clarification(): return False # Test 2: Analyze tool asking for related files if not self._test_analyze_clarification(): return False # Test 3: Clarification with file deduplication if not self._test_clarification_with_deduplication(): return False # Test 4: Multiple round clarification (clarification loop) if not self._test_clarification_loop(): return False # Test 5: Partial file provision edge case if not self._test_partial_file_provision(): return False # Test 6: Real clarification flow (might actually trigger requires_clarification) if not self._test_real_clarification_flow(): return False self.logger.info(" βœ… Clarification scenario tests completed") return True except Exception as e: self.logger.error(f"Clarification scenario test failed: {e}") return False def _test_debug_clarification(self) -> bool: """Test debug tool requesting clarification for missing files""" try: self.logger.info(" 6.1: Testing debug tool clarification flow") # Create a problematic file that imports from utils.py problematic_content = '''""" Main module with a bug that requires utils.py to debug """ import utils def main(): result = utils.calculate_something("hello") print(f"Result: {result}") if __name__ == "__main__": main() ''' # Create the problematic file problem_file = os.path.join(self.test_dir, "bug_main.py") with open(problem_file, "w") as f: f.write(problematic_content) # Step 1: Call debug tool with only the main file (should trigger clarification) # We'll simulate clarification by creating a mock response response1 = self._simulate_clarification_request( "debug", { "files": [problem_file], "error_description": "The application crashes with TypeError when running main()", }, ) if not response1: self.logger.warning(" ⚠️ Debug clarification simulation failed") return True # Don't fail entire test suite for simulation issues # For real testing, we would need the server to actually return requires_clarification # This is a proof of concept showing how to structure the test self.test_results["clarification_scenarios"]["debug_clarification"] = True self.logger.info(" βœ… Debug clarification flow structure verified") # Cleanup os.remove(problem_file) return True except Exception as e: self.logger.error(f"Debug clarification test failed: {e}") return False def _test_analyze_clarification(self) -> bool: """Test analyze tool requesting clarification for architecture analysis""" try: self.logger.info(" 6.2: Testing analyze tool clarification flow") # Create an incomplete file structure that would need more context partial_model = '''""" Partial model file that references other components """ from .base import BaseModel from .validators import validate_user_data class User(BaseModel): def __init__(self, username: str, email: str): self.username = username self.email = validate_user_data(email) super().__init__() ''' partial_file = os.path.join(self.test_dir, "partial_model.py") with open(partial_file, "w") as f: f.write(partial_model) # Simulate analyze tool clarification response1 = self._simulate_clarification_request( "analyze", { "files": [partial_file], "question": "Analyze the architecture and dependencies of this model", "analysis_type": "architecture", }, ) if response1: self.test_results["clarification_scenarios"]["analyze_clarification"] = True self.logger.info(" βœ… Analyze clarification flow structure verified") # Cleanup os.remove(partial_file) return True except Exception as e: self.logger.error(f"Analyze clarification test failed: {e}") return False def _test_clarification_with_deduplication(self) -> bool: """Test that clarification preserves file deduplication across turns""" try: self.logger.info(" 6.3: Testing clarification with file deduplication") # Start conversation with file A response1, thread_id = self._call_mcp_tool( "chat", {"prompt": "Analyze this Python code", "files": [self.test_files["python"]]} ) if not response1 or not thread_id: self.logger.warning(" ⚠️ Initial conversation failed") return True # Continue conversation asking for additional analysis with same file + new file # This should deduplicate the original file response2, _ = self._call_mcp_tool( "chat", { "prompt": "Now also analyze this config file in relation to the Python code", "files": [ self.test_files["python"], self.test_files["config"], ], # python file should be deduplicated "continuation_id": thread_id, }, ) if response2: self.test_results["clarification_scenarios"]["clarification_deduplication"] = True self.logger.info(" βœ… Clarification with file deduplication working") return True except Exception as e: self.logger.error(f"Clarification deduplication test failed: {e}") return False def _test_clarification_loop(self) -> bool: """Test multiple rounds of clarification in a single conversation""" try: self.logger.info(" 6.4: Testing clarification loop scenarios") # Create a complex file that would need multiple clarifications complex_content = '''""" Complex module with multiple dependencies and configurations """ import config import database import cache from external_api import APIClient def process_data(data): # Complex processing that would need clarification on each component conn = database.get_connection(config.DB_CONFIG) cached_result = cache.get(data.id) api_result = APIClient().fetch_additional_data(data.external_id) return combine_results(cached_result, api_result) ''' complex_file = os.path.join(self.test_dir, "complex_module.py") with open(complex_file, "w") as f: f.write(complex_content) # Simulate multiple clarification rounds # This is a structure test - in real implementation, each round would provide more files responses = [] # Round 1: Initial request response1 = self._simulate_clarification_request( "debug", {"files": [complex_file], "error_description": "Complex error in data processing pipeline"} ) responses.append(response1) # Round 2: Provide config.py but still need database.py if response1: response2 = self._simulate_clarification_request( "debug", { "files": [complex_file, self.test_files["config"]], "error_description": "Still need database configuration", "continuation_id": "mock_thread_id", }, ) responses.append(response2) if all(responses): self.test_results["clarification_scenarios"]["clarification_loop"] = True self.logger.info(" βœ… Clarification loop structure verified") # Cleanup os.remove(complex_file) return True except Exception as e: self.logger.error(f"Clarification loop test failed: {e}") return False def _test_partial_file_provision(self) -> bool: """Test edge case where user provides only some of requested files""" try: self.logger.info(" 6.5: Testing partial file provision edge case") # This test would verify that when a tool asks for multiple files # but user only provides some, the conversation can continue gracefully # Create multiple related files file1_content = '''"""File 1 - main module""" def main_function(): return "main" ''' file2_content = '''"""File 2 - utility module""" def utility_function(): return "utility" ''' file1_path = os.path.join(self.test_dir, "file1.py") file2_path = os.path.join(self.test_dir, "file2.py") with open(file1_path, "w") as f: f.write(file1_content) with open(file2_path, "w") as f: f.write(file2_content) # Simulate tool asking for both files # Simulate user providing only file1.py (partial provision) # In real implementation, this should trigger another clarification for file2.py partial_response = self._simulate_partial_file_response([file1_path]) if partial_response: self.test_results["clarification_scenarios"]["partial_file_provision"] = True self.logger.info(" βœ… Partial file provision edge case structure verified") # Cleanup os.remove(file1_path) os.remove(file2_path) return True except Exception as e: self.logger.error(f"Partial file provision test failed: {e}") return False def _simulate_clarification_request(self, tool_name: str, params: dict) -> Optional[str]: """ Simulate a tool call that would trigger requires_clarification. In real implementation, this would intercept the actual Gemini response. """ try: # This is a mock implementation showing the structure # In a real test, we would: # 1. Mock the Gemini API response to return requires_clarification # 2. Call the actual MCP tool # 3. Verify the response format and conversation ID preservation mock_response = { "status": "requires_clarification", "question": f"Mock clarification from {tool_name} tool", "files_needed": ["additional_file.py"], "conversation_id": f"mock_thread_{tool_name}", } self.logger.debug(f" πŸ“ Simulated {tool_name} clarification: {mock_response}") return json.dumps(mock_response) except Exception as e: self.logger.error(f"Clarification simulation failed: {e}") return None def _simulate_partial_file_response(self, provided_files: list[str]) -> Optional[str]: """Simulate user providing only some of the requested files""" try: # This would test the server's handling of incomplete file provision mock_response = { "status": "partial_provision", "provided_files": provided_files, "still_needed": ["missing_file.py"], } self.logger.debug(f" πŸ“ Simulated partial file provision: {mock_response}") return json.dumps(mock_response) except Exception as e: self.logger.error(f"Partial file response simulation failed: {e}") return None def _test_real_clarification_flow(self) -> bool: """Test a real clarification flow that might trigger requires_clarification from Gemini""" try: self.logger.info(" 6.6: Testing real clarification flow with ambiguous prompts") # Create an intentionally ambiguous debugging scenario ambiguous_content = '''""" Ambiguous code that would be hard to debug without context """ def mysterious_function(data): result = process_data(data) # Where is process_data defined? return result.transform() # What is the structure of result? class DataProcessor: def __init__(self): self.config = load_config() # Where is load_config from? def run(self): return mysterious_function(self.get_data()) # Where is get_data? ''' ambiguous_file = os.path.join(self.test_dir, "ambiguous.py") with open(ambiguous_file, "w") as f: f.write(ambiguous_content) # Try debug tool with minimal context - this might trigger clarification response1, thread_id = self._call_mcp_tool( "debug", {"files": [ambiguous_file], "error_description": "Code crashes with AttributeError"} ) if response1: try: response_data = json.loads(response1) if response_data.get("status") == "requires_clarification": self.logger.info(" 🎯 Real clarification response received!") self.test_results["clarification_scenarios"]["real_clarification_flow"] = True # Test continuation with additional context if thread_id: # Provide additional files continuation_response, _ = self._call_mcp_tool( "debug", { "files": [ambiguous_file, self.test_files["python"]], "error_description": "Additional context provided", "continuation_id": thread_id, }, ) if continuation_response: self.logger.info(" βœ… Clarification continuation working") else: self.logger.info(" ℹ️ No clarification triggered (Gemini provided analysis directly)") self.test_results["clarification_scenarios"]["real_clarification_flow"] = True except json.JSONDecodeError: self.logger.warning(" ⚠️ Could not parse response as JSON") # Cleanup os.remove(ambiguous_file) return True except Exception as e: self.logger.error(f"Real clarification flow test failed: {e}") return False def _call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]: """Simulate calling an MCP tool via Claude CLI (docker exec)""" try: # Prepare the MCP initialization and tool call sequence init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "clientInfo": {"name": "communication-simulator", "version": "1.0.0"}, }, } # Send initialized notification initialized_notification = {"jsonrpc": "2.0", "method": "notifications/initialized"} # Prepare the tool call request tool_request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": tool_name, "arguments": params}, } # Combine all messages messages = [json.dumps(init_request), json.dumps(initialized_notification), json.dumps(tool_request)] # Join with newlines as MCP expects input_data = "\n".join(messages) + "\n" # Simulate Claude CLI calling the MCP server via docker exec docker_cmd = ["docker", "exec", "-i", self.container_name, "python", "server.py"] self.logger.debug(f"Calling MCP tool {tool_name} with proper initialization") # Execute the command result = subprocess.run( docker_cmd, input=input_data, text=True, capture_output=True, timeout=120 # 2 minute timeout ) if result.returncode != 0: self.logger.error(f"Docker exec failed: {result.stderr}") return None, None # Parse the response - look for the tool call response response_data = self._parse_mcp_response(result.stdout, expected_id=2) if not response_data: return None, None # Extract continuation_id if present continuation_id = self._extract_continuation_id(response_data) return response_data, continuation_id except subprocess.TimeoutExpired: self.logger.error(f"MCP tool call timed out: {tool_name}") return None, None except Exception as e: self.logger.error(f"MCP tool call failed: {e}") return None, None def _parse_mcp_response(self, stdout: str, expected_id: int = 2) -> Optional[str]: """Parse MCP JSON-RPC response from stdout""" try: lines = stdout.strip().split("\n") for line in lines: if line.strip() and line.startswith("{"): response = json.loads(line) # Look for the tool call response with the expected ID if response.get("id") == expected_id and "result" in response: # Extract the actual content from the response result = response["result"] # Handle new response format with 'content' array if isinstance(result, dict) and "content" in result: content_array = result["content"] if isinstance(content_array, list) and len(content_array) > 0: return content_array[0].get("text", "") # Handle legacy format elif isinstance(result, list) and len(result) > 0: return result[0].get("text", "") elif response.get("id") == expected_id and "error" in response: self.logger.error(f"MCP error: {response['error']}") return None # If we get here, log all responses for debugging self.logger.warning(f"No valid tool call response found for ID {expected_id}") self.logger.debug(f"Full stdout: {stdout}") return None except json.JSONDecodeError as e: self.logger.error(f"Failed to parse MCP response: {e}") self.logger.debug(f"Stdout that failed to parse: {stdout}") return None def _extract_continuation_id(self, response_text: str) -> Optional[str]: """Extract continuation_id from response metadata""" try: # Parse the response text as JSON to look for continuation metadata response_data = json.loads(response_text) # Look for continuation_id in various places if isinstance(response_data, dict): # Check metadata metadata = response_data.get("metadata", {}) if "thread_id" in metadata: return metadata["thread_id"] # Check follow_up_request follow_up = response_data.get("follow_up_request", {}) if follow_up and "continuation_id" in follow_up: return follow_up["continuation_id"] # Check continuation_offer continuation_offer = response_data.get("continuation_offer", {}) if continuation_offer and "continuation_id" in continuation_offer: return continuation_offer["continuation_id"] self.logger.debug(f"No continuation_id found in response: {response_data}") return None except json.JSONDecodeError as e: self.logger.debug(f"Failed to parse response for continuation_id: {e}") return None def validate_docker_logs(self) -> bool: """Validate Docker logs to confirm file deduplication behavior""" try: self.logger.info("πŸ“‹ Validating Docker logs for file deduplication...") # Get server logs from both main container and activity logs result = self._run_command(["docker", "logs", self.container_name], capture_output=True) if result.returncode != 0: self.logger.error(f"Failed to get Docker logs: {result.stderr}") return False main_logs = result.stdout.decode() + result.stderr.decode() # Also get activity logs for more detailed conversation tracking activity_result = self._run_command( ["docker", "exec", self.container_name, "cat", "/tmp/mcp_activity.log"], capture_output=True ) activity_logs = "" if activity_result.returncode == 0: activity_logs = activity_result.stdout.decode() logs = main_logs + "\n" + activity_logs # Look for conversation threading patterns that indicate the system is working conversation_patterns = [ "CONVERSATION_RESUME", "CONVERSATION_CONTEXT", "previous turns loaded", "tool embedding", "files included", "files truncated", "already in conversation history", ] conversation_lines = [] for line in logs.split("\n"): for pattern in conversation_patterns: if pattern.lower() in line.lower(): conversation_lines.append(line.strip()) break # Look for evidence of conversation threading and file handling conversation_threading_found = False multi_turn_conversations = False for line in conversation_lines: lower_line = line.lower() if "conversation_resume" in lower_line: conversation_threading_found = True self.logger.debug(f"πŸ“„ Conversation threading: {line}") elif "previous turns loaded" in lower_line: multi_turn_conversations = True self.logger.debug(f"πŸ“„ Multi-turn conversation: {line}") elif "already in conversation" in lower_line: self.logger.info(f"βœ… Found explicit deduplication: {line}") return True # Conversation threading with multiple turns is evidence of file deduplication working if conversation_threading_found and multi_turn_conversations: self.logger.info("βœ… Conversation threading with multi-turn context working") self.logger.info( "βœ… File deduplication working implicitly (files embedded once in conversation history)" ) self.test_results["logs_validation"] = True return True elif conversation_threading_found: self.logger.info("βœ… Conversation threading detected") return True else: self.logger.warning("⚠️ No clear evidence of conversation threading in logs") self.logger.debug(f"Found {len(conversation_lines)} conversation-related log lines") return False except Exception as e: self.logger.error(f"Log validation failed: {e}") return False def validate_conversation_memory(self) -> bool: """Validate that conversation memory is working via Redis""" try: self.logger.info("πŸ’Ύ Validating conversation memory via Redis...") # Check Redis for stored conversations result = self._run_command( ["docker", "exec", self.redis_container, "redis-cli", "KEYS", "thread:*"], capture_output=True ) if result.returncode != 0: self.logger.error("Failed to query Redis") return False keys = result.stdout.decode().strip().split("\n") thread_keys = [k for k in keys if k.startswith("thread:")] if thread_keys: self.logger.info(f"βœ… Found {len(thread_keys)} conversation threads in Redis") # Get details of first thread if thread_keys: thread_key = thread_keys[0] result = self._run_command( ["docker", "exec", self.redis_container, "redis-cli", "GET", thread_key], capture_output=True ) if result.returncode == 0: thread_data = result.stdout.decode() try: parsed = json.loads(thread_data) turns = parsed.get("turns", []) self.logger.info(f"βœ… Thread has {len(turns)} turns") self.test_results["redis_validation"] = True return True except json.JSONDecodeError: self.logger.warning("Could not parse thread data") self.test_results["redis_validation"] = True return True else: self.logger.warning("⚠️ No conversation threads found in Redis") return False except Exception as e: self.logger.error(f"Conversation memory validation failed: {e}") return False def cleanup(self): """Cleanup test environment""" try: self.logger.info("🧹 Cleaning up test environment...") if not self.keep_logs: # Stop Docker services self._run_command(["docker", "compose", "down", "--remove-orphans"], check=False, capture_output=True) else: self.logger.info("πŸ“‹ Keeping Docker services running for log inspection") # Remove temp directory if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.logger.debug(f"Removed temp directory: {self.temp_dir}") # Remove test files directory if hasattr(self, "test_dir") and self.test_dir and os.path.exists(self.test_dir): shutil.rmtree(self.test_dir) self.logger.debug(f"Removed test files directory: {self.test_dir}") except Exception as e: self.logger.error(f"Cleanup failed: {e}") def _run_command(self, cmd: list[str], check: bool = True, capture_output: bool = False, **kwargs): """Run a shell command with logging""" if self.verbose: self.logger.debug(f"Running: {' '.join(cmd)}") return subprocess.run(cmd, check=check, capture_output=capture_output, **kwargs) def print_test_summary(self): """Print comprehensive test results summary""" print("\n" + "=" * 70) print("πŸ§ͺ GEMINI MCP COMMUNICATION SIMULATOR - TEST RESULTS SUMMARY") print("=" * 70) # Basic conversation flow status = "βœ… PASS" if self.test_results["basic_conversation"] else "❌ FAIL" print(f"πŸ“ Basic Conversation Flow: {status}") # Per-tool tests print("\nπŸ“„ Per-Tool File Deduplication Tests:") tools_tested = len(self.test_results["per_tool_tests"]) tools_passed = sum(1 for passed in self.test_results["per_tool_tests"].values() if passed) if tools_tested > 0: for tool, passed in self.test_results["per_tool_tests"].items(): status = "βœ… PASS" if passed else "❌ FAIL" print(f" β€’ {tool}: {status}") print(f" β†’ Summary: {tools_passed}/{tools_tested} tools passed") else: print(" β†’ No tools tested") # Cross-tool scenarios print("\nπŸ”§ Cross-Tool Continuation Scenarios:") scenarios_tested = len(self.test_results["cross_tool_scenarios"]) scenarios_passed = sum(1 for passed in self.test_results["cross_tool_scenarios"].values() if passed is True) if scenarios_tested > 0: scenario_names = { "chat_thinkdeep_codereview": "chat β†’ thinkdeep β†’ codereview", "analyze_debug_thinkdeep": "analyze β†’ debug β†’ thinkdeep", "multi_file_continuation": "Multi-file continuation", "state_isolation": "State isolation (contamination detection)", "conversation_boundaries": "Conversation boundaries & reset behavior", } for scenario, passed in self.test_results["cross_tool_scenarios"].items(): name = scenario_names.get(scenario, scenario) if passed is True: status = "βœ… PASS" elif passed is False: status = "❌ FAIL" else: status = "⏸️ SKIP" print(f" β€’ {name}: {status}") print(f" β†’ Summary: {scenarios_passed}/{scenarios_tested} scenarios passed") else: print(" β†’ No scenarios tested") # Content validation print("\nπŸ“„ Content Validation (Duplicate Detection):") content_validation_tested = len(self.test_results["content_validation"]) content_validation_passed = sum( 1 for passed in self.test_results["content_validation"].values() if passed is True ) if content_validation_tested > 0: content_validation_names = { "precommit_duplication": "Precommit tool content duplication", "chat_duplication": "Chat tool content duplication", "codereview_duplication": "Code review tool content duplication", "analyze_duplication": "Analyze tool content duplication", "cross_tool_duplication": "Cross-tool content duplication", } for test, passed in self.test_results["content_validation"].items(): name = content_validation_names.get(test, test) if passed is True: status = "βœ… PASS" elif passed is False: status = "❌ FAIL" else: status = "⏸️ SKIP" print(f" β€’ {name}: {status}") print(f" β†’ Summary: {content_validation_passed}/{content_validation_tested} content validation tests passed") else: print(" β†’ No content validation tests run") # Clarification scenarios print("\nπŸ” Requires Clarification Scenarios:") clarification_tested = len(self.test_results["clarification_scenarios"]) clarification_passed = sum( 1 for passed in self.test_results["clarification_scenarios"].values() if passed is True ) if clarification_tested > 0: clarification_names = { "debug_clarification": "Debug tool clarification flow", "analyze_clarification": "Analyze tool clarification flow", "clarification_deduplication": "Clarification with file deduplication", "clarification_loop": "Multiple round clarification (loop)", "partial_file_provision": "Partial file provision edge case", "real_clarification_flow": "Real clarification flow with ambiguous prompts", } for scenario, passed in self.test_results["clarification_scenarios"].items(): name = clarification_names.get(scenario, scenario) if passed is True: status = "βœ… PASS" elif passed is False: status = "❌ FAIL" else: status = "⏸️ SKIP" print(f" β€’ {name}: {status}") print(f" β†’ Summary: {clarification_passed}/{clarification_tested} clarification scenarios passed") else: print(" β†’ No clarification scenarios tested") # System validation print("\nπŸ’Ύ System Validation:") logs_status = "βœ… PASS" if self.test_results["logs_validation"] else "❌ FAIL" redis_status = "βœ… PASS" if self.test_results["redis_validation"] else "❌ FAIL" print(f" β€’ Docker logs (conversation threading): {logs_status}") print(f" β€’ Redis memory (conversation persistence): {redis_status}") # Overall result all_core_tests = [ self.test_results["basic_conversation"], self.test_results["logs_validation"], self.test_results["redis_validation"], ] tool_tests_ok = tools_tested == 0 or tools_passed > 0 scenario_tests_ok = scenarios_tested == 0 or scenarios_passed > 0 clarification_tests_ok = clarification_tested == 0 or clarification_passed > 0 content_validation_ok = content_validation_tested == 0 or content_validation_passed > 0 overall_success = all(all_core_tests) and tool_tests_ok and scenario_tests_ok and clarification_tests_ok and content_validation_ok print(f"\n🎯 OVERALL RESULT: {'πŸŽ‰ SUCCESS' if overall_success else '❌ FAILURE'}") if overall_success: print("βœ… MCP server conversation continuity and file deduplication working correctly!") print("βœ… All core systems validated") if tools_passed > 0: print(f"βœ… {tools_passed} tools working with file deduplication") if scenarios_passed > 0: print(f"βœ… {scenarios_passed} cross-tool scenarios working") if clarification_passed > 0: print(f"βœ… {clarification_passed} clarification scenarios verified") if content_validation_passed > 0: print(f"βœ… {content_validation_passed} content validation tests passed") else: print("⚠️ Some tests failed - check individual results above") print("=" * 70) return overall_success def run_individual_test(self, test_name: str, skip_docker_setup: bool = False) -> bool: """Run a single test individually with optional Docker setup skip""" try: if test_name not in self.available_tests: self.logger.error(f"Unknown test: {test_name}") self.logger.info(f"Available tests: {', '.join(self.available_tests.keys())}") return False self.logger.info(f"πŸ§ͺ Running individual test: {test_name}") # Setup environment unless skipped if not skip_docker_setup: if not self.setup_test_environment(): self.logger.error("❌ Environment setup failed") return False # Run the single test test_function = self.available_tests[test_name] result = test_function() if result: self.logger.info(f"βœ… Individual test {test_name} passed") else: self.logger.error(f"❌ Individual test {test_name} failed") return result except Exception as e: self.logger.error(f"Individual test {test_name} failed with exception: {e}") return False finally: if not skip_docker_setup and not self.keep_logs: self.cleanup() def run_full_test_suite(self) -> bool: """Run the complete test suite""" try: self.logger.info("πŸš€ Starting Gemini MCP Communication Simulator Test Suite") # Setup if not self.setup_test_environment(): self.logger.error("❌ Environment setup failed") return False # Main simulation if not self.simulate_claude_cli_session(): self.logger.error("❌ Claude CLI simulation failed") return False # Validation self.validate_docker_logs() self.validate_conversation_memory() # Print comprehensive summary overall_success = self.print_test_summary() return overall_success except Exception as e: self.logger.error(f"Test suite failed: {e}") return False finally: if not self.keep_logs: self.cleanup() def parse_arguments(): """Parse and validate command line arguments""" parser = argparse.ArgumentParser(description="Gemini MCP Communication Simulator Test") parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") parser.add_argument("--keep-logs", action="store_true", help="Keep Docker services running for log inspection") parser.add_argument("--tests", "-t", nargs="+", help="Specific tests to run (space-separated)") parser.add_argument("--list-tests", action="store_true", help="List available tests and exit") parser.add_argument("--individual", "-i", help="Run a single test individually") parser.add_argument("--skip-docker", action="store_true", help="Skip Docker setup (assumes containers are already running)") return parser.parse_args() def list_available_tests(): """List all available tests and exit""" simulator = CommunicationSimulator() print("Available tests:") for test_name, description in simulator.get_available_tests().items(): print(f" {test_name:<25} - {description}") def run_individual_test(simulator, test_name, skip_docker): """Run a single test individually""" try: success = simulator.run_individual_test(test_name, skip_docker_setup=skip_docker) if success: print(f"\nπŸŽ‰ INDIVIDUAL TEST {test_name.upper()}: PASSED") return 0 else: print(f"\n❌ INDIVIDUAL TEST {test_name.upper()}: FAILED") return 1 except KeyboardInterrupt: print(f"\nπŸ›‘ Individual test {test_name} interrupted by user") if not skip_docker: simulator.cleanup() return 130 except Exception as e: print(f"\nπŸ’₯ Individual test {test_name} failed with error: {e}") if not skip_docker: simulator.cleanup() return 1 def run_test_suite(simulator): """Run the full test suite or selected tests""" try: success = simulator.run_full_test_suite() if success: print("\nπŸŽ‰ COMPREHENSIVE MCP COMMUNICATION TEST: PASSED") return 0 else: print("\n❌ COMPREHENSIVE MCP COMMUNICATION TEST: FAILED") print("⚠️ Check detailed results above") return 1 except KeyboardInterrupt: print("\nπŸ›‘ Test interrupted by user") simulator.cleanup() return 130 except Exception as e: print(f"\nπŸ’₯ Unexpected error: {e}") simulator.cleanup() return 1 def main(): """Main entry point""" args = parse_arguments() # Handle list tests request if args.list_tests: list_available_tests() return # Initialize simulator consistently for all use cases simulator = CommunicationSimulator( verbose=args.verbose, keep_logs=args.keep_logs, selected_tests=args.tests ) # Determine execution mode and run if args.individual: exit_code = run_individual_test(simulator, args.individual, args.skip_docker) else: exit_code = run_test_suite(simulator) sys.exit(exit_code) if __name__ == "__main__": main()