Extra logging and more tests

Fahad
2025-06-11 18:26:13 +04:00
parent 3aef6e961b
commit 4974fbc725
10 changed files with 400 additions and 112 deletions

View File

@@ -366,7 +366,10 @@ def parse_arguments():
parser.add_argument("--list-tests", action="store_true", help="List available tests and exit")
parser.add_argument("--individual", "-i", help="Run a single test individually")
parser.add_argument(
"--skip-docker", action="store_true", help="Skip Docker setup (assumes containers are already running)"
"--skip-docker", action="store_true", default=True, help="Skip Docker setup (assumes containers are already running) - DEFAULT"
)
parser.add_argument(
"--rebuild-docker", action="store_true", help="Force rebuild Docker environment (overrides --skip-docker)"
)
return parser.parse_args()
@@ -442,10 +445,13 @@ def main():
simulator = CommunicationSimulator(verbose=args.verbose, keep_logs=args.keep_logs, selected_tests=args.tests)
# Determine execution mode and run
# Override skip_docker if rebuild_docker is specified
skip_docker = args.skip_docker and not args.rebuild_docker
if args.individual:
exit_code = run_individual_test(simulator, args.individual, args.skip_docker)
exit_code = run_individual_test(simulator, args.individual, skip_docker)
else:
exit_code = run_test_suite(simulator, args.skip_docker)
exit_code = run_test_suite(simulator, skip_docker)
sys.exit(exit_code)
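The net effect of this hunk: Docker setup is now opt-out by default. Because store_true is combined with default=True, --skip-docker can no longer be switched off from the command line; only --rebuild-docker forces setup to run again. A minimal sketch of the resulting precedence, using only stdlib argparse:

```python
# Sketch of the flag precedence introduced above (stdlib argparse only).
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--skip-docker", action="store_true", default=True)
parser.add_argument("--rebuild-docker", action="store_true")

args = parser.parse_args([])  # no flags given
print(args.skip_docker and not args.rebuild_docker)  # True: setup skipped

args = parser.parse_args(["--rebuild-docker"])
print(args.skip_docker and not args.rebuild_docker)  # False: rebuild wins
```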

View File

@@ -7,7 +7,7 @@ services:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --save 60 1 --loglevel warning --maxmemory 512mb --maxmemory-policy allkeys-lru
command: redis-server --save 60 1 --loglevel warning --maxmemory 64mb --maxmemory-policy allkeys-lru
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
@@ -39,6 +39,8 @@ services:
volumes:
- ${HOME:-/tmp}:/workspace:ro
- mcp_logs:/tmp # Shared volume for logs
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
stdin_open: true
tty: true
entrypoint: ["python"]
@@ -55,6 +57,8 @@ services:
- PYTHONUNBUFFERED=1
volumes:
- mcp_logs:/tmp # Shared volume for logs
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
entrypoint: ["python"]
command: ["log_monitor.py"]

View File

@@ -51,6 +51,19 @@ from tools.models import ToolOutput
# Can be controlled via LOG_LEVEL environment variable (DEBUG, INFO, WARNING, ERROR)
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# Create timezone-aware formatter
import time
class LocalTimeFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
"""Override to use local timezone instead of UTC"""
ct = self.converter(record.created)
if datefmt:
s = time.strftime(datefmt, ct)
else:
t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
s = "%s,%03d" % (t, record.msecs)
return s
# Configure both console and file logging
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(
@@ -60,18 +73,22 @@ logging.basicConfig(
stream=sys.stderr, # Use stderr to avoid interfering with MCP stdin/stdout protocol
)
# Apply local time formatter to root logger
for handler in logging.getLogger().handlers:
handler.setFormatter(LocalTimeFormatter(log_format))
# Add file handler for Docker log monitoring
try:
file_handler = logging.FileHandler("/tmp/mcp_server.log")
file_handler.setLevel(getattr(logging, log_level, logging.INFO))
file_handler.setFormatter(logging.Formatter(log_format))
file_handler.setFormatter(LocalTimeFormatter(log_format))
logging.getLogger().addHandler(file_handler)
# Create a special logger for MCP activity tracking
mcp_logger = logging.getLogger("mcp_activity")
mcp_file_handler = logging.FileHandler("/tmp/mcp_activity.log")
mcp_file_handler.setLevel(logging.INFO)
mcp_file_handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
mcp_file_handler.setFormatter(LocalTimeFormatter("%(asctime)s - %(message)s"))
mcp_logger.addHandler(mcp_file_handler)
mcp_logger.setLevel(logging.INFO)
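One note on LocalTimeFormatter: stdlib logging.Formatter already uses time.localtime as its converter, so this override mainly makes the choice explicit for the file handlers; inside a container, "local" time only diverges from UTC once the timezone mounts added in docker-compose are in place. A self-contained sketch of that default:

```python
# Sketch: logging.Formatter's default converter is already local time.
import logging
import time

fmt = logging.Formatter("%(asctime)s - %(message)s")
print(fmt.converter is time.localtime)  # True on CPython

demo = logging.getLogger("tz_demo")
demo.propagate = False  # keep the demo output to our handler only
handler = logging.StreamHandler()
handler.setFormatter(fmt)
demo.addHandler(handler)
demo.warning("stamped with host-local time")
```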
@@ -196,6 +213,8 @@ async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextCon
if "continuation_id" in arguments and arguments["continuation_id"]:
continuation_id = arguments["continuation_id"]
logger.debug(f"Resuming conversation thread: {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Tool '{name}' resuming thread {continuation_id} with {len(arguments)} arguments")
logger.debug(f"[CONVERSATION_DEBUG] Original arguments keys: {list(arguments.keys())}")
# Log to activity file for monitoring
try:
@@ -205,6 +224,9 @@ async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextCon
pass
arguments = await reconstruct_thread_context(arguments)
logger.debug(f"[CONVERSATION_DEBUG] After thread reconstruction, arguments keys: {list(arguments.keys())}")
if '_remaining_tokens' in arguments:
logger.debug(f"[CONVERSATION_DEBUG] Remaining token budget: {arguments['_remaining_tokens']:,}")
# Route to AI-powered tools that require Gemini API calls
if name in TOOLS:
@@ -300,9 +322,11 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any
continuation_id = arguments["continuation_id"]
# Get thread context from Redis
logger.debug(f"[CONVERSATION_DEBUG] Looking up thread {continuation_id} in Redis")
context = get_thread(continuation_id)
if not context:
logger.warning(f"Thread not found: {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread {continuation_id} not found in Redis or expired")
# Log to activity file for monitoring
try:
@@ -324,15 +348,26 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any
if user_prompt:
# Capture files referenced in this turn
user_files = arguments.get("files", [])
logger.debug(f"[CONVERSATION_DEBUG] Adding user turn to thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] User prompt length: {len(user_prompt)} chars")
logger.debug(f"[CONVERSATION_DEBUG] User files: {user_files}")
success = add_turn(continuation_id, "user", user_prompt, files=user_files)
if not success:
logger.warning(f"Failed to add user turn to thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Failed to add user turn - thread may be at turn limit or expired")
else:
logger.debug(f"[CONVERSATION_DEBUG] Successfully added user turn to thread {continuation_id}")
# Build conversation history and track token usage
logger.debug(f"[CONVERSATION_DEBUG] Building conversation history for thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread has {len(context.turns)} turns, tool: {context.tool_name}")
conversation_history, conversation_tokens = build_conversation_history(context)
logger.debug(f"[CONVERSATION_DEBUG] Conversation history built: {conversation_tokens:,} tokens")
logger.debug(f"[CONVERSATION_DEBUG] Conversation history length: {len(conversation_history)} chars")
# Add dynamic follow-up instructions based on turn count
follow_up_instructions = get_follow_up_instructions(len(context.turns))
logger.debug(f"[CONVERSATION_DEBUG] Follow-up instructions added for turn {len(context.turns)}")
# Merge original context with new prompt and follow-up instructions
original_prompt = arguments.get("prompt", "")
@@ -352,14 +387,25 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any
remaining_tokens = MAX_CONTENT_TOKENS - conversation_tokens
enhanced_arguments["_remaining_tokens"] = max(0, remaining_tokens) # Ensure non-negative
logger.debug(f"[CONVERSATION_DEBUG] Token budget calculation:")
logger.debug(f"[CONVERSATION_DEBUG] MAX_CONTENT_TOKENS: {MAX_CONTENT_TOKENS:,}")
logger.debug(f"[CONVERSATION_DEBUG] Conversation tokens: {conversation_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Remaining tokens: {remaining_tokens:,}")
# Merge original context parameters (files, etc.) with new request
if context.initial_context:
logger.debug(f"[CONVERSATION_DEBUG] Merging initial context with {len(context.initial_context)} parameters")
for key, value in context.initial_context.items():
if key not in enhanced_arguments and key not in ["temperature", "thinking_mode", "model"]:
enhanced_arguments[key] = value
logger.debug(f"[CONVERSATION_DEBUG] Merged initial context param: {key}")
logger.info(f"Reconstructed context for thread {continuation_id} (turn {len(context.turns)})")
logger.debug(f"[CONVERSATION_DEBUG] Final enhanced arguments keys: {list(enhanced_arguments.keys())}")
# Debug log files in the enhanced arguments for file tracking
if 'files' in enhanced_arguments:
logger.debug(f"[CONVERSATION_DEBUG] Final files in enhanced arguments: {enhanced_arguments['files']}")
# Log to activity file for monitoring
try:

View File

@@ -95,8 +95,12 @@ class Calculator:
with open(test_config, "w") as f:
f.write(config_content)
self.test_files = {"python": test_py, "config": test_config}
self.logger.debug(f"Created test files: {list(self.test_files.values())}")
# Ensure absolute paths for MCP server compatibility
self.test_files = {
"python": os.path.abspath(test_py),
"config": os.path.abspath(test_config)
}
self.logger.debug(f"Created test files with absolute paths: {list(self.test_files.values())}")
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool via Claude CLI (docker exec)"""
@@ -137,7 +141,7 @@ class Calculator:
# Execute the command
result = subprocess.run(
docker_cmd, input=input_data, text=True, capture_output=True, timeout=300 # 5 minute timeout
docker_cmd, input=input_data, text=True, capture_output=True, timeout=3600 # 1 hour timeout
)
if result.returncode != 0:
@@ -155,7 +159,7 @@ class Calculator:
return response_data, continuation_id
except subprocess.TimeoutExpired:
self.logger.error(f"MCP tool call timed out: {tool_name}")
self.logger.error(f"MCP tool call timed out after 1 hour: {tool_name}")
return None, None
except Exception as e:
self.logger.error(f"MCP tool call failed: {e}")
@@ -231,6 +235,17 @@ class Calculator:
return subprocess.run(cmd, check=check, capture_output=capture_output, **kwargs)
def create_additional_test_file(self, filename: str, content: str) -> str:
"""Create an additional test file for mixed scenario testing"""
if not hasattr(self, 'test_dir') or not self.test_dir:
raise RuntimeError("Test directory not initialized. Call setup_test_files() first.")
file_path = os.path.join(self.test_dir, filename)
with open(file_path, "w") as f:
f.write(content)
# Return absolute path for MCP server compatibility
return os.path.abspath(file_path)
def cleanup_test_files(self):
"""Clean up test files"""
if hasattr(self, "test_dir") and self.test_dir and os.path.exists(self.test_dir):

View File

@@ -54,6 +54,9 @@ DATABASE_CONFIG = {
with open(validation_file, "w") as f:
f.write(validation_content)
# Ensure absolute path for MCP server compatibility
validation_file = os.path.abspath(validation_file)
# Test 1: Precommit tool with files parameter (where the bug occurred)
self.logger.info(" 1: Testing precommit tool content duplication")
@@ -110,13 +113,13 @@ DATABASE_CONFIG = {
tools_to_test = [
(
"chat",
{"prompt": "Please use low thinking mode. Analyze this config file", "files": [validation_file]},
{"prompt": "Please use low thinking mode. Analyze this config file", "files": [validation_file]}, # Using absolute path
),
(
"codereview",
{"files": [validation_file], "context": "Please use low thinking mode. Review this configuration"},
{"files": [validation_file], "context": "Please use low thinking mode. Review this configuration"}, # Using absolute path
),
("analyze", {"files": [validation_file], "analysis_type": "code_quality"}),
("analyze", {"files": [validation_file], "analysis_type": "code_quality"}), # Using absolute path
]
for tool_name, params in tools_to_test:

View File

@@ -25,7 +25,7 @@ class LogsValidationTest(BaseSimulatorTest):
try:
self.logger.info("📋 Test: Validating Docker logs for file deduplication...")
# Get server logs from both main container and activity logs
# Get server logs from main container
result = self.run_command(["docker", "logs", self.container_name], capture_output=True)
if result.returncode != 0:
@@ -34,6 +34,12 @@ class LogsValidationTest(BaseSimulatorTest):
main_logs = result.stdout.decode() + result.stderr.decode()
# Get logs from log monitor container (where detailed activity is logged)
monitor_result = self.run_command(["docker", "logs", "gemini-mcp-log-monitor"], capture_output=True)
monitor_logs = ""
if monitor_result.returncode == 0:
monitor_logs = monitor_result.stdout.decode() + monitor_result.stderr.decode()
# Also get activity logs for more detailed conversation tracking
activity_result = self.run_command(
["docker", "exec", self.container_name, "cat", "/tmp/mcp_activity.log"], capture_output=True
@@ -43,7 +49,7 @@ class LogsValidationTest(BaseSimulatorTest):
if activity_result.returncode == 0:
activity_logs = activity_result.stdout.decode()
logs = main_logs + "\n" + activity_logs
logs = main_logs + "\n" + monitor_logs + "\n" + activity_logs
# Look for conversation threading patterns that indicate the system is working
conversation_patterns = [

View File

@@ -4,8 +4,17 @@ Per-Tool File Deduplication Test
Tests file deduplication for each individual MCP tool to ensure
that files are properly deduplicated within single-tool conversations.
Validates that:
1. Files are embedded only once in conversation history
2. Continuation calls don't re-read existing files
3. New files are still properly embedded
4. Docker logs show deduplication behavior
"""
import json
import os
import subprocess
import tempfile
from .base_test import BaseSimulatorTest
@@ -20,96 +29,195 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
def test_description(self) -> str:
return "File deduplication for individual tools"
def run_test(self) -> bool:
"""Test file deduplication for each individual tool"""
def get_docker_logs_since(self, since_time: str) -> str:
"""Get docker logs since a specific timestamp"""
try:
self.logger.info("📄 Test: Per-tool file deduplication")
# Check both main server and log monitor for comprehensive logs
cmd_server = ["docker", "logs", "--since", since_time, self.container_name]
cmd_monitor = ["docker", "logs", "--since", since_time, "gemini-mcp-log-monitor"]
result_server = subprocess.run(cmd_server, capture_output=True, text=True)
result_monitor = subprocess.run(cmd_monitor, capture_output=True, text=True)
# Combine logs from both containers
combined_logs = result_server.stdout + "\n" + result_monitor.stdout
return combined_logs
except Exception as e:
self.logger.error(f"Failed to get docker logs: {e}")
return ""
# create_additional_test_file method now inherited from base class
def validate_file_deduplication_in_logs(self, logs: str, tool_name: str, test_file: str) -> bool:
"""Validate that logs show file deduplication behavior"""
# Look for file embedding messages
embedding_messages = [line for line in logs.split('\n') if '📁' in line and 'embedding' in line and tool_name in line]
# Look for deduplication/filtering messages
filtering_messages = [line for line in logs.split('\n') if '📁' in line and 'Filtering' in line and tool_name in line]
skipping_messages = [line for line in logs.split('\n') if '📁' in line and 'skipping' in line and tool_name in line]
deduplication_found = len(filtering_messages) > 0 or len(skipping_messages) > 0
if deduplication_found:
self.logger.info(f"{tool_name}: Found deduplication evidence in logs")
for msg in filtering_messages + skipping_messages:
self.logger.debug(f" 📁 {msg.strip()}")
else:
self.logger.warning(f" ⚠️ {tool_name}: No deduplication evidence found in logs")
self.logger.debug(f" 📁 All embedding messages: {embedding_messages}")
return deduplication_found
def run_test(self) -> bool:
"""Test file deduplication with realistic precommit/codereview workflow"""
try:
self.logger.info("📄 Test: Simplified file deduplication with precommit/codereview workflow")
# Setup test files
self.setup_test_files()
tools_to_test = [
(
"thinkdeep",
{
"current_analysis": "Please use low thinking mode. I'm analyzing this Python code to identify potential architectural improvements",
"files": [self.test_files["python"]],
},
),
(
"analyze",
{
"files": [self.test_files["python"]],
"question": "Please use low thinking mode. What are the architectural patterns in this code?",
},
),
(
"debug",
{
"files": [self.test_files["python"]],
"error_description": "Please use low thinking mode. The fibonacci function seems slow for large numbers",
},
),
(
"codereview",
{
"files": [self.test_files["python"]],
"context": "General code review for quality and best practices",
},
),
]
# Create a dummy file for precommit testing
dummy_content = '''def hello_world():
"""A simple hello world function with a bug"""
print("Hello world!")
return "hello"
# TODO: Fix the inconsistent return type
def calculate_sum(a, b):
return a + b # Missing type hints
'''
successful_tests = 0
total_tests = len(tools_to_test)
dummy_file_path = self.create_additional_test_file("dummy_code.py", dummy_content)
for tool_name, initial_params in tools_to_test:
self.logger.info(f" {tool_name}: Testing {tool_name} tool file deduplication")
# Get timestamp for log filtering
import datetime
start_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
# Initial call
response1, continuation_id = self.call_mcp_tool(tool_name, initial_params)
# Step 1: precommit tool with dummy file (low thinking mode)
self.logger.info(" Step 1: precommit tool with dummy file")
precommit_params = {
"path": self.test_dir, # Required path parameter
"files": [dummy_file_path],
"original_request": "Please use low thinking mode. Review this code for commit readiness",
"thinking_mode": "low"
}
response1, continuation_id = self.call_mcp_tool("precommit", precommit_params)
if not response1:
self.logger.warning(f" ⚠️ {tool_name} tool initial call failed, skipping")
continue
self.logger.error(" ❌ Step 1: precommit tool failed")
return False
if not continuation_id:
self.logger.warning(f" ⚠️ {tool_name} tool didn't provide continuation_id, skipping")
continue
self.logger.error(" ❌ Step 1: precommit tool didn't provide continuation_id")
return False
# Continue with same file - should be deduplicated
continue_params = initial_params.copy()
continue_params["continuation_id"] = continuation_id
self.logger.info(f" ✅ Step 1: precommit completed with continuation_id: {continuation_id[:8]}...")
if tool_name == "thinkdeep":
continue_params["current_analysis"] = (
"Please use low thinking mode. Now focus specifically on the recursive fibonacci implementation"
)
elif tool_name == "analyze":
continue_params["question"] = (
"Please use low thinking mode. What are the performance characteristics of this code?"
)
elif tool_name == "debug":
continue_params["error_description"] = (
"Please use low thinking mode. How can we optimize the fibonacci function?"
)
elif tool_name == "codereview":
continue_params["context"] = "Focus on the Calculator class implementation"
# Step 2: codereview tool with same file (NO continuation - fresh conversation)
self.logger.info(" Step 2: codereview tool with same file (fresh conversation)")
codereview_params = {
"files": [dummy_file_path],
"context": "Please use low thinking mode. General code review for quality and best practices"
}
response2, _ = self.call_mcp_tool(tool_name, continue_params)
if response2:
self.logger.info(f" {tool_name} tool file deduplication working")
successful_tests += 1
response2, _ = self.call_mcp_tool("codereview", codereview_params)
if not response2:
self.logger.error(" ❌ Step 2: codereview tool failed")
return False
self.logger.info(" ✅ Step 2: codereview completed (fresh conversation)")
# Step 3: Create new file and continue with precommit
self.logger.info(" Step 3: precommit continuation with old + new file")
new_file_content = '''def new_feature():
"""A new feature function"""
return {"status": "implemented", "version": "1.0"}
class NewUtility:
"""A new utility class"""
def __init__(self):
self.initialized = True
def process_data(self, data):
return f"Processed: {data}"
'''
new_file_path = self.create_additional_test_file("new_feature.py", new_file_content)
# Continue precommit with both files
continue_params = {
"continuation_id": continuation_id,
"path": self.test_dir, # Required path parameter
"files": [dummy_file_path, new_file_path], # Old + new file
"original_request": "Please use low thinking mode. Now also review the new feature file along with the previous one",
"thinking_mode": "low"
}
response3, _ = self.call_mcp_tool("precommit", continue_params)
if not response3:
self.logger.error(" ❌ Step 3: precommit continuation failed")
return False
self.logger.info(" ✅ Step 3: precommit continuation completed")
# Validate results in docker logs
self.logger.info(" 📋 Validating conversation history and file deduplication...")
logs = self.get_docker_logs_since(start_time)
# Check for conversation history building
conversation_logs = [line for line in logs.split('\n') if 'conversation' in line.lower() or 'history' in line.lower()]
# Check for file embedding/deduplication
embedding_logs = [line for line in logs.split('\n') if '📁' in line or 'embedding' in line.lower() or 'file' in line.lower()]
# Check for continuation evidence
continuation_logs = [line for line in logs.split('\n') if 'continuation' in line.lower() or continuation_id[:8] in line]
# Check for both files mentioned
dummy_file_mentioned = any("dummy_code.py" in line for line in logs.split('\n'))
new_file_mentioned = any("new_feature.py" in line for line in logs.split('\n'))
# Print diagnostic information
self.logger.info(f" 📊 Conversation logs found: {len(conversation_logs)}")
self.logger.info(f" 📊 File embedding logs found: {len(embedding_logs)}")
self.logger.info(f" 📊 Continuation logs found: {len(continuation_logs)}")
self.logger.info(f" 📊 Dummy file mentioned: {dummy_file_mentioned}")
self.logger.info(f" 📊 New file mentioned: {new_file_mentioned}")
if self.verbose:
self.logger.debug(" 📋 Sample embedding logs:")
for log in embedding_logs[:5]: # Show first 5
if log.strip():
self.logger.debug(f" {log.strip()}")
self.logger.debug(" 📋 Sample continuation logs:")
for log in continuation_logs[:3]: # Show first 3
if log.strip():
self.logger.debug(f" {log.strip()}")
# Determine success criteria
success_criteria = [
len(embedding_logs) > 0, # File embedding occurred
len(continuation_logs) > 0, # Continuation worked
dummy_file_mentioned, # Original file processed
new_file_mentioned # New file processed
]
passed_criteria = sum(success_criteria)
total_criteria = len(success_criteria)
self.logger.info(f" 📊 Success criteria met: {passed_criteria}/{total_criteria}")
if passed_criteria >= 3: # At least 3 out of 4 criteria
self.logger.info(" ✅ File deduplication workflow test: PASSED")
return True
else:
self.logger.warning(f" ⚠️ {tool_name} tool continuation failed")
self.logger.info(
f" ✅ Per-tool file deduplication tests completed: {successful_tests}/{total_tests} tools passed"
)
# Consider test successful if at least one tool worked
return successful_tests > 0
self.logger.warning(" ⚠️ File deduplication workflow test: FAILED")
self.logger.warning(" 💡 Check docker logs for detailed file embedding and continuation activity")
return False
except Exception as e:
self.logger.error(f"Per-tool file deduplication test failed: {e}")
self.logger.error(f"File deduplication workflow test failed: {e}")
return False
finally:
self.cleanup_test_files()
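One subtlety in get_docker_logs_since: a timestamp passed to docker logs --since without a timezone offset is interpreted in the docker client's local timezone, so producing it with datetime.now() (local time, as this test does) is the consistent choice. Usage sketch:

```python
# Sketch of the log filtering this test relies on; a bare timestamp
# (no offset) is read in the docker client's local timezone.
import datetime
import subprocess

since = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
result = subprocess.run(
    ["docker", "logs", "--since", since, "gemini-mcp-log-monitor"],
    capture_output=True, text=True,
)
print(result.stdout[:500])
```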

View File

@@ -189,7 +189,9 @@ class BaseTool(ABC):
# Thread not found, no files embedded
return []
return get_conversation_file_list(thread_context)
embedded_files = get_conversation_file_list(thread_context)
logger.debug(f"[FILES] {self.name}: Found {len(embedded_files)} embedded files")
return embedded_files
def filter_new_files(self, requested_files: list[str], continuation_id: Optional[str]) -> list[str]:
"""
@@ -207,12 +209,16 @@ class BaseTool(ABC):
Returns:
list[str]: List of files that need to be embedded (not already in history)
"""
logger.debug(f"[FILES] {self.name}: Filtering {len(requested_files)} requested files")
if not continuation_id:
# New conversation, all files are new
logger.debug(f"[FILES] {self.name}: New conversation, all {len(requested_files)} files are new")
return requested_files
try:
embedded_files = set(self.get_conversation_embedded_files(continuation_id))
logger.debug(f"[FILES] {self.name}: Found {len(embedded_files)} embedded files in conversation")
# Safety check: If no files are marked as embedded but we have a continuation_id,
# this might indicate an issue with conversation history. Be conservative.
@@ -220,10 +226,13 @@ class BaseTool(ABC):
logger.debug(
f"📁 {self.name} tool: No files found in conversation history for thread {continuation_id}"
)
logger.debug(f"[FILES] {self.name}: No embedded files found, returning all {len(requested_files)} requested files")
return requested_files
# Return only files that haven't been embedded yet
new_files = [f for f in requested_files if f not in embedded_files]
logger.debug(f"[FILES] {self.name}: After filtering: {len(new_files)} new files, {len(requested_files) - len(new_files)} already embedded")
logger.debug(f"[FILES] {self.name}: New files to embed: {new_files}")
# Log filtering results for debugging
if len(new_files) < len(requested_files):
@@ -231,6 +240,7 @@ class BaseTool(ABC):
logger.debug(
f"📁 {self.name} tool: Filtering {len(skipped)} files already in conversation history: {', '.join(skipped)}"
)
logger.debug(f"[FILES] {self.name}: Skipped (already embedded): {skipped}")
return new_files
@@ -239,6 +249,7 @@ class BaseTool(ABC):
# and include all files rather than risk losing access to needed files
logger.warning(f"📁 {self.name} tool: Error checking conversation history for {continuation_id}: {e}")
logger.warning(f"📁 {self.name} tool: Including all requested files as fallback")
logger.debug(f"[FILES] {self.name}: Exception in filter_new_files, returning all {len(requested_files)} files as fallback")
return requested_files
def _prepare_file_content_for_prompt(
@@ -294,12 +305,14 @@ class BaseTool(ABC):
effective_max_tokens = max(1000, effective_max_tokens)
files_to_embed = self.filter_new_files(request_files, continuation_id)
logger.debug(f"[FILES] {self.name}: Will embed {len(files_to_embed)} files after filtering")
content_parts = []
# Read content of new files only
if files_to_embed:
logger.debug(f"📁 {self.name} tool embedding {len(files_to_embed)} new files: {', '.join(files_to_embed)}")
logger.debug(f"[FILES] {self.name}: Starting file embedding with token budget {effective_max_tokens + reserve_tokens:,}")
try:
file_content = read_files(
files_to_embed, max_tokens=effective_max_tokens + reserve_tokens, reserve_tokens=reserve_tokens
@@ -314,9 +327,13 @@ class BaseTool(ABC):
logger.debug(
f"📁 {self.name} tool successfully embedded {len(files_to_embed)} files ({content_tokens:,} tokens)"
)
logger.debug(f"[FILES] {self.name}: Successfully embedded files - {content_tokens:,} tokens used")
except Exception as e:
logger.error(f"📁 {self.name} tool failed to embed files {files_to_embed}: {type(e).__name__}: {e}")
logger.debug(f"[FILES] {self.name}: File embedding failed - {type(e).__name__}: {e}")
raise
else:
logger.debug(f"[FILES] {self.name}: No files to embed after filtering")
# Generate note about files already in conversation history
if continuation_id and len(files_to_embed) < len(request_files):
@@ -326,6 +343,7 @@ class BaseTool(ABC):
logger.debug(
f"📁 {self.name} tool skipping {len(skipped_files)} files already in conversation history: {', '.join(skipped_files)}"
)
logger.debug(f"[FILES] {self.name}: Adding note about {len(skipped_files)} skipped files")
if content_parts:
content_parts.append("\n\n")
note_lines = [
@@ -335,8 +353,12 @@ class BaseTool(ABC):
"--- END NOTE ---",
]
content_parts.append("\n".join(note_lines))
else:
logger.debug(f"[FILES] {self.name}: No skipped files to note")
return "".join(content_parts) if content_parts else ""
result = "".join(content_parts) if content_parts else ""
logger.debug(f"[FILES] {self.name}: _prepare_file_content_for_prompt returning {len(result)} chars")
return result
def get_websearch_instruction(self, use_websearch: bool, tool_specific: Optional[str] = None) -> str:
"""
@@ -639,11 +661,29 @@ If any of these would strengthen your analysis, specify what Claude should searc
# Catch all exceptions to prevent server crashes
# Return error information in standardized format
logger = logging.getLogger(f"tools.{self.name}")
logger.error(f"Error in {self.name} tool execution: {str(e)}", exc_info=True)
error_msg = str(e)
# Check if this is a 500 INTERNAL error that asks for retry
if "500 INTERNAL" in error_msg and "Please retry" in error_msg:
logger.warning(f"500 INTERNAL error in {self.name} - attempting retry")
try:
# Single retry attempt
model = self._get_model_wrapper(request)
raw_response = await model.generate_content(prompt)
response = raw_response.text
# If successful, process normally
return [TextContent(type="text", text=self._process_response(response, request).model_dump_json())]
except Exception as retry_e:
logger.error(f"Retry failed for {self.name} tool: {str(retry_e)}")
error_msg = f"Tool failed after retry: {str(retry_e)}"
logger.error(f"Error in {self.name} tool execution: {error_msg}", exc_info=True)
error_output = ToolOutput(
status="error",
content=f"Error in {self.name}: {str(e)}",
content=f"Error in {self.name}: {error_msg}",
content_type="text",
)
return [TextContent(type="text", text=error_output.model_dump_json())]

View File

@@ -250,12 +250,16 @@ def add_turn(
- Turn limits prevent runaway conversations
- File references are preserved for cross-tool access
"""
logger.debug(f"[FLOW] Adding {role} turn to {thread_id} ({tool_name})")
context = get_thread(thread_id)
if not context:
logger.debug(f"[FLOW] Thread {thread_id} not found for turn addition")
return False
# Check turn limit to prevent runaway conversations
if len(context.turns) >= MAX_CONVERSATION_TURNS:
logger.debug(f"[FLOW] Thread {thread_id} at max turns ({MAX_CONVERSATION_TURNS})")
return False
# Create new turn with complete metadata
@@ -277,7 +281,8 @@ def add_turn(
key = f"thread:{thread_id}"
client.setex(key, 3600, context.model_dump_json()) # Refresh TTL to 1 hour
return True
except Exception:
except Exception as e:
logger.debug(f"[FLOW] Failed to save turn to Redis: {type(e).__name__}")
return False
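The setex in this hunk is what gives threads their sliding one-hour lifetime: each saved turn rewrites the serialized context and resets the TTL in a single atomic command. A sketch, assuming redis-py against the compose Redis service:

```python
# Sliding-TTL sketch, assuming redis-py and the compose Redis service.
import redis

client = redis.Redis(host="localhost", port=6379)
key = "thread:example"
client.setex(key, 3600, "{}")  # write the value and reset TTL atomically
print(client.ttl(key))         # ~3600; every add_turn-style save restarts the clock
```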
@@ -296,19 +301,29 @@ def get_conversation_file_list(context: ThreadContext) -> list[str]:
list[str]: Deduplicated list of file paths referenced in the conversation
"""
if not context.turns:
logger.debug(f"[FILES] No turns found, returning empty file list")
return []
# Collect all unique files from all turns, preserving order of first appearance
seen_files = set()
unique_files = []
for turn in context.turns:
logger.debug(f"[FILES] Collecting files from {len(context.turns)} turns")
for i, turn in enumerate(context.turns):
if turn.files:
logger.debug(f"[FILES] Turn {i+1} has {len(turn.files)} files: {turn.files}")
for file_path in turn.files:
if file_path not in seen_files:
seen_files.add(file_path)
unique_files.append(file_path)
logger.debug(f"[FILES] Added new file: {file_path}")
else:
logger.debug(f"[FILES] Duplicate file skipped: {file_path}")
else:
logger.debug(f"[FILES] Turn {i+1} has no files")
logger.debug(f"[FILES] Final unique file list ({len(unique_files)}): {unique_files}")
return unique_files
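The first-appearance ordering here is deliberate: it keeps file embedding deterministic across turns. Since dicts preserve insertion order (Python 3.7+), the same dedup can be expressed compactly; an equivalent sketch, minus the per-file debug logging:

```python
# Equivalent order-preserving dedup (dicts keep insertion order in 3.7+).
def unique_files_in_order(turns) -> list[str]:
    return list(dict.fromkeys(
        f for turn in turns if turn.files for f in turn.files
    ))
```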
@@ -345,6 +360,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
# Get all unique files referenced in this conversation
all_files = get_conversation_file_list(context)
logger.debug(f"[FILES] Found {len(all_files)} unique files in conversation history")
history_parts = [
"=== CONVERSATION HISTORY ===",
@@ -356,6 +372,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
# Embed all files referenced in this conversation once at the start
if all_files:
logger.debug(f"[FILES] Starting embedding for {len(all_files)} files")
history_parts.extend(
[
"=== FILES REFERENCED IN THIS CONVERSATION ===",
@@ -379,6 +396,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
for file_path in all_files:
try:
logger.debug(f"[FILES] Processing file {file_path}")
# Correctly unpack the tuple returned by read_file_content
formatted_content, content_tokens = read_file_content(file_path)
if formatted_content:
@@ -391,20 +409,24 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
logger.debug(
f"📄 File embedded in conversation history: {file_path} ({content_tokens:,} tokens)"
)
logger.debug(f"[FILES] Successfully embedded {file_path} - {content_tokens:,} tokens (total: {total_tokens:,})")
else:
files_truncated += 1
logger.debug(
f"📄 File truncated due to token limit: {file_path} ({content_tokens:,} tokens, would exceed {MAX_CONTENT_TOKENS:,} limit)"
)
logger.debug(f"[FILES] File {file_path} would exceed token limit - skipping (would be {total_tokens + content_tokens:,} tokens)")
# Stop processing more files
break
else:
logger.debug(f"📄 File skipped (empty content): {file_path}")
logger.debug(f"[FILES] File {file_path} has empty content - skipping")
except Exception as e:
# Skip files that can't be read but log the failure
logger.warning(
f"📄 Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
)
logger.debug(f"[FILES] Failed to read file {file_path} - {type(e).__name__}: {e}")
continue
if file_contents:
@@ -417,11 +439,13 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
logger.debug(
f"📄 Conversation history file embedding complete: {files_included} files embedded, {files_truncated} truncated, {total_tokens:,} total tokens"
)
logger.debug(f"[FILES] File embedding summary - {files_included} embedded, {files_truncated} truncated, {total_tokens:,} tokens total")
else:
history_parts.append("(No accessible files found)")
logger.debug(
f"📄 Conversation history file embedding: no accessible files found from {len(all_files)} requested"
)
logger.debug(f"[FILES] No accessible files found from {len(all_files)} requested files")
else:
# Fallback to original read_files function for backward compatibility
files_content = read_files_func(all_files)
@@ -482,6 +506,11 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
total_conversation_tokens = estimate_tokens(complete_history)
# Summary log of what was built
user_turns = len([t for t in context.turns if t.role == "user"])
assistant_turns = len([t for t in context.turns if t.role == "assistant"])
logger.debug(f"[FLOW] Built conversation history: {user_turns} user + {assistant_turns} assistant turns, {len(all_files)} files, {total_conversation_tokens:,} tokens")
return complete_history, total_conversation_tokens

View File

@@ -422,11 +422,14 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
Tuple of (formatted_content, estimated_tokens)
Content is wrapped with clear delimiters for AI parsing
"""
logger.debug(f"[FILES] read_file_content called for: {file_path}")
try:
# Validate path security before any file operations
path = resolve_and_validate_path(file_path)
logger.debug(f"[FILES] Path validated and resolved: {path}")
except (ValueError, PermissionError) as e:
# Return error in a format that provides context to the AI
logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
error_msg = str(e)
# Add Docker-specific help if we're in Docker and path is inaccessible
if WORKSPACE_ROOT and CONTAINER_WORKSPACE.exists():
@@ -438,40 +441,54 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
f"To access files in a different directory, please run Claude from that directory."
)
content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
return content, estimate_tokens(content)
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
try:
# Validate file existence and type
if not path.exists():
logger.debug(f"[FILES] File does not exist: {file_path}")
content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
return content, estimate_tokens(content)
if not path.is_file():
logger.debug(f"[FILES] Path is not a file: {file_path}")
content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Check file size to prevent memory exhaustion
file_size = path.stat().st_size
logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
if file_size > max_size:
logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
content = f"\n--- FILE TOO LARGE: {file_path} ---\nFile size: {file_size:,} bytes (max: {max_size:,})\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Read the file with UTF-8 encoding, replacing invalid characters
# This ensures we can handle files with mixed encodings
logger.debug(f"[FILES] Reading file content for {file_path}")
with open(path, encoding="utf-8", errors="replace") as f:
file_content = f.read()
logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
# Format with clear delimiters that help the AI understand file boundaries
# Using consistent markers makes it easier for the model to parse
# NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
# ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
# vs. partial diff content when files appear in both sections
formatted = f"\n--- BEGIN FILE: {file_path} ---\n{file_content}\n--- END FILE: {file_path} ---\n"
return formatted, estimate_tokens(formatted)
tokens = estimate_tokens(formatted)
logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
return formatted, tokens
except Exception as e:
logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
return content, estimate_tokens(content)
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
def read_files(
@@ -500,6 +517,9 @@ def read_files(
if max_tokens is None:
max_tokens = MAX_CONTEXT_TOKENS
logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
logger.debug(f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}")
content_parts = []
total_tokens = 0
available_tokens = max_tokens - reserve_tokens
@@ -520,31 +540,40 @@ def read_files(
# Priority 2: Process file paths
if file_paths:
# Expand directories to get all individual files
logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
all_files = expand_paths(file_paths)
logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")
if not all_files and file_paths:
# No files found but paths were provided
logger.debug(f"[FILES] No files found from provided paths")
content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
else:
# Read files sequentially until token limit is reached
for file_path in all_files:
logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
for i, file_path in enumerate(all_files):
if total_tokens >= available_tokens:
files_skipped.append(file_path)
continue
logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
files_skipped.extend(all_files[i:])
break
file_content, file_tokens = read_file_content(file_path)
logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
# Check if adding this file would exceed limit
if total_tokens + file_tokens <= available_tokens:
content_parts.append(file_content)
total_tokens += file_tokens
logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
else:
# File too large for remaining budget
logger.debug(f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)")
files_skipped.append(file_path)
# Add informative note about skipped files to help users understand
# what was omitted and why
if files_skipped:
logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
skip_note += f"Total skipped: {len(files_skipped)}\n"
# Show first 10 skipped files as examples
@@ -555,4 +584,6 @@ def read_files(
skip_note += "--- END SKIPPED FILES ---\n"
content_parts.append(skip_note)
return "\n\n".join(content_parts) if content_parts else ""
result = "\n\n".join(content_parts) if content_parts else ""
logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
return result
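The reworked loop changes the skip semantics in two ways: once the running total reaches the available budget, every remaining file is bulk-skipped and the loop breaks; and a single file too large for the remaining budget is skipped individually while smaller later files can still fit. A self-contained sketch of that greedy packing:

```python
# Greedy token-budget packing, mirroring the per-file check above:
# an oversized file is skipped, later smaller files may still fit.
def pack_files(file_tokens: list[tuple[str, int]], budget: int):
    used, included, skipped = 0, [], []
    for name, tokens in file_tokens:
        if used + tokens <= budget:
            included.append(name)
            used += tokens
        else:
            skipped.append(name)
    return included, skipped

print(pack_files([("a.py", 300), ("b.py", 900), ("c.py", 200)], budget=1000))
# (['a.py', 'c.py'], ['b.py'])
```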