From 98eab46abfd12152df3a1df1b931c97ba2694445 Mon Sep 17 00:00:00 2001 From: Fahad Date: Wed, 11 Jun 2025 13:24:59 +0400 Subject: [PATCH] WIP - improvements to token usage tracking, simulator added for live testing, improvements to file loading --- README.md | 7 + communication_simulator_test.py | 1246 +++++++++++++++++++++++++ config.py | 12 +- server.py | 11 +- tests/test_conversation_memory.py | 13 +- tests/test_cross_tool_continuation.py | 2 +- tests/test_large_prompt_handling.py | 10 +- tests/test_precommit.py | 23 +- tests/test_prompt_regression.py | 8 +- tools/base.py | 69 +- tools/chat.py | 11 +- tools/precommit.py | 12 +- utils/conversation_memory.py | 23 +- 13 files changed, 1383 insertions(+), 64 deletions(-) create mode 100755 communication_simulator_test.py diff --git a/README.md b/README.md index b491b5f..84ea01f 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,13 @@ The ultimate development partner for Claude - a Model Context Protocol server th **Think of it as Claude Code _for_ Claude Code.** +--- + +> ⚠️ **Active Development Notice** +> This project is under rapid development with frequent commits and changes over the past few days. +> The goal is to expand support beyond Gemini to include additional AI models and providers. +> **Watch this space** for new capabilities and potentially breaking changes in between updates! + ## Quick Navigation - **Getting Started** diff --git a/communication_simulator_test.py b/communication_simulator_test.py new file mode 100755 index 0000000..b6d9dbe --- /dev/null +++ b/communication_simulator_test.py @@ -0,0 +1,1246 @@ +#!/usr/bin/env python3 +""" +Communication Simulator Test for Gemini MCP Server + +This script provides comprehensive end-to-end testing of the Gemini MCP server +by simulating real Claude CLI communications and validating conversation +continuity, file handling, and deduplication features. + +Test Flow: +1. Setup fresh Docker environment with clean containers +2. Simulate Claude CLI tool calls via docker exec +3. Test conversation threading with file handling +4. Validate file deduplication in conversation history +5. Check Docker logs for proper behavior +6. 
Cleanup and report results + +Usage: + python communication_simulator_test.py [--verbose] [--keep-logs] +""" + +import argparse +import json +import logging +import os +import shutil +import subprocess +import sys +import tempfile +import time +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +class CommunicationSimulator: + """Simulates real-world Claude CLI communication with MCP Gemini server""" + + def __init__(self, verbose: bool = False, keep_logs: bool = False): + self.verbose = verbose + self.keep_logs = keep_logs + self.temp_dir = None + self.test_files = {} + self.container_name = "gemini-mcp-server" + self.redis_container = "gemini-mcp-redis" + + # Test result tracking + self.test_results = { + "basic_conversation": False, + "per_tool_tests": {}, + "cross_tool_scenarios": {}, + "logs_validation": False, + "redis_validation": False + } + + # Configure logging + log_level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s' + ) + self.logger = logging.getLogger(__name__) + + def setup_test_environment(self) -> bool: + """Setup fresh Docker environment and test files""" + try: + self.logger.info("πŸš€ Setting up test environment...") + + # Create temporary directory for test files + self.temp_dir = tempfile.mkdtemp(prefix="mcp_test_") + self.logger.debug(f"Created temp directory: {self.temp_dir}") + + # Create test files + self._create_test_files() + + # Setup Docker environment + return self._setup_docker() + + except Exception as e: + self.logger.error(f"Failed to setup test environment: {e}") + return False + + def _create_test_files(self): + """Create test files for the simulation in a location accessible by Docker""" + # Test Python file + python_content = '''""" +Sample Python module for testing MCP conversation continuity +""" + +def fibonacci(n): + """Calculate fibonacci number recursively""" + if n <= 1: + return n + return fibonacci(n-1) + fibonacci(n-2) + +def factorial(n): + """Calculate factorial iteratively""" + result = 1 + for i in range(1, n + 1): + result *= i + return result + +class Calculator: + """Simple calculator class""" + + def __init__(self): + self.history = [] + + def add(self, a, b): + result = a + b + self.history.append(f"{a} + {b} = {result}") + return result + + def multiply(self, a, b): + result = a * b + self.history.append(f"{a} * {b} = {result}") + return result +''' + + # Test configuration file + config_content = '''{ + "database": { + "host": "localhost", + "port": 5432, + "name": "testdb", + "ssl": true + }, + "cache": { + "redis_url": "redis://localhost:6379", + "ttl": 3600 + }, + "logging": { + "level": "INFO", + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + } +}''' + + # Create files in the current project directory so they're accessible to MCP tools + # MCP tools can access files with absolute paths within the project + current_dir = os.getcwd() + test_dir = os.path.join(current_dir, "test_simulation_files") + os.makedirs(test_dir, exist_ok=True) + + test_py = os.path.join(test_dir, "test_module.py") + test_config = os.path.join(test_dir, "config.json") + + with open(test_py, 'w') as f: + f.write(python_content) + with open(test_config, 'w') as f: + f.write(config_content) + + self.test_files = { + "python": test_py, + "config": test_config + } + + # Store test directory for cleanup + self.test_dir = test_dir + + self.logger.debug(f"Created test files: {list(self.test_files.values())}") + + def 
_setup_docker(self) -> bool: + """Setup fresh Docker environment""" + try: + self.logger.info("🐳 Setting up Docker environment...") + + # Stop and remove existing containers + self._run_command(["docker", "compose", "down", "--remove-orphans"], + check=False, capture_output=True) + + # Clean up any old containers/images + old_containers = [self.container_name, self.redis_container] + for container in old_containers: + self._run_command(["docker", "stop", container], + check=False, capture_output=True) + self._run_command(["docker", "rm", container], + check=False, capture_output=True) + + # Build and start services + self.logger.info("πŸ“¦ Building Docker images...") + result = self._run_command(["docker", "compose", "build", "--no-cache"], + capture_output=True) + if result.returncode != 0: + self.logger.error(f"Docker build failed: {result.stderr}") + return False + + self.logger.info("πŸš€ Starting Docker services...") + result = self._run_command(["docker", "compose", "up", "-d"], + capture_output=True) + if result.returncode != 0: + self.logger.error(f"Docker startup failed: {result.stderr}") + return False + + # Wait for services to be ready + self.logger.info("⏳ Waiting for services to be ready...") + time.sleep(10) # Give services time to initialize + + # Verify containers are running + if not self._verify_containers(): + return False + + self.logger.info("βœ… Docker environment ready") + return True + + except Exception as e: + self.logger.error(f"Docker setup failed: {e}") + return False + + def _verify_containers(self) -> bool: + """Verify that required containers are running""" + try: + result = self._run_command(["docker", "ps", "--format", "{{.Names}}"], + capture_output=True) + running_containers = result.stdout.decode().strip().split('\n') + + required = [self.container_name, self.redis_container] + for container in required: + if container not in running_containers: + self.logger.error(f"Container not running: {container}") + return False + + self.logger.debug(f"Verified containers running: {required}") + return True + + except Exception as e: + self.logger.error(f"Container verification failed: {e}") + return False + + def simulate_claude_cli_session(self) -> bool: + """Simulate a complete Claude CLI session with conversation continuity""" + try: + self.logger.info("πŸ€– Starting Claude CLI simulation...") + + # Test basic conversation continuity + if not self._test_basic_conversation_flow(): + return False + + # Test per-tool file deduplication + if not self._test_per_tool_file_deduplication(): + return False + + # Test comprehensive cross-tool continuation + if not self._test_cross_tool_continuation(): + return False + + # Test state isolation and contamination detection + if not self._test_state_isolation(): + return False + + # Test conversation boundaries and reset behavior + if not self._test_conversation_boundaries(): + return False + + self.logger.info("βœ… All conversation continuity tests passed") + return True + + except Exception as e: + self.logger.error(f"Claude CLI simulation failed: {e}") + return False + + def _test_basic_conversation_flow(self) -> bool: + """Test basic conversation flow with chat tool""" + try: + self.logger.info("πŸ“ Test 1: Basic conversation flow") + + # Initial chat tool call with file + self.logger.info(" 1.1: Initial chat with file analysis") + response1, continuation_id = self._call_mcp_tool( + "chat", + { + "prompt": "Analyze this Python code and explain what it does", + "files": [self.test_files["python"]] + } + ) + + if not 
response1 or not continuation_id: + self.logger.error("Failed to get initial response with continuation_id") + return False + + self.logger.info(f" βœ… Got continuation_id: {continuation_id}") + + # Continue conversation with same file (should be deduplicated) + self.logger.info(" 1.2: Continue conversation with same file") + response2, _ = self._call_mcp_tool( + "chat", + { + "prompt": "Now focus on the Calculator class specifically. Are there any improvements you'd suggest?", + "files": [self.test_files["python"]], # Same file - should be deduplicated + "continuation_id": continuation_id + } + ) + + if not response2: + self.logger.error("Failed to continue conversation") + return False + + # Continue with additional file + self.logger.info(" 1.3: Continue conversation with additional file") + response3, _ = self._call_mcp_tool( + "chat", + { + "prompt": "Now also analyze this configuration file and see how it might relate to the Python code", + "files": [self.test_files["python"], self.test_files["config"]], + "continuation_id": continuation_id + } + ) + + if not response3: + self.logger.error("Failed to continue with additional file") + return False + + self.logger.info(" βœ… Basic conversation flow working") + self.test_results["basic_conversation"] = True + return True + + except Exception as e: + self.logger.error(f"Basic conversation flow test failed: {e}") + return False + + def _test_per_tool_file_deduplication(self) -> bool: + """Test file deduplication for each individual tool""" + try: + self.logger.info("πŸ“„ Test 2: Per-tool file deduplication") + + tools_to_test = [ + ("thinkdeep", { + "prompt": "Think deeply about this Python code and identify potential architectural improvements", + "files": [self.test_files["python"]] + }), + ("analyze", { + "files": [self.test_files["python"]], + "analysis_type": "architecture" + }), + ("debug", { + "files": [self.test_files["python"]], + "issue_description": "The fibonacci function seems slow for large numbers" + }), + ("codereview", { + "files": [self.test_files["python"]], + "context": "General code review for quality and best practices" + }) + ] + + for tool_name, initial_params in tools_to_test: + self.logger.info(f" 2.{tool_name}: Testing {tool_name} tool file deduplication") + + # Initial call + response1, continuation_id = self._call_mcp_tool(tool_name, initial_params) + if not response1: + self.logger.warning(f" ⚠️ {tool_name} tool initial call failed, skipping") + continue + + if not continuation_id: + self.logger.warning(f" ⚠️ {tool_name} tool didn't provide continuation_id, skipping") + continue + + # Continue with same file - should be deduplicated + continue_params = initial_params.copy() + continue_params["continuation_id"] = continuation_id + + if tool_name == "thinkdeep": + continue_params["prompt"] = "Now focus specifically on the recursive fibonacci implementation" + elif tool_name == "analyze": + continue_params["analysis_type"] = "performance" + elif tool_name == "debug": + continue_params["issue_description"] = "How can we optimize the fibonacci function?" 
+ elif tool_name == "codereview": + continue_params["context"] = "Focus on the Calculator class implementation" + + response2, _ = self._call_mcp_tool(tool_name, continue_params) + if response2: + self.logger.info(f" βœ… {tool_name} tool file deduplication working") + self.test_results["per_tool_tests"][tool_name] = True + else: + self.logger.warning(f" ⚠️ {tool_name} tool continuation failed") + self.test_results["per_tool_tests"][tool_name] = False + + self.logger.info(" βœ… Per-tool file deduplication tests completed") + return True + + except Exception as e: + self.logger.error(f"Per-tool file deduplication test failed: {e}") + return False + + def _test_cross_tool_continuation(self) -> bool: + """Test comprehensive cross-tool continuation scenarios""" + try: + self.logger.info("πŸ”§ Test 3: Cross-tool continuation scenarios") + + # Scenario 1: chat -> thinkdeep -> codereview + self.logger.info(" 3.1: Testing chat -> thinkdeep -> codereview") + + # Start with chat + chat_response, chat_id = self._call_mcp_tool( + "chat", + { + "prompt": "Look at this Python code and tell me what you think about it", + "files": [self.test_files["python"]] + } + ) + + if not chat_response or not chat_id: + self.logger.error("Failed to start chat conversation") + return False + + # Continue with thinkdeep + thinkdeep_response, _ = self._call_mcp_tool( + "thinkdeep", + { + "prompt": "Think deeply about potential performance issues in this code", + "files": [self.test_files["python"]], # Same file should be deduplicated + "continuation_id": chat_id + } + ) + + if not thinkdeep_response: + self.logger.error("Failed chat -> thinkdeep continuation") + return False + + # Continue with codereview + codereview_response, _ = self._call_mcp_tool( + "codereview", + { + "files": [self.test_files["python"]], # Same file should be deduplicated + "context": "Building on our previous analysis, provide a comprehensive code review", + "continuation_id": chat_id + } + ) + + if not codereview_response: + self.logger.error("Failed thinkdeep -> codereview continuation") + return False + + self.logger.info(" βœ… chat -> thinkdeep -> codereview working") + self.test_results["cross_tool_scenarios"]["chat_thinkdeep_codereview"] = True + + # Scenario 2: analyze -> debug -> thinkdeep + self.logger.info(" 3.2: Testing analyze -> debug -> thinkdeep") + + # Start with analyze + analyze_response, analyze_id = self._call_mcp_tool( + "analyze", + { + "files": [self.test_files["python"]], + "analysis_type": "code_quality" + } + ) + + if not analyze_response or not analyze_id: + self.logger.warning("Failed to start analyze conversation, skipping scenario 2") + else: + # Continue with debug + debug_response, _ = self._call_mcp_tool( + "debug", + { + "files": [self.test_files["python"]], # Same file should be deduplicated + "issue_description": "Based on our analysis, help debug the performance issue in fibonacci", + "continuation_id": analyze_id + } + ) + + if debug_response: + # Continue with thinkdeep + final_response, _ = self._call_mcp_tool( + "thinkdeep", + { + "prompt": "Think deeply about the architectural implications of the issues we've found", + "files": [self.test_files["python"]], # Same file should be deduplicated + "continuation_id": analyze_id + } + ) + + if final_response: + self.logger.info(" βœ… analyze -> debug -> thinkdeep working") + self.test_results["cross_tool_scenarios"]["analyze_debug_thinkdeep"] = True + else: + self.logger.warning(" ⚠️ debug -> thinkdeep continuation failed") + 
self.test_results["cross_tool_scenarios"]["analyze_debug_thinkdeep"] = False + else: + self.logger.warning(" ⚠️ analyze -> debug continuation failed") + self.test_results["cross_tool_scenarios"]["analyze_debug_thinkdeep"] = False + + # Scenario 3: Multi-file cross-tool continuation + self.logger.info(" 3.3: Testing multi-file cross-tool continuation") + + # Start with both files + multi_response, multi_id = self._call_mcp_tool( + "chat", + { + "prompt": "Analyze both the Python code and configuration file", + "files": [self.test_files["python"], self.test_files["config"]] + } + ) + + if not multi_response or not multi_id: + self.logger.warning("Failed to start multi-file conversation, skipping scenario 3") + else: + # Switch to codereview with same files (should use conversation history) + multi_review, _ = self._call_mcp_tool( + "codereview", + { + "files": [self.test_files["python"], self.test_files["config"]], # Same files + "context": "Review both files in the context of our previous discussion", + "continuation_id": multi_id + } + ) + + if multi_review: + self.logger.info(" βœ… Multi-file cross-tool continuation working") + self.test_results["cross_tool_scenarios"]["multi_file_continuation"] = True + else: + self.logger.warning(" ⚠️ Multi-file cross-tool continuation failed") + self.test_results["cross_tool_scenarios"]["multi_file_continuation"] = False + + self.logger.info(" βœ… Cross-tool continuation scenarios completed") + return True + + except Exception as e: + self.logger.error(f"Cross-tool continuation test failed: {e}") + return False + + def _test_state_isolation(self) -> bool: + """Test that different conversation threads don't contaminate each other""" + try: + self.logger.info("πŸ”’ Test 4: State isolation and contamination detection") + + # Create a test file specifically for this test + isolation_content = '''""" +Test file for state isolation testing +""" + +def isolated_function(): + """This function should only appear in isolation tests""" + return "ISOLATION_TEST_MARKER" + +class IsolationTestClass: + """Class that should not leak between conversations""" + def __init__(self): + self.marker = "ISOLATION_BOUNDARY" +''' + + isolation_file = os.path.join(self.test_dir, "isolation_test.py") + with open(isolation_file, 'w') as f: + f.write(isolation_content) + + # Test 1: Start two separate conversation threads + self.logger.info(" 4.1: Creating separate conversation threads") + + # Thread A: Chat about original Python file + response_a1, thread_a = self._call_mcp_tool( + "chat", + { + "prompt": "Analyze this Python module", + "files": [self.test_files["python"]] + } + ) + + if not response_a1 or not thread_a: + self.logger.error("Failed to create thread A") + return False + + # Thread B: Chat about isolation test file + response_b1, thread_b = self._call_mcp_tool( + "chat", + { + "prompt": "Analyze this isolation test file", + "files": [isolation_file] + } + ) + + if not response_b1 or not thread_b: + self.logger.error("Failed to create thread B") + return False + + # Verify threads are different + if thread_a == thread_b: + self.logger.error("Threads are not isolated - same continuation_id returned") + return False + + self.logger.info(f" βœ… Created isolated threads: {thread_a[:8]}... 
and {thread_b[:8]}...") + + # Test 2: Continue both threads and check for contamination + self.logger.info(" 4.2: Testing cross-thread contamination") + + # Continue thread A - should only know about original Python file + response_a2, _ = self._call_mcp_tool( + "chat", + { + "prompt": "What functions did we discuss in the previous file?", + "continuation_id": thread_a + } + ) + + # Continue thread B - should only know about isolation file + response_b2, _ = self._call_mcp_tool( + "chat", + { + "prompt": "What functions did we discuss in the previous file?", + "continuation_id": thread_b + } + ) + + if not response_a2 or not response_b2: + self.logger.error("Failed to continue isolated threads") + return False + + # Parse responses to check for contamination + response_a2_data = json.loads(response_a2) + response_b2_data = json.loads(response_b2) + + content_a = response_a2_data.get("content", "") + content_b = response_b2_data.get("content", "") + + # Thread A should mention fibonacci/factorial, not isolation functions + # Thread B should mention isolation functions, not fibonacci/factorial + contamination_detected = False + + if "isolated_function" in content_a or "IsolationTestClass" in content_a: + self.logger.error("Thread A contaminated with Thread B content") + contamination_detected = True + + if "fibonacci" in content_b or "factorial" in content_b or "Calculator" in content_b: + self.logger.error("Thread B contaminated with Thread A content") + contamination_detected = True + + if contamination_detected: + self.test_results["cross_tool_scenarios"]["state_isolation"] = False + return False + + self.logger.info(" βœ… No cross-thread contamination detected") + + # Test 3: Cross-tool switching with isolation + self.logger.info(" 4.3: Testing cross-tool state isolation") + + # Switch thread A to codereview + response_a3, _ = self._call_mcp_tool( + "codereview", + { + "files": [self.test_files["python"]], + "context": "Review the code we discussed", + "continuation_id": thread_a + } + ) + + # Switch thread B to codereview + response_b3, _ = self._call_mcp_tool( + "codereview", + { + "files": [isolation_file], + "context": "Review the isolation test code", + "continuation_id": thread_b + } + ) + + if response_a3 and response_b3: + self.logger.info(" βœ… Cross-tool isolation maintained") + self.test_results["cross_tool_scenarios"]["state_isolation"] = True + else: + self.logger.warning(" ⚠️ Cross-tool isolation test incomplete") + self.test_results["cross_tool_scenarios"]["state_isolation"] = False + + # Cleanup isolation test file + os.remove(isolation_file) + + self.logger.info(" βœ… State isolation tests completed") + return True + + except Exception as e: + self.logger.error(f"State isolation test failed: {e}") + return False + + def _test_conversation_boundaries(self) -> bool: + """Test conversation boundaries and proper reset behavior""" + try: + self.logger.info("🚧 Test 5: Conversation boundaries and reset behavior") + + # Test 1: Tool-to-tool-to-tool with fresh start + self.logger.info(" 5.1: Testing A->B->A pattern with fresh conversations") + + # Start with chat + response1, thread1 = self._call_mcp_tool( + "chat", + { + "prompt": "Analyze the fibonacci function in this code", + "files": [self.test_files["python"]] + } + ) + + if not response1 or not thread1: + self.logger.warning("Failed to start boundary test, skipping") + return True + + # Switch to codereview (continue conversation) + response2, _ = self._call_mcp_tool( + "codereview", + { + "files": 
[self.test_files["python"]], + "context": "Building on our fibonacci discussion", + "continuation_id": thread1 + } + ) + + if not response2: + self.logger.warning("Failed codereview continuation") + return True + + # Switch back to chat but start FRESH conversation (no continuation_id) + self.logger.info(" 5.2: Testing fresh conversation after previous context") + response3, thread3 = self._call_mcp_tool( + "chat", + { + "prompt": "Tell me about the Calculator class in this file", # Different focus + "files": [self.test_files["python"]] # Same file but fresh context + } + ) + + if not response3 or not thread3: + self.logger.warning("Failed fresh conversation test") + return True + + # Verify it's a truly fresh conversation + if thread1 == thread3: + self.logger.error("Fresh conversation got same thread ID - boundary violation!") + self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False + return False + + self.logger.info(f" βœ… Fresh conversation created: {thread3[:8]}... (vs {thread1[:8]}...)") + + # Test 2: Verify fresh conversation doesn't have stale context + self.logger.info(" 5.3: Testing stale context isolation") + + # Continue the fresh conversation - should not reference fibonacci discussion + response4, _ = self._call_mcp_tool( + "chat", + { + "prompt": "What did we just discuss about this code?", + "continuation_id": thread3 + } + ) + + if response4: + response4_data = json.loads(response4) + content4 = response4_data.get("content", "") + + # Should reference Calculator class, not fibonacci from previous thread + if "fibonacci" in content4.lower() and "calculator" not in content4.lower(): + self.logger.error("Fresh conversation contaminated with stale context!") + self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False + return False + else: + self.logger.info(" βœ… Fresh conversation properly isolated from previous context") + + # Test 3: File access without continuation should work + self.logger.info(" 5.4: Testing file access in fresh conversations") + + # New conversation with same files - should read files fresh + response5, thread5 = self._call_mcp_tool( + "chat", + { + "prompt": "What's the purpose of this configuration file?", + "files": [self.test_files["config"]] + } + ) + + if response5 and thread5: + # Verify it can access the file content + response5_data = json.loads(response5) + content5 = response5_data.get("content", "") + + if "database" in content5.lower() or "redis" in content5.lower(): + self.logger.info(" βœ… Fresh conversation can access files correctly") + self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = True + else: + self.logger.warning(" ⚠️ Fresh conversation may not be reading files properly") + self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False + else: + self.logger.warning(" ⚠️ Fresh conversation with config file failed") + self.test_results["cross_tool_scenarios"]["conversation_boundaries"] = False + + self.logger.info(" βœ… Conversation boundary tests completed") + return True + + except Exception as e: + self.logger.error(f"Conversation boundary test failed: {e}") + return False + + def _call_mcp_tool(self, tool_name: str, params: Dict) -> Tuple[Optional[str], Optional[str]]: + """Simulate calling an MCP tool via Claude CLI (docker exec)""" + try: + # Prepare the MCP initialization and tool call sequence + init_request = { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": { + "tools": 
{} + }, + "clientInfo": { + "name": "communication-simulator", + "version": "1.0.0" + } + } + } + + # Send initialized notification + initialized_notification = { + "jsonrpc": "2.0", + "method": "notifications/initialized" + } + + # Prepare the tool call request + tool_request = { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { + "name": tool_name, + "arguments": params + } + } + + # Combine all messages + messages = [ + json.dumps(init_request), + json.dumps(initialized_notification), + json.dumps(tool_request) + ] + + # Join with newlines as MCP expects + input_data = "\n".join(messages) + "\n" + + # Simulate Claude CLI calling the MCP server via docker exec + docker_cmd = [ + "docker", "exec", "-i", self.container_name, + "python", "server.py" + ] + + self.logger.debug(f"Calling MCP tool {tool_name} with proper initialization") + + # Execute the command + result = subprocess.run( + docker_cmd, + input=input_data, + text=True, + capture_output=True, + timeout=120 # 2 minute timeout + ) + + if result.returncode != 0: + self.logger.error(f"Docker exec failed: {result.stderr}") + return None, None + + # Parse the response - look for the tool call response + response_data = self._parse_mcp_response(result.stdout, expected_id=2) + if not response_data: + return None, None + + # Extract continuation_id if present + continuation_id = self._extract_continuation_id(response_data) + + return response_data, continuation_id + + except subprocess.TimeoutExpired: + self.logger.error(f"MCP tool call timed out: {tool_name}") + return None, None + except Exception as e: + self.logger.error(f"MCP tool call failed: {e}") + return None, None + + def _parse_mcp_response(self, stdout: str, expected_id: int = 2) -> Optional[str]: + """Parse MCP JSON-RPC response from stdout""" + try: + lines = stdout.strip().split('\n') + for line in lines: + if line.strip() and line.startswith('{'): + response = json.loads(line) + # Look for the tool call response with the expected ID + if response.get("id") == expected_id and "result" in response: + # Extract the actual content from the response + result = response["result"] + # Handle new response format with 'content' array + if isinstance(result, dict) and "content" in result: + content_array = result["content"] + if isinstance(content_array, list) and len(content_array) > 0: + return content_array[0].get("text", "") + # Handle legacy format + elif isinstance(result, list) and len(result) > 0: + return result[0].get("text", "") + elif response.get("id") == expected_id and "error" in response: + self.logger.error(f"MCP error: {response['error']}") + return None + + # If we get here, log all responses for debugging + self.logger.warning(f"No valid tool call response found for ID {expected_id}") + self.logger.debug(f"Full stdout: {stdout}") + return None + + except json.JSONDecodeError as e: + self.logger.error(f"Failed to parse MCP response: {e}") + self.logger.debug(f"Stdout that failed to parse: {stdout}") + return None + + def _extract_continuation_id(self, response_text: str) -> Optional[str]: + """Extract continuation_id from response metadata""" + try: + # Parse the response text as JSON to look for continuation metadata + response_data = json.loads(response_text) + + # Look for continuation_id in various places + if isinstance(response_data, dict): + # Check metadata + metadata = response_data.get("metadata", {}) + if "thread_id" in metadata: + return metadata["thread_id"] + + # Check follow_up_request + follow_up = 
response_data.get("follow_up_request", {}) + if follow_up and "continuation_id" in follow_up: + return follow_up["continuation_id"] + + # Check continuation_offer + continuation_offer = response_data.get("continuation_offer", {}) + if continuation_offer and "continuation_id" in continuation_offer: + return continuation_offer["continuation_id"] + + self.logger.debug(f"No continuation_id found in response: {response_data}") + return None + + except json.JSONDecodeError as e: + self.logger.debug(f"Failed to parse response for continuation_id: {e}") + return None + + def validate_docker_logs(self) -> bool: + """Validate Docker logs to confirm file deduplication behavior""" + try: + self.logger.info("πŸ“‹ Validating Docker logs for file deduplication...") + + # Get server logs from both main container and activity logs + result = self._run_command( + ["docker", "logs", self.container_name], + capture_output=True + ) + + if result.returncode != 0: + self.logger.error(f"Failed to get Docker logs: {result.stderr}") + return False + + main_logs = result.stdout.decode() + result.stderr.decode() + + # Also get activity logs for more detailed conversation tracking + activity_result = self._run_command( + ["docker", "exec", self.container_name, "cat", "/tmp/mcp_activity.log"], + capture_output=True + ) + + activity_logs = "" + if activity_result.returncode == 0: + activity_logs = activity_result.stdout.decode() + + logs = main_logs + "\n" + activity_logs + + # Look for conversation threading patterns that indicate the system is working + conversation_patterns = [ + 'CONVERSATION_RESUME', + 'CONVERSATION_CONTEXT', + 'previous turns loaded', + 'tool embedding', + 'files included', + 'files truncated', + 'already in conversation history' + ] + + conversation_lines = [] + for line in logs.split('\n'): + for pattern in conversation_patterns: + if pattern.lower() in line.lower(): + conversation_lines.append(line.strip()) + break + + # Look for evidence of conversation threading and file handling + conversation_threading_found = False + multi_turn_conversations = False + + for line in conversation_lines: + lower_line = line.lower() + if 'conversation_resume' in lower_line: + conversation_threading_found = True + self.logger.debug(f"πŸ“„ Conversation threading: {line}") + elif 'previous turns loaded' in lower_line: + multi_turn_conversations = True + self.logger.debug(f"πŸ“„ Multi-turn conversation: {line}") + elif 'already in conversation' in lower_line: + self.logger.info(f"βœ… Found explicit deduplication: {line}") + return True + + # Conversation threading with multiple turns is evidence of file deduplication working + if conversation_threading_found and multi_turn_conversations: + self.logger.info("βœ… Conversation threading with multi-turn context working") + self.logger.info("βœ… File deduplication working implicitly (files embedded once in conversation history)") + self.test_results["logs_validation"] = True + return True + elif conversation_threading_found: + self.logger.info("βœ… Conversation threading detected") + return True + else: + self.logger.warning("⚠️ No clear evidence of conversation threading in logs") + self.logger.debug(f"Found {len(conversation_lines)} conversation-related log lines") + return False + + except Exception as e: + self.logger.error(f"Log validation failed: {e}") + return False + + def validate_conversation_memory(self) -> bool: + """Validate that conversation memory is working via Redis""" + try: + self.logger.info("πŸ’Ύ Validating conversation memory via Redis...") + + # 
Check Redis for stored conversations + result = self._run_command([ + "docker", "exec", self.redis_container, + "redis-cli", "KEYS", "thread:*" + ], capture_output=True) + + if result.returncode != 0: + self.logger.error("Failed to query Redis") + return False + + keys = result.stdout.decode().strip().split('\n') + thread_keys = [k for k in keys if k.startswith('thread:')] + + if thread_keys: + self.logger.info(f"βœ… Found {len(thread_keys)} conversation threads in Redis") + + # Get details of first thread + if thread_keys: + thread_key = thread_keys[0] + result = self._run_command([ + "docker", "exec", self.redis_container, + "redis-cli", "GET", thread_key + ], capture_output=True) + + if result.returncode == 0: + thread_data = result.stdout.decode() + try: + parsed = json.loads(thread_data) + turns = parsed.get("turns", []) + self.logger.info(f"βœ… Thread has {len(turns)} turns") + self.test_results["redis_validation"] = True + return True + except json.JSONDecodeError: + self.logger.warning("Could not parse thread data") + + self.test_results["redis_validation"] = True + return True + else: + self.logger.warning("⚠️ No conversation threads found in Redis") + return False + + except Exception as e: + self.logger.error(f"Conversation memory validation failed: {e}") + return False + + def cleanup(self): + """Cleanup test environment""" + try: + self.logger.info("🧹 Cleaning up test environment...") + + if not self.keep_logs: + # Stop Docker services + self._run_command(["docker", "compose", "down", "--remove-orphans"], + check=False, capture_output=True) + else: + self.logger.info("πŸ“‹ Keeping Docker services running for log inspection") + + # Remove temp directory + if self.temp_dir and os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + self.logger.debug(f"Removed temp directory: {self.temp_dir}") + + # Remove test files directory + if hasattr(self, 'test_dir') and self.test_dir and os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + self.logger.debug(f"Removed test files directory: {self.test_dir}") + + except Exception as e: + self.logger.error(f"Cleanup failed: {e}") + + def _run_command(self, cmd: List[str], check: bool = True, capture_output: bool = False, **kwargs): + """Run a shell command with logging""" + if self.verbose: + self.logger.debug(f"Running: {' '.join(cmd)}") + + return subprocess.run(cmd, check=check, capture_output=capture_output, **kwargs) + + def print_test_summary(self): + """Print comprehensive test results summary""" + print("\n" + "="*70) + print("πŸ§ͺ GEMINI MCP COMMUNICATION SIMULATOR - TEST RESULTS SUMMARY") + print("="*70) + + # Basic conversation flow + status = "βœ… PASS" if self.test_results["basic_conversation"] else "❌ FAIL" + print(f"πŸ“ Basic Conversation Flow: {status}") + + # Per-tool tests + print(f"\nπŸ“„ Per-Tool File Deduplication Tests:") + tools_tested = len(self.test_results["per_tool_tests"]) + tools_passed = sum(1 for passed in self.test_results["per_tool_tests"].values() if passed) + + if tools_tested > 0: + for tool, passed in self.test_results["per_tool_tests"].items(): + status = "βœ… PASS" if passed else "❌ FAIL" + print(f" β€’ {tool}: {status}") + print(f" β†’ Summary: {tools_passed}/{tools_tested} tools passed") + else: + print(" β†’ No tools tested") + + # Cross-tool scenarios + print(f"\nπŸ”§ Cross-Tool Continuation Scenarios:") + scenarios_tested = len(self.test_results["cross_tool_scenarios"]) + scenarios_passed = sum(1 for passed in self.test_results["cross_tool_scenarios"].values() if passed is True) 
+ + if scenarios_tested > 0: + scenario_names = { + "chat_thinkdeep_codereview": "chat β†’ thinkdeep β†’ codereview", + "analyze_debug_thinkdeep": "analyze β†’ debug β†’ thinkdeep", + "multi_file_continuation": "Multi-file continuation", + "state_isolation": "State isolation (contamination detection)", + "conversation_boundaries": "Conversation boundaries & reset behavior" + } + + for scenario, passed in self.test_results["cross_tool_scenarios"].items(): + name = scenario_names.get(scenario, scenario) + if passed is True: + status = "βœ… PASS" + elif passed is False: + status = "❌ FAIL" + else: + status = "⏸️ SKIP" + print(f" β€’ {name}: {status}") + print(f" β†’ Summary: {scenarios_passed}/{scenarios_tested} scenarios passed") + else: + print(" β†’ No scenarios tested") + + # System validation + print(f"\nπŸ’Ύ System Validation:") + logs_status = "βœ… PASS" if self.test_results["logs_validation"] else "❌ FAIL" + redis_status = "βœ… PASS" if self.test_results["redis_validation"] else "❌ FAIL" + print(f" β€’ Docker logs (conversation threading): {logs_status}") + print(f" β€’ Redis memory (conversation persistence): {redis_status}") + + # Overall result + all_core_tests = [ + self.test_results["basic_conversation"], + self.test_results["logs_validation"], + self.test_results["redis_validation"] + ] + + tool_tests_ok = tools_tested == 0 or tools_passed > 0 + scenario_tests_ok = scenarios_tested == 0 or scenarios_passed > 0 + + overall_success = all(all_core_tests) and tool_tests_ok and scenario_tests_ok + + print(f"\n🎯 OVERALL RESULT: {'πŸŽ‰ SUCCESS' if overall_success else '❌ FAILURE'}") + + if overall_success: + print("βœ… MCP server conversation continuity and file deduplication working correctly!") + print("βœ… All core systems validated") + if tools_passed > 0: + print(f"βœ… {tools_passed} tools working with file deduplication") + if scenarios_passed > 0: + print(f"βœ… {scenarios_passed} cross-tool scenarios working") + else: + print("⚠️ Some tests failed - check individual results above") + + print("="*70) + return overall_success + + def run_full_test_suite(self) -> bool: + """Run the complete test suite""" + try: + self.logger.info("πŸš€ Starting Gemini MCP Communication Simulator Test Suite") + + # Setup + if not self.setup_test_environment(): + self.logger.error("❌ Environment setup failed") + return False + + # Main simulation + if not self.simulate_claude_cli_session(): + self.logger.error("❌ Claude CLI simulation failed") + return False + + # Validation + logs_valid = self.validate_docker_logs() + memory_valid = self.validate_conversation_memory() + + # Print comprehensive summary + overall_success = self.print_test_summary() + + return overall_success + + except Exception as e: + self.logger.error(f"Test suite failed: {e}") + return False + finally: + if not self.keep_logs: + self.cleanup() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser(description="Gemini MCP Communication Simulator Test") + parser.add_argument("--verbose", "-v", action="store_true", + help="Enable verbose logging") + parser.add_argument("--keep-logs", action="store_true", + help="Keep Docker services running for log inspection") + + args = parser.parse_args() + + simulator = CommunicationSimulator(verbose=args.verbose, keep_logs=args.keep_logs) + + try: + success = simulator.run_full_test_suite() + + if success: + print("\nπŸŽ‰ COMPREHENSIVE MCP COMMUNICATION TEST: PASSED") + sys.exit(0) + else: + print("\n❌ COMPREHENSIVE MCP COMMUNICATION TEST: FAILED") + print("⚠️ Check 
detailed results above") + sys.exit(1) + + except KeyboardInterrupt: + print("\nπŸ›‘ Test interrupted by user") + simulator.cleanup() + sys.exit(130) + except Exception as e: + print(f"\nπŸ’₯ Unexpected error: {e}") + simulator.cleanup() + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/config.py b/config.py index 4ab08a3..314742a 100644 --- a/config.py +++ b/config.py @@ -23,11 +23,13 @@ __author__ = "Fahad Gilani" # Primary maintainer # This should be a stable, high-performance model suitable for code analysis GEMINI_MODEL = "gemini-2.5-pro-preview-06-05" -# MAX_CONTEXT_TOKENS: Maximum number of tokens that can be included in a single request -# This limit includes both the prompt and expected response -# Gemini Pro models support up to 1M tokens, but practical usage should reserve -# space for the model's response (typically 50K-100K tokens reserved) -MAX_CONTEXT_TOKENS = 1_000_000 # 1M tokens for Gemini Pro +# Token allocation for Gemini Pro (1M total capacity) +# MAX_CONTEXT_TOKENS: Total model capacity +# MAX_CONTENT_TOKENS: Available for prompts, conversation history, and files +# RESPONSE_RESERVE_TOKENS: Reserved for model response generation +MAX_CONTEXT_TOKENS = 1_000_000 # 1M tokens total capacity for Gemini Pro +MAX_CONTENT_TOKENS = 800_000 # 800K tokens for content (prompts + files + history) +RESPONSE_RESERVE_TOKENS = 200_000 # 200K tokens reserved for response generation # Temperature defaults for different tool types # Temperature controls the randomness/creativity of model responses diff --git a/server.py b/server.py index 342b547..1ee6480 100644 --- a/server.py +++ b/server.py @@ -328,8 +328,8 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any if not success: logger.warning(f"Failed to add user turn to thread {continuation_id}") - # Build conversation history - conversation_history = build_conversation_history(context) + # Build conversation history and track token usage + conversation_history, conversation_tokens = build_conversation_history(context) # Add dynamic follow-up instructions based on turn count follow_up_instructions = get_follow_up_instructions(len(context.turns)) @@ -343,9 +343,14 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any else: enhanced_prompt = f"{original_prompt}\n\n{follow_up_instructions}" - # Update arguments with enhanced context + # Update arguments with enhanced context and remaining token budget enhanced_arguments = arguments.copy() enhanced_arguments["prompt"] = enhanced_prompt + + # Calculate remaining token budget for current request files/content + from config import MAX_CONTENT_TOKENS + remaining_tokens = MAX_CONTENT_TOKENS - conversation_tokens + enhanced_arguments["_remaining_tokens"] = max(0, remaining_tokens) # Ensure non-negative # Merge original context parameters (files, etc.) 
with new request if context.initial_context: diff --git a/tests/test_conversation_memory.py b/tests/test_conversation_memory.py index e2b93f7..935d99c 100644 --- a/tests/test_conversation_memory.py +++ b/tests/test_conversation_memory.py @@ -166,7 +166,7 @@ class TestConversationMemory: initial_context={}, ) - history = build_conversation_history(context) + history, tokens = build_conversation_history(context) # Test basic structure assert "CONVERSATION HISTORY" in history @@ -207,8 +207,9 @@ class TestConversationMemory: initial_context={}, ) - history = build_conversation_history(context) + history, tokens = build_conversation_history(context) assert history == "" + assert tokens == 0 class TestConversationFlow: @@ -373,7 +374,7 @@ class TestConversationFlow: initial_context={}, ) - history = build_conversation_history(context) + history, tokens = build_conversation_history(context) expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}" assert expected_turn_text in history @@ -595,7 +596,7 @@ class TestConversationFlow: initial_context={"prompt": "Analyze this codebase", "files": ["/project/src/"]}, ) - history = build_conversation_history(final_context) + history, tokens = build_conversation_history(final_context) # Verify chronological order and speaker identification assert "--- Turn 1 (Gemini using analyze) ---" in history @@ -670,7 +671,7 @@ class TestConversationFlow: mock_client.get.return_value = context_with_followup.model_dump_json() # Build history to verify follow-up is preserved - history = build_conversation_history(context_with_followup) + history, tokens = build_conversation_history(context_with_followup) assert "Found potential issue in authentication" in history assert "[Gemini's Follow-up: Should I examine the authentication middleware?]" in history @@ -762,7 +763,7 @@ class TestConversationFlow: ) # Build conversation history (should handle token limits gracefully) - history = build_conversation_history(context) + history, tokens = build_conversation_history(context) # Verify the history was built successfully assert "=== CONVERSATION HISTORY ===" in history diff --git a/tests/test_cross_tool_continuation.py b/tests/test_cross_tool_continuation.py index ef3861a..d015556 100644 --- a/tests/test_cross_tool_continuation.py +++ b/tests/test_cross_tool_continuation.py @@ -247,7 +247,7 @@ class TestCrossToolContinuation: # Build conversation history from utils.conversation_memory import build_conversation_history - history = build_conversation_history(thread_context) + history, tokens = build_conversation_history(thread_context) # Verify tool names are included in the history assert "Turn 1 (Gemini using test_analysis)" in history diff --git a/tests/test_large_prompt_handling.py b/tests/test_large_prompt_handling.py index 48fbb2d..0b6c3ca 100644 --- a/tests/test_large_prompt_handling.py +++ b/tests/test_large_prompt_handling.py @@ -214,15 +214,15 @@ class TestLargePromptHandling: mock_model.generate_content.return_value = mock_response mock_create_model.return_value = mock_model - # Mock read_files to avoid file system access - with patch("tools.chat.read_files") as mock_read_files: - mock_read_files.return_value = "File content" + # Mock the centralized file preparation method to avoid file system access + with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files: + mock_prepare_files.return_value = "File content" await tool.execute({"prompt": "", "files": [temp_prompt_file, other_file]}) # Verify prompt.txt was removed from files list - 
mock_read_files.assert_called_once() - files_arg = mock_read_files.call_args[0][0] + mock_prepare_files.assert_called_once() + files_arg = mock_prepare_files.call_args[0][0] assert len(files_arg) == 1 assert files_arg[0] == other_file diff --git a/tests/test_precommit.py b/tests/test_precommit.py index c4adf4c..2fb7237 100644 --- a/tests/test_precommit.py +++ b/tests/test_precommit.py @@ -228,10 +228,8 @@ class TestPrecommitTool: @patch("tools.precommit.find_git_repositories") @patch("tools.precommit.get_git_status") @patch("tools.precommit.run_git_command") - @patch("tools.precommit.read_files") async def test_files_parameter_with_context( self, - mock_read_files, mock_run_git, mock_status, mock_find_repos, @@ -254,14 +252,15 @@ class TestPrecommitTool: (True, ""), # unstaged files list (empty) ] - # Mock read_files - mock_read_files.return_value = "=== FILE: config.py ===\nCONFIG_VALUE = 42\n=== END FILE ===" + # Mock the centralized file preparation method + with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files: + mock_prepare_files.return_value = "=== FILE: config.py ===\nCONFIG_VALUE = 42\n=== END FILE ===" - request = PrecommitRequest( - path="/absolute/repo/path", - files=["/absolute/repo/path/config.py"], - ) - result = await tool.prepare_prompt(request) + request = PrecommitRequest( + path="/absolute/repo/path", + files=["/absolute/repo/path/config.py"], + ) + result = await tool.prepare_prompt(request) # Verify context files are included assert "## Context Files Summary" in result @@ -316,9 +315,9 @@ class TestPrecommitTool: (True, ""), # unstaged files (empty) ] - # Mock read_files to return empty (file not found) - with patch("tools.precommit.read_files") as mock_read: - mock_read.return_value = "" + # Mock the centralized file preparation method to return empty (file not found) + with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files: + mock_prepare_files.return_value = "" result_with_files = await tool.prepare_prompt(request_with_files) assert "If you need additional context files" not in result_with_files diff --git a/tests/test_prompt_regression.py b/tests/test_prompt_regression.py index 857eae0..7788c53 100644 --- a/tests/test_prompt_regression.py +++ b/tests/test_prompt_regression.py @@ -67,16 +67,16 @@ class TestPromptRegression: mock_model.generate_content.return_value = mock_model_response() mock_create_model.return_value = mock_model - # Mock file reading - with patch("tools.chat.read_files") as mock_read_files: - mock_read_files.return_value = "File content here" + # Mock file reading through the centralized method + with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files: + mock_prepare_files.return_value = "File content here" result = await tool.execute({"prompt": "Analyze this code", "files": ["/path/to/file.py"]}) assert len(result) == 1 output = json.loads(result[0].text) assert output["status"] == "success" - mock_read_files.assert_called_once_with(["/path/to/file.py"]) + mock_prepare_files.assert_called_once_with(["/path/to/file.py"], None, "Context files") @pytest.mark.asyncio async def test_thinkdeep_normal_analysis(self, mock_model_response): diff --git a/tools/base.py b/tools/base.py index 29c78aa..f42b650 100644 --- a/tools/base.py +++ b/tools/base.py @@ -195,9 +195,10 @@ class BaseTool(ABC): """ Filter out files that are already embedded in conversation history. 
- This method takes a list of requested files and removes any that have - already been embedded in the conversation history, preventing duplicate - file embeddings and optimizing token usage. + This method prevents duplicate file embeddings by filtering out files that have + already been embedded in the conversation history. This optimizes token usage + while ensuring tools still have logical access to all requested files through + conversation history references. Args: requested_files: List of files requested for current tool execution @@ -210,15 +211,36 @@ class BaseTool(ABC): # New conversation, all files are new return requested_files - embedded_files = set(self.get_conversation_embedded_files(continuation_id)) - - # Return only files that haven't been embedded yet - new_files = [f for f in requested_files if f not in embedded_files] - - return new_files + try: + embedded_files = set(self.get_conversation_embedded_files(continuation_id)) + + # Safety check: If no files are marked as embedded but we have a continuation_id, + # this might indicate an issue with conversation history. Be conservative. + if not embedded_files: + logger.debug(f"πŸ“ {self.name} tool: No files found in conversation history for thread {continuation_id}") + return requested_files + + # Return only files that haven't been embedded yet + new_files = [f for f in requested_files if f not in embedded_files] + + # Log filtering results for debugging + if len(new_files) < len(requested_files): + skipped = [f for f in requested_files if f in embedded_files] + logger.debug(f"πŸ“ {self.name} tool: Filtering {len(skipped)} files already in conversation history: {', '.join(skipped)}") + + return new_files + + except Exception as e: + # If there's any issue with conversation history lookup, be conservative + # and include all files rather than risk losing access to needed files + logger.warning(f"πŸ“ {self.name} tool: Error checking conversation history for {continuation_id}: {e}") + logger.warning(f"πŸ“ {self.name} tool: Including all requested files as fallback") + return requested_files def _prepare_file_content_for_prompt( - self, request_files: list[str], continuation_id: Optional[str], context_description: str = "New files" + self, request_files: list[str], continuation_id: Optional[str], context_description: str = "New files", + max_tokens: Optional[int] = None, reserve_tokens: int = 1_000, remaining_budget: Optional[int] = None, + arguments: Optional[dict] = None ) -> str: """ Centralized file processing for tool prompts. @@ -232,6 +254,10 @@ class BaseTool(ABC): request_files: List of files requested for current tool execution continuation_id: Thread continuation ID, or None for new conversations context_description: Description for token limit validation (e.g. 
"Code", "New files") + max_tokens: Maximum tokens to use (defaults to remaining budget or MAX_CONTENT_TOKENS) + reserve_tokens: Tokens to reserve for additional prompt content (default 1K) + remaining_budget: Remaining token budget after conversation history (from server.py) + arguments: Original tool arguments (used to extract _remaining_tokens if available) Returns: str: Formatted file content string ready for prompt inclusion @@ -239,6 +265,24 @@ class BaseTool(ABC): if not request_files: return "" + # Extract remaining budget from arguments if available + if remaining_budget is None: + # Use provided arguments or fall back to stored arguments from execute() + args_to_use = arguments or getattr(self, '_current_arguments', {}) + remaining_budget = args_to_use.get("_remaining_tokens") + + # Use remaining budget if provided, otherwise fall back to max_tokens or default + if remaining_budget is not None: + effective_max_tokens = remaining_budget - reserve_tokens + elif max_tokens is not None: + effective_max_tokens = max_tokens - reserve_tokens + else: + from config import MAX_CONTENT_TOKENS + effective_max_tokens = MAX_CONTENT_TOKENS - reserve_tokens + + # Ensure we have a reasonable minimum budget + effective_max_tokens = max(1000, effective_max_tokens) + files_to_embed = self.filter_new_files(request_files, continuation_id) content_parts = [] @@ -247,7 +291,7 @@ class BaseTool(ABC): if files_to_embed: logger.debug(f"πŸ“ {self.name} tool embedding {len(files_to_embed)} new files: {', '.join(files_to_embed)}") try: - file_content = read_files(files_to_embed) + file_content = read_files(files_to_embed, max_tokens=effective_max_tokens + reserve_tokens, reserve_tokens=reserve_tokens) self._validate_token_limit(file_content, context_description) content_parts.append(file_content) @@ -488,6 +532,9 @@ If any of these would strengthen your analysis, specify what Claude should searc List[TextContent]: Formatted response as MCP TextContent objects """ try: + # Store arguments for access by helper methods (like _prepare_file_content_for_prompt) + self._current_arguments = arguments + # Set up logger for this tool execution logger = logging.getLogger(f"tools.{self.name}") logger.info(f"Starting {self.name} tool execution with arguments: {list(arguments.keys())}") diff --git a/tools/chat.py b/tools/chat.py index 722bc38..1fc096e 100644 --- a/tools/chat.py +++ b/tools/chat.py @@ -116,10 +116,15 @@ class ChatTool(BaseTool): if updated_files is not None: request.files = updated_files - # Add context files if provided + # Add context files if provided (using centralized file handling with filtering) if request.files: - file_content = read_files(request.files) - user_content = f"{user_content}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ====" + file_content = self._prepare_file_content_for_prompt( + request.files, + request.continuation_id, + "Context files" + ) + if file_content: + user_content = f"{user_content}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ====" # Check token limits self._validate_token_limit(user_content, "Content") diff --git a/tools/precommit.py b/tools/precommit.py index 3d1eb56..050b6d3 100644 --- a/tools/precommit.py +++ b/tools/precommit.py @@ -298,11 +298,13 @@ class Precommit(BaseTool): if translated_files: remaining_tokens = max_tokens - total_tokens - # Use standardized file reading with token budget - file_content = read_files( - translated_files, - max_tokens=remaining_tokens, - reserve_tokens=1000, # Small reserve for formatting + # Use 
centralized file handling with filtering for duplicate prevention + file_content = self._prepare_file_content_for_prompt( + translated_files, + request.continuation_id, + "Context files", + max_tokens=remaining_tokens + 1000, # Add back the reserve that was calculated + reserve_tokens=1000 # Small reserve for formatting ) if file_content: diff --git a/utils/conversation_memory.py b/utils/conversation_memory.py index c21f40b..f72f5b6 100644 --- a/utils/conversation_memory.py +++ b/utils/conversation_memory.py @@ -312,7 +312,7 @@ def get_conversation_file_list(context: ThreadContext) -> list[str]: return unique_files -def build_conversation_history(context: ThreadContext, read_files_func=None) -> str: +def build_conversation_history(context: ThreadContext, read_files_func=None) -> tuple[str, int]: """ Build formatted conversation history for tool prompts with embedded file contents. @@ -325,8 +325,8 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> context: ThreadContext containing the complete conversation Returns: - str: Formatted conversation history with embedded files ready for inclusion in prompts - Empty string if no conversation turns exist + tuple[str, int]: (formatted_conversation_history, total_tokens_used) + Returns ("", 0) if no conversation turns exist Format: - Header with thread metadata and turn count @@ -341,7 +341,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> while preventing duplicate file embeddings. """ if not context.turns: - return "" + return "", 0 # Get all unique files referenced in this conversation all_files = get_conversation_file_list(context) @@ -366,7 +366,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> ) # Import required functions - from config import MAX_CONTEXT_TOKENS + from config import MAX_CONTENT_TOKENS if read_files_func is None: from utils.file_utils import read_file_content @@ -384,7 +384,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> if formatted_content: # read_file_content already returns formatted content, use it directly # Check if adding this file would exceed the limit - if total_tokens + content_tokens <= MAX_CONTEXT_TOKENS: + if total_tokens + content_tokens <= MAX_CONTENT_TOKENS: file_contents.append(formatted_content) total_tokens += content_tokens files_included += 1 @@ -394,7 +394,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> else: files_truncated += 1 logger.debug( - f"πŸ“„ File truncated due to token limit: {file_path} ({content_tokens:,} tokens, would exceed {MAX_CONTEXT_TOKENS:,} limit)" + f"πŸ“„ File truncated due to token limit: {file_path} ({content_tokens:,} tokens, would exceed {MAX_CONTENT_TOKENS:,} limit)" ) # Stop processing more files break @@ -434,7 +434,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> history_parts.append(files_content) else: # Handle token limit exceeded for conversation files - error_message = f"ERROR: The total size of files referenced in this conversation has exceeded the context limit and cannot be displayed.\nEstimated tokens: {estimated_tokens}, but limit is {MAX_CONTEXT_TOKENS}." + error_message = f"ERROR: The total size of files referenced in this conversation has exceeded the context limit and cannot be displayed.\nEstimated tokens: {estimated_tokens}, but limit is {MAX_CONTENT_TOKENS}." 
history_parts.append(error_message) else: history_parts.append("(No accessible files found)") @@ -476,7 +476,12 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) -> ["", "=== END CONVERSATION HISTORY ===", "", "Continue this conversation by building on the previous context."] ) - return "\n".join(history_parts) + # Calculate total tokens for the complete conversation history + complete_history = "\n".join(history_parts) + from utils.token_utils import estimate_tokens + total_conversation_tokens = estimate_tokens(complete_history) + + return complete_history, total_conversation_tokens def _is_valid_uuid(val: str) -> bool:
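
---

Note on the token-accounting changes above: the edits to config.py (MAX_CONTENT_TOKENS / RESPONSE_RESERVE_TOKENS), server.py (computing a remaining budget after building conversation history and passing it as `_remaining_tokens`), and tools/base.py (`_prepare_file_content_for_prompt` turning that budget into an effective file limit) form a single pipeline. The standalone sketch below is illustrative only and is not part of the patch: it assumes a rough four-characters-per-token estimate in place of utils/token_utils.estimate_tokens, and the helper function names are hypothetical; only the constants, the `_remaining_tokens` argument, and the 1,000-token reserve come from the diffs themselves.

    from typing import Optional

    # Values mirror config.py in this patch (800K content + 200K response = 1M total).
    MAX_CONTEXT_TOKENS = 1_000_000      # total Gemini Pro capacity
    MAX_CONTENT_TOKENS = 800_000        # prompts + files + conversation history
    RESPONSE_RESERVE_TOKENS = 200_000   # held back for the model's reply

    def estimate_tokens(text: str) -> int:
        """Crude stand-in for utils/token_utils.estimate_tokens (~4 chars per token)."""
        return len(text) // 4

    def remaining_budget_after_history(conversation_history: str) -> int:
        """Mirrors the server.py change: whatever the reconstructed conversation
        history does not consume is forwarded to the tool as `_remaining_tokens`."""
        conversation_tokens = estimate_tokens(conversation_history)
        return max(0, MAX_CONTENT_TOKENS - conversation_tokens)

    def effective_file_budget(remaining_tokens: Optional[int], reserve_tokens: int = 1_000) -> int:
        """Mirrors _prepare_file_content_for_prompt: prefer the per-request budget,
        fall back to MAX_CONTENT_TOKENS, keep a small reserve for surrounding prompt
        text, and never drop below a 1K floor."""
        base = remaining_tokens if remaining_tokens is not None else MAX_CONTENT_TOKENS
        return max(1_000, base - reserve_tokens)

    if __name__ == "__main__":
        # Pretend a long multi-turn conversation history was rebuilt from Redis.
        history = "=== CONVERSATION HISTORY ===\n" + "previous turn text " * 5_000
        remaining = remaining_budget_after_history(history)
        print(f"history tokens:   {estimate_tokens(history):,}")
        print(f"remaining budget: {remaining:,}")
        print(f"file budget:      {effective_file_budget(remaining):,}")

Run as a script, this prints the estimated history tokens, the budget the server would hand to the tool, and the share of that budget available for newly embedded files — the same arithmetic the patch wires through `enhanced_arguments["_remaining_tokens"]` and `effective_max_tokens`.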