Merge branch 'main' into feature/docs_workflow

Patryk Ciechanski
2025-06-12 09:09:19 +02:00
35 changed files with 3038 additions and 216 deletions

View File

@@ -5,10 +5,22 @@
# Get your API key from: https://makersuite.google.com/app/apikey
GEMINI_API_KEY=your_gemini_api_key_here
# Optional: Redis connection URL for conversation memory
# Defaults to redis://localhost:6379/0
# For Docker: redis://redis:6379/0
REDIS_URL=redis://localhost:6379/0
# Optional: Default model to use
# Full names: 'gemini-2.5-pro-preview-06-05' or 'gemini-2.0-flash-exp'
# Defaults to gemini-2.5-pro-preview-06-05 if not specified
DEFAULT_MODEL=gemini-2.5-pro-preview-06-05
# Optional: Default thinking mode for ThinkDeep tool
# NOTE: Only applies to models that support extended thinking (e.g., Gemini 2.5 Pro)
# Flash models (2.0) will use system prompt engineering instead
# Token consumption per mode:
# minimal: 128 tokens - Quick analysis, fastest response
# low: 2,048 tokens - Light reasoning tasks
# medium: 8,192 tokens - Balanced reasoning (good for most cases)
# high: 16,384 tokens - Complex analysis (recommended for thinkdeep)
# max: 32,768 tokens - Maximum reasoning depth, slowest but most thorough
# Defaults to 'high' if not specified
DEFAULT_THINKING_MODE_THINKDEEP=high
# Optional: Workspace root directory for file access
# This should be the HOST path that contains all files Claude might reference

View File

@@ -10,12 +10,23 @@
> **📚 [Comprehensive Documentation Available](docs/)** - This README provides quick start instructions. For detailed guides, API references, architecture documentation, and development workflows, see our [complete documentation](docs/).
The ultimate development partner for Claude - a Model Context Protocol server that gives Claude access to Google's Gemini 2.5 Pro for extended thinking, code analysis, and problem-solving. **Automatically reads files and directories, passing their contents to Gemini for analysis within its 1M token context.**
The ultimate development partner for Claude - a Model Context Protocol server that gives Claude access to Google's Gemini models (2.5 Pro for extended thinking, 2.0 Flash for speed) for code analysis, problem-solving, and collaborative development. **Automatically reads files and directories, passing their contents to Gemini for analysis within its 1M token context.**
**Features true AI orchestration with conversation continuity across tool usage** - start a task with one tool, continue with another, and maintain full context throughout. Claude and Gemini can collaborate seamlessly across multiple interactions and different tools, creating a unified development experience.
**Features true AI orchestration with conversations that continue across tasks** - Give Claude a complex task and ask it to collaborate with Gemini.
Claude stays in control and performs the actual work, but gets a second perspective from Gemini. Claude talks to Gemini, works on the implementation, then automatically resumes the conversation with Gemini while maintaining the full thread.
Claude can switch between different Gemini tools ([`thinkdeep`](#2-thinkdeep---extended-reasoning-partner) → [`chat`](#1-chat---general-development-chat--collaborative-thinking) → [`precommit`](#4-precommit---pre-commit-validation) → [`codereview`](#3-codereview---professional-code-review)) and the conversation context carries forward seamlessly.
For example, in the video above, Claude was asked to debate SwiftUI vs UIKit with Gemini, resulting in a back-and-forth discussion rather than a simple one-shot query and response.
**Think of it as Claude Code _for_ Claude Code.**
---
> ⚠️ **Active Development Notice**
> This project is under rapid development with frequent commits and changes over the past few days.
> The goal is to expand support beyond Gemini to include additional AI models and providers.
> **Watch this space** for new capabilities and potentially breaking changes between updates!
## Quick Navigation
- **Getting Started**
@@ -38,6 +49,7 @@ The ultimate development partner for Claude - a Model Context Protocol server th
- [`analyze`](#6-analyze---smart-file-analysis) - File analysis
- **Advanced Topics**
- [Model Configuration](#model-configuration) - Pro vs Flash model selection
- [Thinking Modes](#thinking-modes---managing-token-costs--quality) - Control depth vs cost
- [Working with Large Prompts](#working-with-large-prompts) - Bypass MCP's 25K token limit
- [Web Search Integration](#web-search-integration) - Smart search recommendations
@@ -588,6 +600,7 @@ All tools that work with files support **both individual files and entire direct
**`analyze`** - Analyze files or directories
- `files`: List of file paths or directories (required)
- `question`: What to analyze (required)
- `model`: pro|flash (default: server default)
- `analysis_type`: architecture|performance|security|quality|general
- `output_format`: summary|detailed|actionable
- `thinking_mode`: minimal|low|medium|high|max (default: medium)
@@ -595,11 +608,13 @@ All tools that work with files support **both individual files and entire direct
```
"Use gemini to analyze the src/ directory for architectural patterns"
"Get gemini to analyze main.py and tests/ to understand test coverage"
"Use flash to quickly analyze main.py and tests/ to understand test coverage"
"Use pro for deep analysis of the entire backend/ directory structure"
```
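Under the hood, each of these natural-language prompts becomes an MCP `tools/call` request; the simulator's `call_mcp_tool` helper added later in this commit builds requests the same way. A minimal sketch of the equivalent request for `analyze` (the file path and question below are illustrative placeholders, not values from this commit):

```python
import json

# Hypothetical payload: the path and question are placeholders.
analyze_request = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/call",
    "params": {
        "name": "analyze",
        "arguments": {
            "files": ["/workspace/src/"],  # absolute file or directory paths
            "question": "What architectural patterns are used?",
            "model": "flash",              # "pro", "flash", or a full model name
            "thinking_mode": "low",        # minimal|low|medium|high|max
        },
    },
}
print(json.dumps(analyze_request, indent=2))
```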
**`codereview`** - Review code files or directories
- `files`: List of file paths or directories (required)
- `model`: pro|flash (default: server default)
- `review_type`: full|security|performance|quick
- `focus_on`: Specific aspects to focus on
- `standards`: Coding standards to enforce
@@ -607,12 +622,13 @@ All tools that work with files support **both individual files and entire direct
- `thinking_mode`: minimal|low|medium|high|max (default: medium)
```
"Use gemini to review the entire api/ directory for security issues"
"Get gemini to review src/ with focus on performance, only show critical issues"
"Use pro to review the entire api/ directory for security issues"
"Use flash to quickly review src/ with focus on performance, only show critical issues"
```
**`debug`** - Debug with file context
- `error_description`: Description of the issue (required)
- `model`: pro|flash (default: server default)
- `error_context`: Stack trace or logs
- `files`: Files or directories related to the issue
- `runtime_info`: Environment details
@@ -626,6 +642,7 @@ All tools that work with files support **both individual files and entire direct
**`thinkdeep`** - Extended analysis with file context
- `current_analysis`: Your current thinking (required)
- `model`: pro|flash (default: server default)
- `problem_context`: Additional context
- `focus_areas`: Specific aspects to focus on
- `files`: Files or directories for context
@@ -867,7 +884,31 @@ This enables better integration, error handling, and support for the dynamic con
The server includes several configurable properties that control its behavior:
### Model Configuration
- **`GEMINI_MODEL`**: `"gemini-2.5-pro-preview-06-05"` - The latest Gemini 2.5 Pro model with native thinking support
**Default Model (Environment Variable):**
- **`DEFAULT_MODEL`**: Set your preferred default model globally
- Default: `"gemini-2.5-pro-preview-06-05"` (extended thinking capabilities)
- Alternative: `"gemini-2.0-flash-exp"` (faster responses)
**Per-Tool Model Selection:**
All tools support a `model` parameter for flexible model switching:
- **`"pro"`** → Gemini 2.5 Pro (extended thinking, slower, higher quality)
- **`"flash"`** → Gemini 2.0 Flash (faster responses, lower cost)
- **Full model names** → Direct model specification
**Examples:**
```env
# Set default globally in .env file
DEFAULT_MODEL=flash
```
```
# Per-tool usage in Claude
"Use flash to quickly analyze this function"
"Use pro for deep architectural analysis"
```
**Token Limits:**
- **`MAX_CONTEXT_TOKENS`**: `1,000,000` - Maximum input context (1M tokens for Gemini 2.5 Pro)
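As a rough sketch of how these limits interact, using the numbers defined in `config.py` in this commit (the helper function below is illustrative, not part of the server):

```python
# Token budget split, as defined in config.py in this commit
MAX_CONTEXT_TOKENS = 1_000_000     # total model capacity (Gemini 2.5 Pro)
MAX_CONTENT_TOKENS = 800_000       # prompts + files + conversation history
RESPONSE_RESERVE_TOKENS = 200_000  # reserved for the model's response


def remaining_file_budget(conversation_tokens: int) -> int:
    """Illustrative helper (not part of the server): tokens left for new
    files/content after conversation history, mirroring the calculation in
    server.py's thread reconstruction."""
    return max(0, MAX_CONTENT_TOKENS - conversation_tokens)


print(remaining_file_budget(conversation_tokens=150_000))  # 650000
```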
### Temperature Defaults

View File

@@ -0,0 +1,463 @@
#!/usr/bin/env python3
"""
Communication Simulator Test for Gemini MCP Server
This script provides comprehensive end-to-end testing of the Gemini MCP server
by simulating real Claude CLI communications and validating conversation
continuity, file handling, deduplication features, and clarification scenarios.
Test Flow:
1. Setup fresh Docker environment with clean containers
2. Load and run individual test modules
3. Validate system behavior through logs and Redis
4. Cleanup and report results
Usage:
python communication_simulator_test.py [--verbose] [--keep-logs] [--tests TEST_NAME...] [--individual TEST_NAME] [--skip-docker]
--tests: Run specific tests only (space-separated)
--list-tests: List all available tests
--individual: Run a single test individually
--skip-docker: Skip Docker setup (assumes containers are already running; this is the default)
--rebuild-docker: Force rebuild of the Docker environment (overrides --skip-docker)
Available tests:
basic_conversation - Basic conversation flow with chat tool
per_tool_deduplication - File deduplication for individual tools
cross_tool_continuation - Cross-tool conversation continuation scenarios
content_validation - Content validation and duplicate detection
logs_validation - Docker logs validation
redis_validation - Redis conversation memory validation
Examples:
# Run all tests
python communication_simulator_test.py
# Run only basic conversation and content validation tests
python communication_simulator_test.py --tests basic_conversation content_validation
# Run a single test individually (with full Docker setup)
python communication_simulator_test.py --individual content_validation
# Run a single test individually (assuming Docker is already running)
python communication_simulator_test.py --individual content_validation --skip-docker
# List available tests
python communication_simulator_test.py --list-tests
"""
import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
class CommunicationSimulator:
"""Simulates real-world Claude CLI communication with MCP Gemini server"""
def __init__(self, verbose: bool = False, keep_logs: bool = False, selected_tests: list[str] = None):
self.verbose = verbose
self.keep_logs = keep_logs
self.selected_tests = selected_tests or []
self.temp_dir = None
self.container_name = "gemini-mcp-server"
self.redis_container = "gemini-mcp-redis"
# Import test registry
from simulator_tests import TEST_REGISTRY
self.test_registry = TEST_REGISTRY
# Available test methods mapping
self.available_tests = {
name: self._create_test_runner(test_class) for name, test_class in self.test_registry.items()
}
# Test result tracking
self.test_results = dict.fromkeys(self.test_registry.keys(), False)
# Configure logging
log_level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s")
self.logger = logging.getLogger(__name__)
def _create_test_runner(self, test_class):
"""Create a test runner function for a test class"""
def run_test():
test_instance = test_class(verbose=self.verbose)
result = test_instance.run_test()
# Update results
test_name = test_instance.test_name
self.test_results[test_name] = result
return result
return run_test
def setup_test_environment(self) -> bool:
"""Setup fresh Docker environment"""
try:
self.logger.info("🚀 Setting up test environment...")
# Create temporary directory for test files
self.temp_dir = tempfile.mkdtemp(prefix="mcp_test_")
self.logger.debug(f"Created temp directory: {self.temp_dir}")
# Setup Docker environment
return self._setup_docker()
except Exception as e:
self.logger.error(f"Failed to setup test environment: {e}")
return False
def _setup_docker(self) -> bool:
"""Setup fresh Docker environment"""
try:
self.logger.info("🐳 Setting up Docker environment...")
# Stop and remove existing containers
self._run_command(["docker", "compose", "down", "--remove-orphans"], check=False, capture_output=True)
# Clean up any old containers/images
old_containers = [self.container_name, self.redis_container]
for container in old_containers:
self._run_command(["docker", "stop", container], check=False, capture_output=True)
self._run_command(["docker", "rm", container], check=False, capture_output=True)
# Build and start services
self.logger.info("📦 Building Docker images...")
result = self._run_command(["docker", "compose", "build", "--no-cache"], capture_output=True)
if result.returncode != 0:
self.logger.error(f"Docker build failed: {result.stderr}")
return False
self.logger.info("🚀 Starting Docker services...")
result = self._run_command(["docker", "compose", "up", "-d"], capture_output=True)
if result.returncode != 0:
self.logger.error(f"Docker startup failed: {result.stderr}")
return False
# Wait for services to be ready
self.logger.info("⏳ Waiting for services to be ready...")
time.sleep(10) # Give services time to initialize
# Verify containers are running
if not self._verify_containers():
return False
self.logger.info("✅ Docker environment ready")
return True
except Exception as e:
self.logger.error(f"Docker setup failed: {e}")
return False
def _verify_containers(self) -> bool:
"""Verify that required containers are running"""
try:
result = self._run_command(["docker", "ps", "--format", "{{.Names}}"], capture_output=True)
running_containers = result.stdout.decode().strip().split("\n")
required = [self.container_name, self.redis_container]
for container in required:
if container not in running_containers:
self.logger.error(f"Container not running: {container}")
return False
self.logger.debug(f"Verified containers running: {required}")
return True
except Exception as e:
self.logger.error(f"Container verification failed: {e}")
return False
def simulate_claude_cli_session(self) -> bool:
"""Simulate a complete Claude CLI session with conversation continuity"""
try:
self.logger.info("🤖 Starting Claude CLI simulation...")
# If specific tests are selected, run only those
if self.selected_tests:
return self._run_selected_tests()
# Otherwise run all tests in order
test_sequence = list(self.test_registry.keys())
for test_name in test_sequence:
if not self._run_single_test(test_name):
return False
self.logger.info("✅ All tests passed")
return True
except Exception as e:
self.logger.error(f"Claude CLI simulation failed: {e}")
return False
def _run_selected_tests(self) -> bool:
"""Run only the selected tests"""
try:
self.logger.info(f"🎯 Running selected tests: {', '.join(self.selected_tests)}")
for test_name in self.selected_tests:
if not self._run_single_test(test_name):
return False
self.logger.info("✅ All selected tests passed")
return True
except Exception as e:
self.logger.error(f"Selected tests failed: {e}")
return False
def _run_single_test(self, test_name: str) -> bool:
"""Run a single test by name"""
try:
if test_name not in self.available_tests:
self.logger.error(f"Unknown test: {test_name}")
self.logger.info(f"Available tests: {', '.join(self.available_tests.keys())}")
return False
self.logger.info(f"🧪 Running test: {test_name}")
test_function = self.available_tests[test_name]
result = test_function()
if result:
self.logger.info(f"✅ Test {test_name} passed")
else:
self.logger.error(f"❌ Test {test_name} failed")
return result
except Exception as e:
self.logger.error(f"Test {test_name} failed with exception: {e}")
return False
def run_individual_test(self, test_name: str, skip_docker_setup: bool = False) -> bool:
"""Run a single test individually with optional Docker setup skip"""
try:
if test_name not in self.available_tests:
self.logger.error(f"Unknown test: {test_name}")
self.logger.info(f"Available tests: {', '.join(self.available_tests.keys())}")
return False
self.logger.info(f"🧪 Running individual test: {test_name}")
# Setup environment unless skipped
if not skip_docker_setup:
if not self.setup_test_environment():
self.logger.error("❌ Environment setup failed")
return False
# Run the single test
test_function = self.available_tests[test_name]
result = test_function()
if result:
self.logger.info(f"✅ Individual test {test_name} passed")
else:
self.logger.error(f"❌ Individual test {test_name} failed")
return result
except Exception as e:
self.logger.error(f"Individual test {test_name} failed with exception: {e}")
return False
finally:
if not skip_docker_setup and not self.keep_logs:
self.cleanup()
def get_available_tests(self) -> dict[str, str]:
"""Get available tests with descriptions"""
descriptions = {}
for name, test_class in self.test_registry.items():
# Create temporary instance to get description
temp_instance = test_class(verbose=False)
descriptions[name] = temp_instance.test_description
return descriptions
def print_test_summary(self):
"""Print comprehensive test results summary"""
print("\n" + "=" * 70)
print("🧪 GEMINI MCP COMMUNICATION SIMULATOR - TEST RESULTS SUMMARY")
print("=" * 70)
passed_count = sum(1 for result in self.test_results.values() if result)
total_count = len(self.test_results)
for test_name, result in self.test_results.items():
status = "✅ PASS" if result else "❌ FAIL"
# Get test description
temp_instance = self.test_registry[test_name](verbose=False)
description = temp_instance.test_description
print(f"📝 {description}: {status}")
print(f"\n🎯 OVERALL RESULT: {'🎉 SUCCESS' if passed_count == total_count else '❌ FAILURE'}")
print(f"{passed_count}/{total_count} tests passed")
print("=" * 70)
return passed_count == total_count
def run_full_test_suite(self, skip_docker_setup: bool = False) -> bool:
"""Run the complete test suite"""
try:
self.logger.info("🚀 Starting Gemini MCP Communication Simulator Test Suite")
# Setup
if not skip_docker_setup:
if not self.setup_test_environment():
self.logger.error("❌ Environment setup failed")
return False
else:
self.logger.info("⏩ Skipping Docker setup (containers assumed running)")
# Main simulation
if not self.simulate_claude_cli_session():
self.logger.error("❌ Claude CLI simulation failed")
return False
# Print comprehensive summary
overall_success = self.print_test_summary()
return overall_success
except Exception as e:
self.logger.error(f"Test suite failed: {e}")
return False
finally:
if not self.keep_logs and not skip_docker_setup:
self.cleanup()
def cleanup(self):
"""Cleanup test environment"""
try:
self.logger.info("🧹 Cleaning up test environment...")
if not self.keep_logs:
# Stop Docker services
self._run_command(["docker", "compose", "down", "--remove-orphans"], check=False, capture_output=True)
else:
self.logger.info("📋 Keeping Docker services running for log inspection")
# Remove temp directory
if self.temp_dir and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
self.logger.debug(f"Removed temp directory: {self.temp_dir}")
except Exception as e:
self.logger.error(f"Cleanup failed: {e}")
def _run_command(self, cmd: list[str], check: bool = True, capture_output: bool = False, **kwargs):
"""Run a shell command with logging"""
if self.verbose:
self.logger.debug(f"Running: {' '.join(cmd)}")
return subprocess.run(cmd, check=check, capture_output=capture_output, **kwargs)
def parse_arguments():
"""Parse and validate command line arguments"""
parser = argparse.ArgumentParser(description="Gemini MCP Communication Simulator Test")
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
parser.add_argument("--keep-logs", action="store_true", help="Keep Docker services running for log inspection")
parser.add_argument("--tests", "-t", nargs="+", help="Specific tests to run (space-separated)")
parser.add_argument("--list-tests", action="store_true", help="List available tests and exit")
parser.add_argument("--individual", "-i", help="Run a single test individually")
parser.add_argument(
"--skip-docker",
action="store_true",
default=True,
help="Skip Docker setup (assumes containers are already running) - DEFAULT",
)
parser.add_argument(
"--rebuild-docker", action="store_true", help="Force rebuild Docker environment (overrides --skip-docker)"
)
return parser.parse_args()
def list_available_tests():
"""List all available tests and exit"""
simulator = CommunicationSimulator()
print("Available tests:")
for test_name, description in simulator.get_available_tests().items():
print(f" {test_name:<25} - {description}")
def run_individual_test(simulator, test_name, skip_docker):
"""Run a single test individually"""
try:
success = simulator.run_individual_test(test_name, skip_docker_setup=skip_docker)
if success:
print(f"\n🎉 INDIVIDUAL TEST {test_name.upper()}: PASSED")
return 0
else:
print(f"\n❌ INDIVIDUAL TEST {test_name.upper()}: FAILED")
return 1
except KeyboardInterrupt:
print(f"\n🛑 Individual test {test_name} interrupted by user")
if not skip_docker:
simulator.cleanup()
return 130
except Exception as e:
print(f"\n💥 Individual test {test_name} failed with error: {e}")
if not skip_docker:
simulator.cleanup()
return 1
def run_test_suite(simulator, skip_docker=False):
"""Run the full test suite or selected tests"""
try:
success = simulator.run_full_test_suite(skip_docker_setup=skip_docker)
if success:
print("\n🎉 COMPREHENSIVE MCP COMMUNICATION TEST: PASSED")
return 0
else:
print("\n❌ COMPREHENSIVE MCP COMMUNICATION TEST: FAILED")
print("⚠️ Check detailed results above")
return 1
except KeyboardInterrupt:
print("\n🛑 Test interrupted by user")
if not skip_docker:
simulator.cleanup()
return 130
except Exception as e:
print(f"\n💥 Unexpected error: {e}")
if not skip_docker:
simulator.cleanup()
return 1
def main():
"""Main entry point"""
args = parse_arguments()
# Handle list tests request
if args.list_tests:
list_available_tests()
return
# Initialize simulator consistently for all use cases
simulator = CommunicationSimulator(verbose=args.verbose, keep_logs=args.keep_logs, selected_tests=args.tests)
# Determine execution mode and run
# Override skip_docker if rebuild_docker is specified
skip_docker = args.skip_docker and not args.rebuild_docker
if args.individual:
exit_code = run_individual_test(simulator, args.individual, skip_docker)
else:
exit_code = run_test_suite(simulator, skip_docker)
sys.exit(exit_code)
if __name__ == "__main__":
main()
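Besides the CLI entry point above, the simulator class can also be driven programmatically; a minimal sketch using only the constructor and `run_full_test_suite` defined earlier in this file (assumes the Docker containers are already running, which matches the CLI's default):

```python
from communication_simulator_test import CommunicationSimulator

# Run just the basic conversation scenario against already-running containers.
simulator = CommunicationSimulator(verbose=True, keep_logs=True, selected_tests=["basic_conversation"])
ok = simulator.run_full_test_suite(skip_docker_setup=True)
print("suite passed" if ok else "suite failed")
```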

View File

@@ -13,21 +13,23 @@ import os
# Version and metadata
# These values are used in server responses and for tracking releases
# IMPORTANT: This is the single source of truth for version and author info
# setup.py imports these values to avoid duplication
__version__ = "3.2.0" # Semantic versioning: MAJOR.MINOR.PATCH
__updated__ = "2025-06-10" # Last update date in ISO format
__version__ = "3.3.0" # Semantic versioning: MAJOR.MINOR.PATCH
__updated__ = "2025-06-11" # Last update date in ISO format
__author__ = "Fahad Gilani" # Primary maintainer
# Model configuration
# GEMINI_MODEL: The Gemini model used for all AI operations
# DEFAULT_MODEL: The default model used for all AI operations
# This should be a stable, high-performance model suitable for code analysis
GEMINI_MODEL = "gemini-2.5-pro-preview-06-05"
# Can be overridden by setting DEFAULT_MODEL environment variable
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gemini-2.5-pro-preview-06-05")
# MAX_CONTEXT_TOKENS: Maximum number of tokens that can be included in a single request
# This limit includes both the prompt and expected response
# Gemini Pro models support up to 1M tokens, but practical usage should reserve
# space for the model's response (typically 50K-100K tokens reserved)
MAX_CONTEXT_TOKENS = 1_000_000 # 1M tokens for Gemini Pro
# Token allocation for Gemini Pro (1M total capacity)
# MAX_CONTEXT_TOKENS: Total model capacity
# MAX_CONTENT_TOKENS: Available for prompts, conversation history, and files
# RESPONSE_RESERVE_TOKENS: Reserved for model response generation
MAX_CONTEXT_TOKENS = 1_000_000 # 1M tokens total capacity for Gemini Pro
MAX_CONTENT_TOKENS = 800_000 # 800K tokens for content (prompts + files + history)
RESPONSE_RESERVE_TOKENS = 200_000 # 200K tokens reserved for response generation
# Temperature defaults for different tool types
# Temperature controls the randomness/creativity of model responses
@@ -46,6 +48,11 @@ TEMPERATURE_BALANCED = 0.5 # For general chat
# Used when brainstorming, exploring alternatives, or architectural discussions
TEMPERATURE_CREATIVE = 0.7 # For architecture, deep thinking
# Thinking Mode Defaults
# DEFAULT_THINKING_MODE_THINKDEEP: Default thinking depth for extended reasoning tool
# Higher modes use more computational budget but provide deeper analysis
DEFAULT_THINKING_MODE_THINKDEEP = os.getenv("DEFAULT_THINKING_MODE_THINKDEEP", "high")
# MCP Protocol Limits
# MCP_PROMPT_SIZE_LIMIT: Maximum character size for prompts sent directly through MCP
# The MCP protocol has a combined request+response limit of ~25K tokens.
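For context on the thinking-mode default above: the per-mode token budgets documented in `.env.example` map roughly as in the sketch below. The numbers come from the comments in that file; the actual lookup table lives elsewhere in the codebase and is not shown in this diff.

```python
import os

# Approximate thinking-mode budgets as documented in .env.example (illustrative).
THINKING_MODE_BUDGETS = {
    "minimal": 128,     # quick analysis, fastest response
    "low": 2_048,       # light reasoning tasks
    "medium": 8_192,    # balanced reasoning, good for most cases
    "high": 16_384,     # complex analysis (recommended for thinkdeep)
    "max": 32_768,      # maximum reasoning depth, slowest but most thorough
}

# Mirrors how config.py reads the default mode for the thinkdeep tool.
default_mode = os.getenv("DEFAULT_THINKING_MODE_THINKDEEP", "high")
print(f"thinkdeep default: {default_mode} -> {THINKING_MODE_BUDGETS[default_mode]:,} tokens")
```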

View File

@@ -7,7 +7,7 @@ services:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --save 60 1 --loglevel warning --maxmemory 512mb --maxmemory-policy allkeys-lru
command: redis-server --save 60 1 --loglevel warning --maxmemory 64mb --maxmemory-policy allkeys-lru
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
@@ -29,7 +29,9 @@ services:
redis:
condition: service_healthy
environment:
- GEMINI_API_KEY=${GEMINI_API_KEY}
- GEMINI_API_KEY=${GEMINI_API_KEY:?GEMINI_API_KEY is required. Please set it in your .env file or environment.}
- DEFAULT_MODEL=${DEFAULT_MODEL:-gemini-2.5-pro-preview-06-05}
- DEFAULT_THINKING_MODE_THINKDEEP=${DEFAULT_THINKING_MODE_THINKDEEP:-high}
- REDIS_URL=redis://redis:6379/0
# Use HOME not PWD: Claude needs access to any absolute file path, not just current project,
# and Claude Code could be running from multiple locations at the same time
@@ -39,6 +41,8 @@ services:
volumes:
- ${HOME:-/tmp}:/workspace:ro
- mcp_logs:/tmp # Shared volume for logs
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
stdin_open: true
tty: true
entrypoint: ["python"]
@@ -55,6 +59,8 @@ services:
- PYTHONUNBUFFERED=1
volumes:
- mcp_logs:/tmp # Shared volume for logs
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
entrypoint: ["python"]
command: ["log_monitor.py"]

View File

@@ -22,6 +22,7 @@ import asyncio
import logging
import os
import sys
import time
from datetime import datetime
from typing import Any
@@ -31,7 +32,7 @@ from mcp.server.stdio import stdio_server
from mcp.types import ServerCapabilities, TextContent, Tool, ToolsCapability
from config import (
GEMINI_MODEL,
DEFAULT_MODEL,
MAX_CONTEXT_TOKENS,
__author__,
__updated__,
@@ -51,6 +52,21 @@ from tools.models import ToolOutput
# Can be controlled via LOG_LEVEL environment variable (DEBUG, INFO, WARNING, ERROR)
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
# Create timezone-aware formatter
class LocalTimeFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
"""Override to use local timezone instead of UTC"""
ct = self.converter(record.created)
if datefmt:
s = time.strftime(datefmt, ct)
else:
t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
s = f"{t},{record.msecs:03.0f}"
return s
# Configure both console and file logging
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(
@@ -60,18 +76,22 @@ logging.basicConfig(
stream=sys.stderr, # Use stderr to avoid interfering with MCP stdin/stdout protocol
)
# Apply local time formatter to root logger
for handler in logging.getLogger().handlers:
handler.setFormatter(LocalTimeFormatter(log_format))
# Add file handler for Docker log monitoring
try:
file_handler = logging.FileHandler("/tmp/mcp_server.log")
file_handler.setLevel(getattr(logging, log_level, logging.INFO))
file_handler.setFormatter(logging.Formatter(log_format))
file_handler.setFormatter(LocalTimeFormatter(log_format))
logging.getLogger().addHandler(file_handler)
# Create a special logger for MCP activity tracking
mcp_logger = logging.getLogger("mcp_activity")
mcp_file_handler = logging.FileHandler("/tmp/mcp_activity.log")
mcp_file_handler.setLevel(logging.INFO)
mcp_file_handler.setFormatter(logging.Formatter("%(asctime)s - %(message)s"))
mcp_file_handler.setFormatter(LocalTimeFormatter("%(asctime)s - %(message)s"))
mcp_logger.addHandler(mcp_file_handler)
mcp_logger.setLevel(logging.INFO)
@@ -196,6 +216,10 @@ async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextCon
if "continuation_id" in arguments and arguments["continuation_id"]:
continuation_id = arguments["continuation_id"]
logger.debug(f"Resuming conversation thread: {continuation_id}")
logger.debug(
f"[CONVERSATION_DEBUG] Tool '{name}' resuming thread {continuation_id} with {len(arguments)} arguments"
)
logger.debug(f"[CONVERSATION_DEBUG] Original arguments keys: {list(arguments.keys())}")
# Log to activity file for monitoring
try:
@@ -205,6 +229,9 @@ async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextCon
pass
arguments = await reconstruct_thread_context(arguments)
logger.debug(f"[CONVERSATION_DEBUG] After thread reconstruction, arguments keys: {list(arguments.keys())}")
if "_remaining_tokens" in arguments:
logger.debug(f"[CONVERSATION_DEBUG] Remaining token budget: {arguments['_remaining_tokens']:,}")
# Route to AI-powered tools that require Gemini API calls
if name in TOOLS:
@@ -300,9 +327,11 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any
continuation_id = arguments["continuation_id"]
# Get thread context from Redis
logger.debug(f"[CONVERSATION_DEBUG] Looking up thread {continuation_id} in Redis")
context = get_thread(continuation_id)
if not context:
logger.warning(f"Thread not found: {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread {continuation_id} not found in Redis or expired")
# Log to activity file for monitoring
try:
@@ -324,15 +353,26 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any
if user_prompt:
# Capture files referenced in this turn
user_files = arguments.get("files", [])
logger.debug(f"[CONVERSATION_DEBUG] Adding user turn to thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] User prompt length: {len(user_prompt)} chars")
logger.debug(f"[CONVERSATION_DEBUG] User files: {user_files}")
success = add_turn(continuation_id, "user", user_prompt, files=user_files)
if not success:
logger.warning(f"Failed to add user turn to thread {continuation_id}")
logger.debug("[CONVERSATION_DEBUG] Failed to add user turn - thread may be at turn limit or expired")
else:
logger.debug(f"[CONVERSATION_DEBUG] Successfully added user turn to thread {continuation_id}")
# Build conversation history
conversation_history = build_conversation_history(context)
# Build conversation history and track token usage
logger.debug(f"[CONVERSATION_DEBUG] Building conversation history for thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread has {len(context.turns)} turns, tool: {context.tool_name}")
conversation_history, conversation_tokens = build_conversation_history(context)
logger.debug(f"[CONVERSATION_DEBUG] Conversation history built: {conversation_tokens:,} tokens")
logger.debug(f"[CONVERSATION_DEBUG] Conversation history length: {len(conversation_history)} chars")
# Add dynamic follow-up instructions based on turn count
follow_up_instructions = get_follow_up_instructions(len(context.turns))
logger.debug(f"[CONVERSATION_DEBUG] Follow-up instructions added for turn {len(context.turns)}")
# Merge original context with new prompt and follow-up instructions
original_prompt = arguments.get("prompt", "")
@@ -343,17 +383,34 @@ async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any
else:
enhanced_prompt = f"{original_prompt}\n\n{follow_up_instructions}"
# Update arguments with enhanced context
# Update arguments with enhanced context and remaining token budget
enhanced_arguments = arguments.copy()
enhanced_arguments["prompt"] = enhanced_prompt
# Calculate remaining token budget for current request files/content
from config import MAX_CONTENT_TOKENS
remaining_tokens = MAX_CONTENT_TOKENS - conversation_tokens
enhanced_arguments["_remaining_tokens"] = max(0, remaining_tokens) # Ensure non-negative
logger.debug("[CONVERSATION_DEBUG] Token budget calculation:")
logger.debug(f"[CONVERSATION_DEBUG] MAX_CONTENT_TOKENS: {MAX_CONTENT_TOKENS:,}")
logger.debug(f"[CONVERSATION_DEBUG] Conversation tokens: {conversation_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Remaining tokens: {remaining_tokens:,}")
# Merge original context parameters (files, etc.) with new request
if context.initial_context:
logger.debug(f"[CONVERSATION_DEBUG] Merging initial context with {len(context.initial_context)} parameters")
for key, value in context.initial_context.items():
if key not in enhanced_arguments and key not in ["temperature", "thinking_mode", "model"]:
enhanced_arguments[key] = value
logger.debug(f"[CONVERSATION_DEBUG] Merged initial context param: {key}")
logger.info(f"Reconstructed context for thread {continuation_id} (turn {len(context.turns)})")
logger.debug(f"[CONVERSATION_DEBUG] Final enhanced arguments keys: {list(enhanced_arguments.keys())}")
# Debug log files in the enhanced arguments for file tracking
if "files" in enhanced_arguments:
logger.debug(f"[CONVERSATION_DEBUG] Final files in enhanced arguments: {enhanced_arguments['files']}")
# Log to activity file for monitoring
try:
@@ -378,12 +435,16 @@ async def handle_get_version() -> list[TextContent]:
Returns:
Formatted text with version and configuration details
"""
# Import thinking mode here to avoid circular imports
from config import DEFAULT_THINKING_MODE_THINKDEEP
# Gather comprehensive server information
version_info = {
"version": __version__,
"updated": __updated__,
"author": __author__,
"gemini_model": GEMINI_MODEL,
"default_model": DEFAULT_MODEL,
"default_thinking_mode_thinkdeep": DEFAULT_THINKING_MODE_THINKDEEP,
"max_context_tokens": f"{MAX_CONTEXT_TOKENS:,}",
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"server_started": datetime.now().isoformat(),
@@ -396,7 +457,8 @@ Updated: {__updated__}
Author: {__author__}
Configuration:
- Gemini Model: {GEMINI_MODEL}
- Default Model: {DEFAULT_MODEL}
- Default Thinking Mode (ThinkDeep): {DEFAULT_THINKING_MODE_THINKDEEP}
- Max Context: {MAX_CONTEXT_TOKENS:,} tokens
- Python: {version_info["python_version"]}
- Started: {version_info["server_started"]}
@@ -429,7 +491,13 @@ async def main():
# Log startup message for Docker log monitoring
logger.info("Gemini MCP Server starting up...")
logger.info(f"Log level: {log_level}")
logger.info(f"Using model: {GEMINI_MODEL}")
logger.info(f"Using default model: {DEFAULT_MODEL}")
# Import here to avoid circular imports
from config import DEFAULT_THINKING_MODE_THINKDEEP
logger.info(f"Default thinking mode (ThinkDeep): {DEFAULT_THINKING_MODE_THINKDEEP}")
logger.info(f"Available tools: {list(TOOLS.keys())}")
logger.info("Server ready - waiting for tool requests...")

View File

@@ -17,41 +17,34 @@ if [ -f .env ]; then
echo "⚠️ .env file already exists! Updating if needed..."
echo ""
else
# Check if GEMINI_API_KEY is already set in environment
if [ -n "$GEMINI_API_KEY" ]; then
API_KEY_VALUE="$GEMINI_API_KEY"
echo "✅ Found existing GEMINI_API_KEY in environment"
else
API_KEY_VALUE="your-gemini-api-key-here"
# Copy from .env.example and customize
if [ ! -f .env.example ]; then
echo "❌ .env.example file not found! This file should exist in the project directory."
exit 1
fi
# Create the .env file
cat > .env << EOF
# Gemini MCP Server Docker Environment Configuration
# Generated on $(date)
# Your Gemini API key (get one from https://makersuite.google.com/app/apikey)
# IMPORTANT: Replace this with your actual API key
GEMINI_API_KEY=$API_KEY_VALUE
# Redis configuration (automatically set for Docker Compose)
REDIS_URL=redis://redis:6379/0
# Workspace root - host path that maps to /workspace in container
# This should be the host directory path that contains all files Claude might reference
# We use $HOME (not $PWD) because Claude needs access to ANY absolute file path,
# not just files within the current project directory. Additionally, Claude Code
# could be running from multiple locations at the same time.
WORKSPACE_ROOT=$HOME
# Logging level (DEBUG, INFO, WARNING, ERROR)
# DEBUG: Shows detailed operational messages, conversation threading, tool execution flow
# INFO: Shows general operational messages (default)
# WARNING: Shows only warnings and errors
# ERROR: Shows only errors
# Uncomment and change to DEBUG if you need detailed troubleshooting information
LOG_LEVEL=INFO
EOF
# Copy .env.example to .env
cp .env.example .env
echo "✅ Created .env from .env.example"
# Customize the API key if it's set in environment
if [ -n "$GEMINI_API_KEY" ]; then
# Replace the placeholder API key with the actual value
if command -v sed >/dev/null 2>&1; then
sed -i.bak "s/your_gemini_api_key_here/$GEMINI_API_KEY/" .env && rm .env.bak
echo "✅ Updated .env with existing GEMINI_API_KEY from environment"
else
echo "⚠️ Found GEMINI_API_KEY in environment, but sed not available. Please update .env manually."
fi
else
echo "⚠️ GEMINI_API_KEY not found in environment. Please edit .env and add your API key."
fi
# Update WORKSPACE_ROOT to use current user's home directory
if command -v sed >/dev/null 2>&1; then
sed -i.bak "s|WORKSPACE_ROOT=/Users/your-username|WORKSPACE_ROOT=$HOME|" .env && rm .env.bak
echo "✅ Updated WORKSPACE_ROOT to $HOME"
fi
echo "✅ Created .env file with Redis configuration"
echo ""
fi

View File

@@ -1,52 +0,0 @@
"""
Setup configuration for Gemini MCP Server
"""
from pathlib import Path
from setuptools import setup
# Import version and author from config to maintain single source of truth
from config import __author__, __version__
# Read README for long description
readme_path = Path(__file__).parent / "README.md"
long_description = ""
if readme_path.exists():
long_description = readme_path.read_text(encoding="utf-8")
setup(
name="gemini-mcp-server",
version=__version__,
description="Model Context Protocol server for Google Gemini",
long_description=long_description,
long_description_content_type="text/markdown",
author=__author__,
python_requires=">=3.10",
py_modules=["gemini_server"],
install_requires=[
"mcp>=1.0.0",
"google-genai>=1.19.0",
"pydantic>=2.0.0",
],
extras_require={
"dev": [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"pytest-mock>=3.11.0",
]
},
entry_points={
"console_scripts": [
"gemini-mcp-server=gemini_server:main",
],
},
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
],
)

View File

@@ -0,0 +1,41 @@
"""
Communication Simulator Tests Package
This package contains individual test modules for the Gemini MCP Communication Simulator.
Each test is in its own file for better organization and maintainability.
"""
from .base_test import BaseSimulatorTest
from .test_basic_conversation import BasicConversationTest
from .test_content_validation import ContentValidationTest
from .test_cross_tool_comprehensive import CrossToolComprehensiveTest
from .test_cross_tool_continuation import CrossToolContinuationTest
from .test_logs_validation import LogsValidationTest
from .test_model_thinking_config import TestModelThinkingConfig
from .test_per_tool_deduplication import PerToolDeduplicationTest
from .test_redis_validation import RedisValidationTest
# Test registry for dynamic loading
TEST_REGISTRY = {
"basic_conversation": BasicConversationTest,
"content_validation": ContentValidationTest,
"per_tool_deduplication": PerToolDeduplicationTest,
"cross_tool_continuation": CrossToolContinuationTest,
"cross_tool_comprehensive": CrossToolComprehensiveTest,
"logs_validation": LogsValidationTest,
"redis_validation": RedisValidationTest,
"model_thinking_config": TestModelThinkingConfig,
}
__all__ = [
"BaseSimulatorTest",
"BasicConversationTest",
"ContentValidationTest",
"PerToolDeduplicationTest",
"CrossToolContinuationTest",
"CrossToolComprehensiveTest",
"LogsValidationTest",
"RedisValidationTest",
"TestModelThinkingConfig",
"TEST_REGISTRY",
]
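Adding a new scenario to the registry is mechanical: subclass `BaseSimulatorTest` (shown in the next file), implement `test_name`, `test_description`, and `run_test`, then register the class. A hypothetical sketch — the module and class names below do not exist in this commit:

```python
# simulator_tests/test_my_new_scenario.py (hypothetical)
from .base_test import BaseSimulatorTest


class MyNewScenarioTest(BaseSimulatorTest):
    """Example skeleton for an additional simulator test."""

    @property
    def test_name(self) -> str:
        return "my_new_scenario"

    @property
    def test_description(self) -> str:
        return "Example scenario description"

    def run_test(self) -> bool:
        try:
            self.setup_test_files()
            response, continuation_id = self.call_mcp_tool(
                "chat",
                {"prompt": "Please use low thinking mode. Say hello.", "files": [self.test_files["python"]]},
            )
            return bool(response)
        finally:
            self.cleanup_test_files()


# Then register it in TEST_REGISTRY in simulator_tests/__init__.py:
# "my_new_scenario": MyNewScenarioTest,
```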

View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
Base Test Class for Communication Simulator Tests
Provides common functionality and utilities for all simulator tests.
"""
import json
import logging
import os
import subprocess
from typing import Optional
class BaseSimulatorTest:
"""Base class for all communication simulator tests"""
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.test_files = {}
self.test_dir = None
self.container_name = "gemini-mcp-server"
self.redis_container = "gemini-mcp-redis"
# Configure logging
log_level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s")
self.logger = logging.getLogger(self.__class__.__name__)
def setup_test_files(self):
"""Create test files for the simulation"""
# Test Python file
python_content = '''"""
Sample Python module for testing MCP conversation continuity
"""
def fibonacci(n):
"""Calculate fibonacci number recursively"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
def factorial(n):
"""Calculate factorial iteratively"""
result = 1
for i in range(1, n + 1):
result *= i
return result
class Calculator:
"""Simple calculator class"""
def __init__(self):
self.history = []
def add(self, a, b):
result = a + b
self.history.append(f"{a} + {b} = {result}")
return result
def multiply(self, a, b):
result = a * b
self.history.append(f"{a} * {b} = {result}")
return result
'''
# Test configuration file
config_content = """{
"database": {
"host": "localhost",
"port": 5432,
"name": "testdb",
"ssl": true
},
"cache": {
"redis_url": "redis://localhost:6379",
"ttl": 3600
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
}"""
# Create files in the current project directory
current_dir = os.getcwd()
self.test_dir = os.path.join(current_dir, "test_simulation_files")
os.makedirs(self.test_dir, exist_ok=True)
test_py = os.path.join(self.test_dir, "test_module.py")
test_config = os.path.join(self.test_dir, "config.json")
with open(test_py, "w") as f:
f.write(python_content)
with open(test_config, "w") as f:
f.write(config_content)
# Ensure absolute paths for MCP server compatibility
self.test_files = {"python": os.path.abspath(test_py), "config": os.path.abspath(test_config)}
self.logger.debug(f"Created test files with absolute paths: {list(self.test_files.values())}")
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool via Claude CLI (docker exec)"""
try:
# Prepare the MCP initialization and tool call sequence
init_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"clientInfo": {"name": "communication-simulator", "version": "1.0.0"},
},
}
# Send initialized notification
initialized_notification = {"jsonrpc": "2.0", "method": "notifications/initialized"}
# Prepare the tool call request
tool_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {"name": tool_name, "arguments": params},
}
# Combine all messages
messages = [json.dumps(init_request), json.dumps(initialized_notification), json.dumps(tool_request)]
# Join with newlines as MCP expects
input_data = "\n".join(messages) + "\n"
# Simulate Claude CLI calling the MCP server via docker exec
docker_cmd = ["docker", "exec", "-i", self.container_name, "python", "server.py"]
self.logger.debug(f"Calling MCP tool {tool_name} with proper initialization")
# Execute the command
result = subprocess.run(
docker_cmd, input=input_data, text=True, capture_output=True, timeout=3600 # 1 hour timeout
)
if result.returncode != 0:
self.logger.error(f"Docker exec failed: {result.stderr}")
return None, None
# Parse the response - look for the tool call response
response_data = self._parse_mcp_response(result.stdout, expected_id=2)
if not response_data:
return None, None
# Extract continuation_id if present
continuation_id = self._extract_continuation_id(response_data)
return response_data, continuation_id
except subprocess.TimeoutExpired:
self.logger.error(f"MCP tool call timed out after 1 hour: {tool_name}")
return None, None
except Exception as e:
self.logger.error(f"MCP tool call failed: {e}")
return None, None
def _parse_mcp_response(self, stdout: str, expected_id: int = 2) -> Optional[str]:
"""Parse MCP JSON-RPC response from stdout"""
try:
lines = stdout.strip().split("\n")
for line in lines:
if line.strip() and line.startswith("{"):
response = json.loads(line)
# Look for the tool call response with the expected ID
if response.get("id") == expected_id and "result" in response:
# Extract the actual content from the response
result = response["result"]
# Handle new response format with 'content' array
if isinstance(result, dict) and "content" in result:
content_array = result["content"]
if isinstance(content_array, list) and len(content_array) > 0:
return content_array[0].get("text", "")
# Handle legacy format
elif isinstance(result, list) and len(result) > 0:
return result[0].get("text", "")
elif response.get("id") == expected_id and "error" in response:
self.logger.error(f"MCP error: {response['error']}")
return None
# If we get here, log all responses for debugging
self.logger.warning(f"No valid tool call response found for ID {expected_id}")
self.logger.debug(f"Full stdout: {stdout}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse MCP response: {e}")
self.logger.debug(f"Stdout that failed to parse: {stdout}")
return None
def _extract_continuation_id(self, response_text: str) -> Optional[str]:
"""Extract continuation_id from response metadata"""
try:
# Parse the response text as JSON to look for continuation metadata
response_data = json.loads(response_text)
# Look for continuation_id in various places
if isinstance(response_data, dict):
# Check metadata
metadata = response_data.get("metadata", {})
if "thread_id" in metadata:
return metadata["thread_id"]
# Check follow_up_request
follow_up = response_data.get("follow_up_request", {})
if follow_up and "continuation_id" in follow_up:
return follow_up["continuation_id"]
# Check continuation_offer
continuation_offer = response_data.get("continuation_offer", {})
if continuation_offer and "continuation_id" in continuation_offer:
return continuation_offer["continuation_id"]
self.logger.debug(f"No continuation_id found in response: {response_data}")
return None
except json.JSONDecodeError as e:
self.logger.debug(f"Failed to parse response for continuation_id: {e}")
return None
def run_command(self, cmd: list[str], check: bool = True, capture_output: bool = False, **kwargs):
"""Run a shell command with logging"""
if self.verbose:
self.logger.debug(f"Running: {' '.join(cmd)}")
return subprocess.run(cmd, check=check, capture_output=capture_output, **kwargs)
def create_additional_test_file(self, filename: str, content: str) -> str:
"""Create an additional test file for mixed scenario testing"""
if not hasattr(self, "test_dir") or not self.test_dir:
raise RuntimeError("Test directory not initialized. Call setup_test_files() first.")
file_path = os.path.join(self.test_dir, filename)
with open(file_path, "w") as f:
f.write(content)
# Return absolute path for MCP server compatibility
return os.path.abspath(file_path)
def cleanup_test_files(self):
"""Clean up test files"""
if hasattr(self, "test_dir") and self.test_dir and os.path.exists(self.test_dir):
import shutil
shutil.rmtree(self.test_dir)
self.logger.debug(f"Removed test files directory: {self.test_dir}")
def run_test(self) -> bool:
"""Run the test - to be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement run_test()")
@property
def test_name(self) -> str:
"""Get the test name - to be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement test_name property")
@property
def test_description(self) -> str:
"""Get the test description - to be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement test_description property")

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
Basic Conversation Flow Test
Tests basic conversation continuity with the chat tool, including:
- Initial chat with file analysis
- Continuing conversation with same file (deduplication)
- Adding additional files to ongoing conversation
"""
from .base_test import BaseSimulatorTest
class BasicConversationTest(BaseSimulatorTest):
"""Test basic conversation flow with chat tool"""
@property
def test_name(self) -> str:
return "basic_conversation"
@property
def test_description(self) -> str:
return "Basic conversation flow with chat tool"
def run_test(self) -> bool:
"""Test basic conversation flow with chat tool"""
try:
self.logger.info("📝 Test: Basic conversation flow")
# Setup test files
self.setup_test_files()
# Initial chat tool call with file
self.logger.info(" 1.1: Initial chat with file analysis")
response1, continuation_id = self.call_mcp_tool(
"chat",
{
"prompt": "Please use low thinking mode. Analyze this Python code and explain what it does",
"files": [self.test_files["python"]],
},
)
if not response1 or not continuation_id:
self.logger.error("Failed to get initial response with continuation_id")
return False
self.logger.info(f" ✅ Got continuation_id: {continuation_id}")
# Continue conversation with same file (should be deduplicated)
self.logger.info(" 1.2: Continue conversation with same file")
response2, _ = self.call_mcp_tool(
"chat",
{
"prompt": "Please use low thinking mode. Now focus on the Calculator class specifically. Are there any improvements you'd suggest?",
"files": [self.test_files["python"]], # Same file - should be deduplicated
"continuation_id": continuation_id,
},
)
if not response2:
self.logger.error("Failed to continue conversation")
return False
# Continue with additional file
self.logger.info(" 1.3: Continue conversation with additional file")
response3, _ = self.call_mcp_tool(
"chat",
{
"prompt": "Please use low thinking mode. Now also analyze this configuration file and see how it might relate to the Python code",
"files": [self.test_files["python"], self.test_files["config"]],
"continuation_id": continuation_id,
},
)
if not response3:
self.logger.error("Failed to continue with additional file")
return False
self.logger.info(" ✅ Basic conversation flow working")
return True
except Exception as e:
self.logger.error(f"Basic conversation flow test failed: {e}")
return False
finally:
self.cleanup_test_files()

View File

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""
Content Validation Test
Tests that tools don't duplicate file content in their responses.
This test is specifically designed to catch content duplication bugs.
"""
import json
import os
from .base_test import BaseSimulatorTest
class ContentValidationTest(BaseSimulatorTest):
"""Test that tools don't duplicate file content in their responses"""
@property
def test_name(self) -> str:
return "content_validation"
@property
def test_description(self) -> str:
return "Content validation and duplicate detection"
def run_test(self) -> bool:
"""Test that tools don't duplicate file content in their responses"""
try:
self.logger.info("📄 Test: Content validation and duplicate detection")
# Setup test files first
self.setup_test_files()
# Create a test file with distinctive content for validation
validation_content = '''"""
Configuration file for content validation testing
This content should appear only ONCE in any tool response
"""
# Configuration constants
MAX_CONTENT_TOKENS = 800_000 # This line should appear exactly once
TEMPERATURE_ANALYTICAL = 0.2 # This should also appear exactly once
UNIQUE_VALIDATION_MARKER = "CONTENT_VALIDATION_TEST_12345"
# Database settings
DATABASE_CONFIG = {
"host": "localhost",
"port": 5432,
"name": "validation_test_db"
}
'''
validation_file = os.path.join(self.test_dir, "validation_config.py")
with open(validation_file, "w") as f:
f.write(validation_content)
# Ensure absolute path for MCP server compatibility
validation_file = os.path.abspath(validation_file)
# Test 1: Precommit tool with files parameter (where the bug occurred)
self.logger.info(" 1: Testing precommit tool content duplication")
# Call precommit tool with the validation file
response1, thread_id = self.call_mcp_tool(
"precommit",
{
"path": os.getcwd(),
"files": [validation_file],
"original_request": "Test for content duplication in precommit tool",
},
)
if response1:
# Parse response and check for content duplication
try:
response_data = json.loads(response1)
content = response_data.get("content", "")
# Count occurrences of distinctive markers
max_content_count = content.count("MAX_CONTENT_TOKENS = 800_000")
temp_analytical_count = content.count("TEMPERATURE_ANALYTICAL = 0.2")
unique_marker_count = content.count("UNIQUE_VALIDATION_MARKER")
# Validate no duplication
duplication_detected = False
issues = []
if max_content_count > 1:
issues.append(f"MAX_CONTENT_TOKENS appears {max_content_count} times")
duplication_detected = True
if temp_analytical_count > 1:
issues.append(f"TEMPERATURE_ANALYTICAL appears {temp_analytical_count} times")
duplication_detected = True
if unique_marker_count > 1:
issues.append(f"UNIQUE_VALIDATION_MARKER appears {unique_marker_count} times")
duplication_detected = True
if duplication_detected:
self.logger.error(f" ❌ Content duplication detected in precommit tool: {'; '.join(issues)}")
return False
else:
self.logger.info(" ✅ No content duplication in precommit tool")
except json.JSONDecodeError:
self.logger.warning(" ⚠️ Could not parse precommit response as JSON")
else:
self.logger.warning(" ⚠️ Precommit tool failed to respond")
# Test 2: Other tools that use files parameter
tools_to_test = [
(
"chat",
{
"prompt": "Please use low thinking mode. Analyze this config file",
"files": [validation_file],
}, # Using absolute path
),
(
"codereview",
{
"files": [validation_file],
"context": "Please use low thinking mode. Review this configuration",
}, # Using absolute path
),
("analyze", {"files": [validation_file], "analysis_type": "code_quality"}), # Using absolute path
]
for tool_name, params in tools_to_test:
self.logger.info(f" 2.{tool_name}: Testing {tool_name} tool content duplication")
response, _ = self.call_mcp_tool(tool_name, params)
if response:
try:
response_data = json.loads(response)
content = response_data.get("content", "")
# Check for duplication
marker_count = content.count("UNIQUE_VALIDATION_MARKER")
if marker_count > 1:
self.logger.error(
f" ❌ Content duplication in {tool_name}: marker appears {marker_count} times"
)
return False
else:
self.logger.info(f" ✅ No content duplication in {tool_name}")
except json.JSONDecodeError:
self.logger.warning(f" ⚠️ Could not parse {tool_name} response")
else:
self.logger.warning(f" ⚠️ {tool_name} tool failed to respond")
# Test 3: Cross-tool content validation with file deduplication
self.logger.info(" 3: Testing cross-tool content consistency")
if thread_id:
# Continue conversation with same file - content should be deduplicated in conversation history
response2, _ = self.call_mcp_tool(
"chat",
{
"prompt": "Please use low thinking mode. Continue analyzing this configuration file",
"files": [validation_file], # Same file should be deduplicated
"continuation_id": thread_id,
},
)
if response2:
try:
response_data = json.loads(response2)
content = response_data.get("content", "")
# In continuation, the file content shouldn't be duplicated either
marker_count = content.count("UNIQUE_VALIDATION_MARKER")
if marker_count > 1:
self.logger.error(
f" ❌ Content duplication in cross-tool continuation: marker appears {marker_count} times"
)
return False
else:
self.logger.info(" ✅ No content duplication in cross-tool continuation")
except json.JSONDecodeError:
self.logger.warning(" ⚠️ Could not parse continuation response")
# Cleanup
os.remove(validation_file)
self.logger.info(" ✅ All content validation tests passed")
return True
except Exception as e:
self.logger.error(f"Content validation test failed: {e}")
return False
finally:
self.cleanup_test_files()

View File

@@ -0,0 +1,306 @@
#!/usr/bin/env python3
"""
Comprehensive Cross-Tool Test
Tests file deduplication, conversation continuation, and file handling
across all available MCP tools using realistic workflows with low thinking mode.
Validates:
1. Cross-tool conversation continuation
2. File deduplication across different tools
3. Mixed file scenarios (old + new files)
4. Conversation history preservation
5. Proper tool chaining with context
"""
import subprocess
from .base_test import BaseSimulatorTest
class CrossToolComprehensiveTest(BaseSimulatorTest):
"""Comprehensive test across all MCP tools"""
@property
def test_name(self) -> str:
return "cross_tool_comprehensive"
@property
def test_description(self) -> str:
return "Comprehensive cross-tool file deduplication and continuation"
def get_docker_logs_since(self, since_time: str) -> str:
"""Get docker logs since a specific timestamp"""
try:
# Check both main server and log monitor for comprehensive logs
cmd_server = ["docker", "logs", "--since", since_time, self.container_name]
cmd_monitor = ["docker", "logs", "--since", since_time, "gemini-mcp-log-monitor"]
result_server = subprocess.run(cmd_server, capture_output=True, text=True)
result_monitor = subprocess.run(cmd_monitor, capture_output=True, text=True)
# Combine logs from both containers
combined_logs = result_server.stdout + "\n" + result_monitor.stdout
return combined_logs
except Exception as e:
self.logger.error(f"Failed to get docker logs: {e}")
return ""
def run_test(self) -> bool:
"""Comprehensive cross-tool test with all MCP tools"""
try:
self.logger.info("📄 Test: Comprehensive cross-tool file deduplication and continuation")
# Setup test files
self.setup_test_files()
# Create short test files for quick testing
python_code = """def login(user, pwd):
# Security issue: plain text password
if user == "admin" and pwd == "123":
return True
return False
def hash_pwd(pwd):
# Weak hashing
return str(hash(pwd))
"""
config_file = """{
"db_password": "weak123",
"debug": true,
"secret_key": "test"
}"""
auth_file = self.create_additional_test_file("auth.py", python_code)
config_file_path = self.create_additional_test_file("config.json", config_file)
# Get timestamp for log filtering
import datetime
start_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
# Tool chain: chat → analyze → debug → codereview → precommit
# Each step builds on the previous with cross-tool continuation
current_continuation_id = None
responses = []
# Step 1: Start with chat tool to understand the codebase
self.logger.info(" Step 1: chat tool - Initial codebase exploration")
chat_params = {
"prompt": "Please give me a quick one line reply. I have an authentication module that needs review. Can you help me understand potential issues?",
"files": [auth_file],
"thinking_mode": "low",
}
response1, continuation_id1 = self.call_mcp_tool("chat", chat_params)
if not response1 or not continuation_id1:
self.logger.error(" ❌ Step 1: chat tool failed")
return False
self.logger.info(f" ✅ Step 1: chat completed with continuation_id: {continuation_id1[:8]}...")
responses.append(("chat", response1, continuation_id1))
current_continuation_id = continuation_id1
# Step 2: Use analyze tool to do deeper analysis (fresh conversation)
self.logger.info(" Step 2: analyze tool - Deep code analysis (fresh)")
analyze_params = {
"files": [auth_file],
"question": "Please give me a quick one line reply. What are the security vulnerabilities and architectural issues in this authentication code?",
"thinking_mode": "low",
}
response2, continuation_id2 = self.call_mcp_tool("analyze", analyze_params)
if not response2:
self.logger.error(" ❌ Step 2: analyze tool failed")
return False
self.logger.info(
f" ✅ Step 2: analyze completed with continuation_id: {continuation_id2[:8] if continuation_id2 else 'None'}..."
)
responses.append(("analyze", response2, continuation_id2))
# Step 3: Continue chat conversation with config file
self.logger.info(" Step 3: chat continuation - Add config file context")
chat_continue_params = {
"continuation_id": current_continuation_id,
"prompt": "Please give me a quick one line reply. I also have this configuration file. Can you analyze it alongside the authentication code?",
"files": [auth_file, config_file_path], # Old + new file
"thinking_mode": "low",
}
response3, _ = self.call_mcp_tool("chat", chat_continue_params)
if not response3:
self.logger.error(" ❌ Step 3: chat continuation failed")
return False
self.logger.info(" ✅ Step 3: chat continuation completed")
responses.append(("chat_continue", response3, current_continuation_id))
# Step 4: Use debug tool to identify specific issues
self.logger.info(" Step 4: debug tool - Identify specific problems")
debug_params = {
"files": [auth_file, config_file_path],
"error_description": "Please give me a quick one line reply. The authentication system has security vulnerabilities. Help me identify and fix the main issues.",
"thinking_mode": "low",
}
response4, continuation_id4 = self.call_mcp_tool("debug", debug_params)
if not response4:
self.logger.error(" ❌ Step 4: debug tool failed")
return False
self.logger.info(
f" ✅ Step 4: debug completed with continuation_id: {continuation_id4[:8] if continuation_id4 else 'None'}..."
)
responses.append(("debug", response4, continuation_id4))
# Step 5: Cross-tool continuation - continue debug with chat context
if continuation_id4:
self.logger.info(" Step 5: debug continuation - Additional analysis")
debug_continue_params = {
"continuation_id": continuation_id4,
"files": [auth_file, config_file_path],
"error_description": "Please give me a quick one line reply. What specific code changes would you recommend to fix the password hashing vulnerability?",
"thinking_mode": "low",
}
response5, _ = self.call_mcp_tool("debug", debug_continue_params)
if response5:
self.logger.info(" ✅ Step 5: debug continuation completed")
responses.append(("debug_continue", response5, continuation_id4))
# Step 6: Use codereview for comprehensive review
self.logger.info(" Step 6: codereview tool - Comprehensive code review")
codereview_params = {
"files": [auth_file, config_file_path],
"context": "Please give me a quick one line reply. Comprehensive security-focused code review for production readiness",
"thinking_mode": "low",
}
response6, continuation_id6 = self.call_mcp_tool("codereview", codereview_params)
if not response6:
self.logger.error(" ❌ Step 6: codereview tool failed")
return False
self.logger.info(
f" ✅ Step 6: codereview completed with continuation_id: {continuation_id6[:8] if continuation_id6 else 'None'}..."
)
responses.append(("codereview", response6, continuation_id6))
# Step 7: Create improved version and use precommit
self.logger.info(" Step 7: precommit tool - Pre-commit validation")
# Create a short improved version
improved_code = """import hashlib
def secure_login(user, pwd):
# Better: hashed password check
hashed = hashlib.sha256(pwd.encode()).hexdigest()
if user == "admin" and hashed == "expected_hash":
return True
return False
"""
improved_file = self.create_additional_test_file("auth_improved.py", improved_code)
precommit_params = {
"path": self.test_dir,
"files": [auth_file, config_file_path, improved_file],
"original_request": "Please give me a quick one line reply. Ready to commit security improvements to authentication module",
"thinking_mode": "low",
}
response7, continuation_id7 = self.call_mcp_tool("precommit", precommit_params)
if not response7:
self.logger.error(" ❌ Step 7: precommit tool failed")
return False
self.logger.info(
f" ✅ Step 7: precommit completed with continuation_id: {continuation_id7[:8] if continuation_id7 else 'None'}..."
)
responses.append(("precommit", response7, continuation_id7))
# Validate comprehensive results
self.logger.info(" 📋 Validating comprehensive cross-tool results...")
logs = self.get_docker_logs_since(start_time)
# Validation criteria
tools_used = [r[0] for r in responses]
continuation_ids_created = [r[2] for r in responses if r[2]]
# Check for various log patterns
conversation_logs = [
line for line in logs.split("\n") if "conversation" in line.lower() or "history" in line.lower()
]
embedding_logs = [
line
for line in logs.split("\n")
if "📁" in line or "embedding" in line.lower() or "file" in line.lower()
]
continuation_logs = [
line for line in logs.split("\n") if "continuation" in line.lower() or "resuming" in line.lower()
]
cross_tool_logs = [
line
for line in logs.split("\n")
if any(tool in line.lower() for tool in ["chat", "analyze", "debug", "codereview", "precommit"])
]
# File mentions
auth_file_mentioned = any("auth.py" in line for line in logs.split("\n"))
config_file_mentioned = any("config.json" in line for line in logs.split("\n"))
improved_file_mentioned = any("auth_improved.py" in line for line in logs.split("\n"))
# Print comprehensive diagnostics
self.logger.info(f" 📊 Tools used: {len(tools_used)} ({', '.join(tools_used)})")
self.logger.info(f" 📊 Continuation IDs created: {len(continuation_ids_created)}")
self.logger.info(f" 📊 Conversation logs found: {len(conversation_logs)}")
self.logger.info(f" 📊 File embedding logs found: {len(embedding_logs)}")
self.logger.info(f" 📊 Continuation logs found: {len(continuation_logs)}")
self.logger.info(f" 📊 Cross-tool activity logs: {len(cross_tool_logs)}")
self.logger.info(f" 📊 Auth file mentioned: {auth_file_mentioned}")
self.logger.info(f" 📊 Config file mentioned: {config_file_mentioned}")
self.logger.info(f" 📊 Improved file mentioned: {improved_file_mentioned}")
if self.verbose:
self.logger.debug(" 📋 Sample tool activity logs:")
for log in cross_tool_logs[:10]: # Show first 10
if log.strip():
self.logger.debug(f" {log.strip()}")
self.logger.debug(" 📋 Sample continuation logs:")
for log in continuation_logs[:5]: # Show first 5
if log.strip():
self.logger.debug(f" {log.strip()}")
# Comprehensive success criteria
success_criteria = [
len(tools_used) >= 5, # Used multiple tools
len(continuation_ids_created) >= 3, # Created multiple continuation threads
len(embedding_logs) > 10, # Significant file embedding activity
len(continuation_logs) > 0, # Evidence of continuation
auth_file_mentioned, # Original file processed
config_file_mentioned, # Additional file processed
improved_file_mentioned, # New file processed
len(conversation_logs) > 5, # Conversation history activity
]
passed_criteria = sum(success_criteria)
total_criteria = len(success_criteria)
self.logger.info(f" 📊 Success criteria met: {passed_criteria}/{total_criteria}")
if passed_criteria >= 6: # At least 6 out of 8 criteria
self.logger.info(" ✅ Comprehensive cross-tool test: PASSED")
return True
else:
self.logger.warning(" ⚠️ Comprehensive cross-tool test: FAILED")
self.logger.warning(" 💡 Check logs for detailed cross-tool activity")
return False
except Exception as e:
self.logger.error(f"Comprehensive cross-tool test failed: {e}")
return False
finally:
self.cleanup_test_files()
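The final verdict above is a scored checklist rather than an all-or-nothing gate; the pattern, stripped to a sketch with illustrative criterion names:

def meets_threshold(criteria: dict[str, bool], required: int) -> bool:
    """Pass when at least `required` of the boolean criteria hold."""
    passed = sum(criteria.values())
    print(f"Success criteria met: {passed}/{len(criteria)}")
    return passed >= required

# The comprehensive test above requires 6 of its 8 criteria.
assert meets_threshold({"tools_used": True, "continuations": True, "files_logged": False}, required=2)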


@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""
Cross-Tool Continuation Test
Tests comprehensive cross-tool continuation scenarios to ensure
conversation context is maintained when switching between different tools.
"""
from .base_test import BaseSimulatorTest
class CrossToolContinuationTest(BaseSimulatorTest):
"""Test comprehensive cross-tool continuation scenarios"""
@property
def test_name(self) -> str:
return "cross_tool_continuation"
@property
def test_description(self) -> str:
return "Cross-tool conversation continuation scenarios"
def run_test(self) -> bool:
"""Test comprehensive cross-tool continuation scenarios"""
try:
self.logger.info("🔧 Test: Cross-tool continuation scenarios")
# Setup test files
self.setup_test_files()
success_count = 0
total_scenarios = 3
# Scenario 1: chat -> thinkdeep -> codereview
if self._test_chat_thinkdeep_codereview():
success_count += 1
# Scenario 2: analyze -> debug -> thinkdeep
if self._test_analyze_debug_thinkdeep():
success_count += 1
# Scenario 3: Multi-file cross-tool continuation
if self._test_multi_file_continuation():
success_count += 1
self.logger.info(
f" ✅ Cross-tool continuation scenarios completed: {success_count}/{total_scenarios} scenarios passed"
)
# Consider successful if at least one scenario worked
return success_count > 0
except Exception as e:
self.logger.error(f"Cross-tool continuation test failed: {e}")
return False
finally:
self.cleanup_test_files()
def _test_chat_thinkdeep_codereview(self) -> bool:
"""Test chat -> thinkdeep -> codereview scenario"""
try:
self.logger.info(" 1: Testing chat -> thinkdeep -> codereview")
# Start with chat
chat_response, chat_id = self.call_mcp_tool(
"chat",
{
"prompt": "Please use low thinking mode. Look at this Python code and tell me what you think about it",
"files": [self.test_files["python"]],
},
)
if not chat_response or not chat_id:
self.logger.error("Failed to start chat conversation")
return False
# Continue with thinkdeep
thinkdeep_response, _ = self.call_mcp_tool(
"thinkdeep",
{
"prompt": "Please use low thinking mode. Think deeply about potential performance issues in this code",
"files": [self.test_files["python"]], # Same file should be deduplicated
"continuation_id": chat_id,
},
)
if not thinkdeep_response:
self.logger.error("Failed chat -> thinkdeep continuation")
return False
# Continue with codereview
codereview_response, _ = self.call_mcp_tool(
"codereview",
{
"files": [self.test_files["python"]], # Same file should be deduplicated
"context": "Building on our previous analysis, provide a comprehensive code review",
"continuation_id": chat_id,
},
)
if not codereview_response:
self.logger.error("Failed thinkdeep -> codereview continuation")
return False
self.logger.info(" ✅ chat -> thinkdeep -> codereview working")
return True
except Exception as e:
self.logger.error(f"Chat -> thinkdeep -> codereview scenario failed: {e}")
return False
def _test_analyze_debug_thinkdeep(self) -> bool:
"""Test analyze -> debug -> thinkdeep scenario"""
try:
self.logger.info(" 2: Testing analyze -> debug -> thinkdeep")
# Start with analyze
analyze_response, analyze_id = self.call_mcp_tool(
"analyze", {"files": [self.test_files["python"]], "analysis_type": "code_quality"}
)
if not analyze_response or not analyze_id:
self.logger.warning("Failed to start analyze conversation, skipping scenario 2")
return False
# Continue with debug
debug_response, _ = self.call_mcp_tool(
"debug",
{
"files": [self.test_files["python"]], # Same file should be deduplicated
"issue_description": "Based on our analysis, help debug the performance issue in fibonacci",
"continuation_id": analyze_id,
},
)
if not debug_response:
self.logger.warning(" ⚠️ analyze -> debug continuation failed")
return False
# Continue with thinkdeep
final_response, _ = self.call_mcp_tool(
"thinkdeep",
{
"prompt": "Please use low thinking mode. Think deeply about the architectural implications of the issues we've found",
"files": [self.test_files["python"]], # Same file should be deduplicated
"continuation_id": analyze_id,
},
)
if not final_response:
self.logger.warning(" ⚠️ debug -> thinkdeep continuation failed")
return False
self.logger.info(" ✅ analyze -> debug -> thinkdeep working")
return True
except Exception as e:
self.logger.error(f"Analyze -> debug -> thinkdeep scenario failed: {e}")
return False
def _test_multi_file_continuation(self) -> bool:
"""Test multi-file cross-tool continuation"""
try:
self.logger.info(" 3: Testing multi-file cross-tool continuation")
# Start with both files
multi_response, multi_id = self.call_mcp_tool(
"chat",
{
"prompt": "Please use low thinking mode. Analyze both the Python code and configuration file",
"files": [self.test_files["python"], self.test_files["config"]],
},
)
if not multi_response or not multi_id:
self.logger.warning("Failed to start multi-file conversation, skipping scenario 3")
return False
# Switch to codereview with same files (should use conversation history)
multi_review, _ = self.call_mcp_tool(
"codereview",
{
"files": [self.test_files["python"], self.test_files["config"]], # Same files
"context": "Review both files in the context of our previous discussion",
"continuation_id": multi_id,
},
)
if not multi_review:
self.logger.warning(" ⚠️ Multi-file cross-tool continuation failed")
return False
self.logger.info(" ✅ Multi-file cross-tool continuation working")
return True
except Exception as e:
self.logger.error(f"Multi-file continuation scenario failed: {e}")
return False


@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Docker Logs Validation Test
Validates Docker logs to confirm that file deduplication and
conversation threading are working properly.
"""
from .base_test import BaseSimulatorTest
class LogsValidationTest(BaseSimulatorTest):
"""Validate Docker logs to confirm file deduplication behavior"""
@property
def test_name(self) -> str:
return "logs_validation"
@property
def test_description(self) -> str:
return "Docker logs validation"
def run_test(self) -> bool:
"""Validate Docker logs to confirm file deduplication behavior"""
try:
self.logger.info("📋 Test: Validating Docker logs for file deduplication...")
# Get server logs from main container
result = self.run_command(["docker", "logs", self.container_name], capture_output=True)
if result.returncode != 0:
self.logger.error(f"Failed to get Docker logs: {result.stderr}")
return False
main_logs = result.stdout.decode() + result.stderr.decode()
# Get logs from log monitor container (where detailed activity is logged)
monitor_result = self.run_command(["docker", "logs", "gemini-mcp-log-monitor"], capture_output=True)
monitor_logs = ""
if monitor_result.returncode == 0:
monitor_logs = monitor_result.stdout.decode() + monitor_result.stderr.decode()
# Also get activity logs for more detailed conversation tracking
activity_result = self.run_command(
["docker", "exec", self.container_name, "cat", "/tmp/mcp_activity.log"], capture_output=True
)
activity_logs = ""
if activity_result.returncode == 0:
activity_logs = activity_result.stdout.decode()
logs = main_logs + "\n" + monitor_logs + "\n" + activity_logs
# Look for conversation threading patterns that indicate the system is working
conversation_patterns = [
"CONVERSATION_RESUME",
"CONVERSATION_CONTEXT",
"previous turns loaded",
"tool embedding",
"files included",
"files truncated",
"already in conversation history",
]
conversation_lines = []
for line in logs.split("\n"):
for pattern in conversation_patterns:
if pattern.lower() in line.lower():
conversation_lines.append(line.strip())
break
# Look for evidence of conversation threading and file handling
conversation_threading_found = False
multi_turn_conversations = False
for line in conversation_lines:
lower_line = line.lower()
if "conversation_resume" in lower_line:
conversation_threading_found = True
self.logger.debug(f"📄 Conversation threading: {line}")
elif "previous turns loaded" in lower_line:
multi_turn_conversations = True
self.logger.debug(f"📄 Multi-turn conversation: {line}")
elif "already in conversation" in lower_line:
self.logger.info(f"✅ Found explicit deduplication: {line}")
return True
# Conversation threading with multiple turns is evidence of file deduplication working
if conversation_threading_found and multi_turn_conversations:
self.logger.info("✅ Conversation threading with multi-turn context working")
self.logger.info(
"✅ File deduplication working implicitly (files embedded once in conversation history)"
)
return True
elif conversation_threading_found:
self.logger.info("✅ Conversation threading detected")
return True
else:
self.logger.warning("⚠️ No clear evidence of conversation threading in logs")
self.logger.debug(f"Found {len(conversation_lines)} conversation-related log lines")
return False
except Exception as e:
self.logger.error(f"Log validation failed: {e}")
return False
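The scan above is a case-insensitive substring search over the combined Docker logs; a minimal sketch using the same pattern list:

CONVERSATION_PATTERNS = (
    "conversation_resume",
    "previous turns loaded",
    "already in conversation history",
)

def threading_evidence(logs: str) -> list[str]:
    """Collect log lines that hint at conversation threading or file deduplication."""
    return [
        line.strip()
        for line in logs.splitlines()
        if any(pattern in line.lower() for pattern in CONVERSATION_PATTERNS)
    ]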


@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Model Thinking Configuration Test
Tests that thinking configuration is properly applied only to models that support it,
and that Flash models work correctly without thinking config.
"""
from .base_test import BaseSimulatorTest
class TestModelThinkingConfig(BaseSimulatorTest):
"""Test model-specific thinking configuration behavior"""
@property
def test_name(self) -> str:
return "model_thinking_config"
@property
def test_description(self) -> str:
return "Model-specific thinking configuration behavior"
def test_pro_model_with_thinking_config(self):
"""Test that Pro model uses thinking configuration"""
self.logger.info("Testing Pro model with thinking configuration...")
try:
# Test with explicit pro model and high thinking mode
response, continuation_id = self.call_mcp_tool(
"chat",
{
"prompt": "What is 2 + 2? Please think carefully and explain.",
"model": "pro", # Should resolve to gemini-2.5-pro-preview-06-05
"thinking_mode": "high", # Should use thinking_config
},
)
if not response:
raise Exception("Pro model test failed: No response received")
self.logger.info("✅ Pro model with thinking config works correctly")
return True
except Exception as e:
self.logger.error(f"❌ Pro model test failed: {e}")
return False
def test_flash_model_without_thinking_config(self):
"""Test that Flash model works without thinking configuration"""
self.logger.info("Testing Flash model without thinking configuration...")
try:
# Test with explicit flash model and thinking mode (should be ignored)
response, continuation_id = self.call_mcp_tool(
"chat",
{
"prompt": "What is 3 + 3? Give a quick answer.",
"model": "flash", # Should resolve to gemini-2.0-flash-exp
"thinking_mode": "high", # Should be ignored for Flash model
},
)
if not response:
raise Exception("Flash model test failed: No response received")
self.logger.info("✅ Flash model without thinking config works correctly")
return True
except Exception as e:
if "thinking" in str(e).lower() and ("not supported" in str(e).lower() or "invalid" in str(e).lower()):
raise Exception(f"Flash model incorrectly tried to use thinking config: {e}")
self.logger.error(f"❌ Flash model test failed: {e}")
return False
def test_model_resolution_logic(self):
"""Test that model resolution works correctly for both shortcuts and full names"""
self.logger.info("Testing model resolution logic...")
test_cases = [
("pro", "should work with Pro model"),
("flash", "should work with Flash model"),
("gemini-2.5-pro-preview-06-05", "should work with full Pro model name"),
("gemini-2.0-flash-exp", "should work with full Flash model name"),
]
success_count = 0
for model_name, description in test_cases:
try:
response, continuation_id = self.call_mcp_tool(
"chat",
{
"prompt": f"Test with {model_name}: What is 1 + 1?",
"model": model_name,
"thinking_mode": "medium",
},
)
if not response:
raise Exception(f"No response received for model {model_name}")
self.logger.info(f"{model_name} {description}")
success_count += 1
except Exception as e:
self.logger.error(f"{model_name} failed: {e}")
return False
return success_count == len(test_cases)
def test_default_model_behavior(self):
"""Test behavior with server default model (no explicit model specified)"""
self.logger.info("Testing default model behavior...")
try:
# Test without specifying model (should use server default)
response, continuation_id = self.call_mcp_tool(
"chat",
{
"prompt": "Test default model: What is 4 + 4?",
# No model specified - should use DEFAULT_MODEL from config
"thinking_mode": "medium",
},
)
if not response:
raise Exception("Default model test failed: No response received")
self.logger.info("✅ Default model behavior works correctly")
return True
except Exception as e:
self.logger.error(f"❌ Default model test failed: {e}")
return False
def run_test(self) -> bool:
"""Run all model thinking configuration tests"""
self.logger.info(f"📝 Test: {self.test_description}")
try:
# Test Pro model with thinking config
if not self.test_pro_model_with_thinking_config():
return False
# Test Flash model without thinking config
if not self.test_flash_model_without_thinking_config():
return False
# Test model resolution logic
if not self.test_model_resolution_logic():
return False
# Test default model behavior
if not self.test_default_model_behavior():
return False
self.logger.info(f"✅ All {self.test_name} tests passed!")
return True
except Exception as e:
self.logger.error(f"{self.test_name} test failed: {e}")
return False
def main():
"""Run the model thinking configuration tests"""
import sys
verbose = "--verbose" in sys.argv or "-v" in sys.argv
test = TestModelThinkingConfig(verbose=verbose)
success = test.run_test()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
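The shortcut handling exercised here can be pictured as a small alias map (a sketch only; the actual resolution lives in the server and may differ):

from typing import Optional

MODEL_ALIASES = {
    "pro": "gemini-2.5-pro-preview-06-05",
    "flash": "gemini-2.0-flash-exp",
}

def resolve_model(name: Optional[str], default: str = "gemini-2.5-pro-preview-06-05") -> str:
    """Map shortcuts to full model names; full names and unknown values pass through unchanged."""
    if not name:
        return default
    return MODEL_ALIASES.get(name, name)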


@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""
Per-Tool File Deduplication Test
Tests file deduplication for each individual MCP tool to ensure
that files are properly deduplicated within single-tool conversations.
Validates that:
1. Files are embedded only once in conversation history
2. Continuation calls don't re-read existing files
3. New files are still properly embedded
4. Docker logs show deduplication behavior
"""
import subprocess
from .base_test import BaseSimulatorTest
class PerToolDeduplicationTest(BaseSimulatorTest):
"""Test file deduplication for each individual tool"""
@property
def test_name(self) -> str:
return "per_tool_deduplication"
@property
def test_description(self) -> str:
return "File deduplication for individual tools"
def get_docker_logs_since(self, since_time: str) -> str:
"""Get docker logs since a specific timestamp"""
try:
# Check both main server and log monitor for comprehensive logs
cmd_server = ["docker", "logs", "--since", since_time, self.container_name]
cmd_monitor = ["docker", "logs", "--since", since_time, "gemini-mcp-log-monitor"]
result_server = subprocess.run(cmd_server, capture_output=True, text=True)
result_monitor = subprocess.run(cmd_monitor, capture_output=True, text=True)
# Combine logs from both containers
combined_logs = result_server.stdout + "\n" + result_monitor.stdout
return combined_logs
except Exception as e:
self.logger.error(f"Failed to get docker logs: {e}")
return ""
# create_additional_test_file method now inherited from base class
def validate_file_deduplication_in_logs(self, logs: str, tool_name: str, test_file: str) -> bool:
"""Validate that logs show file deduplication behavior"""
# Look for file embedding messages
embedding_messages = [
line for line in logs.split("\n") if "📁" in line and "embedding" in line and tool_name in line
]
# Look for deduplication/filtering messages
filtering_messages = [
line for line in logs.split("\n") if "📁" in line and "Filtering" in line and tool_name in line
]
skipping_messages = [
line for line in logs.split("\n") if "📁" in line and "skipping" in line and tool_name in line
]
deduplication_found = len(filtering_messages) > 0 or len(skipping_messages) > 0
if deduplication_found:
self.logger.info(f"{tool_name}: Found deduplication evidence in logs")
for msg in filtering_messages + skipping_messages:
self.logger.debug(f" 📁 {msg.strip()}")
else:
self.logger.warning(f" ⚠️ {tool_name}: No deduplication evidence found in logs")
self.logger.debug(f" 📁 All embedding messages: {embedding_messages}")
return deduplication_found
def run_test(self) -> bool:
"""Test file deduplication with realistic precommit/codereview workflow"""
try:
self.logger.info("📄 Test: Simplified file deduplication with precommit/codereview workflow")
# Setup test files
self.setup_test_files()
# Create a short dummy file for quick testing
dummy_content = """def add(a, b):
return a + b # Missing type hints
def divide(x, y):
return x / y # No zero check
"""
dummy_file_path = self.create_additional_test_file("dummy_code.py", dummy_content)
# Get timestamp for log filtering
import datetime
start_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
# Step 1: precommit tool with dummy file (low thinking mode)
self.logger.info(" Step 1: precommit tool with dummy file")
precommit_params = {
"path": self.test_dir, # Required path parameter
"files": [dummy_file_path],
"original_request": "Please give me a quick one line reply. Review this code for commit readiness",
"thinking_mode": "low",
}
response1, continuation_id = self.call_mcp_tool("precommit", precommit_params)
if not response1:
self.logger.error(" ❌ Step 1: precommit tool failed")
return False
if not continuation_id:
self.logger.error(" ❌ Step 1: precommit tool didn't provide continuation_id")
return False
# Validate continuation_id format (should be UUID)
if len(continuation_id) < 32:
self.logger.error(f" ❌ Step 1: Invalid continuation_id format: {continuation_id}")
return False
self.logger.info(f" ✅ Step 1: precommit completed with continuation_id: {continuation_id[:8]}...")
# Step 2: codereview tool with same file (NO continuation - fresh conversation)
self.logger.info(" Step 2: codereview tool with same file (fresh conversation)")
codereview_params = {
"files": [dummy_file_path],
"context": "Please give me a quick one line reply. General code review for quality and best practices",
"thinking_mode": "low",
}
response2, _ = self.call_mcp_tool("codereview", codereview_params)
if not response2:
self.logger.error(" ❌ Step 2: codereview tool failed")
return False
self.logger.info(" ✅ Step 2: codereview completed (fresh conversation)")
# Step 3: Create new file and continue with precommit
self.logger.info(" Step 3: precommit continuation with old + new file")
new_file_content = """def multiply(x, y):
return x * y
def subtract(a, b):
return a - b
"""
new_file_path = self.create_additional_test_file("new_feature.py", new_file_content)
# Continue precommit with both files
continue_params = {
"continuation_id": continuation_id,
"path": self.test_dir, # Required path parameter
"files": [dummy_file_path, new_file_path], # Old + new file
"original_request": "Please give me a quick one line reply. Now also review the new feature file along with the previous one",
"thinking_mode": "low",
}
response3, _ = self.call_mcp_tool("precommit", continue_params)
if not response3:
self.logger.error(" ❌ Step 3: precommit continuation failed")
return False
self.logger.info(" ✅ Step 3: precommit continuation completed")
# Validate results in docker logs
self.logger.info(" 📋 Validating conversation history and file deduplication...")
logs = self.get_docker_logs_since(start_time)
# Check for conversation history building
conversation_logs = [
line for line in logs.split("\n") if "conversation" in line.lower() or "history" in line.lower()
]
# Check for file embedding/deduplication
embedding_logs = [
line
for line in logs.split("\n")
if "📁" in line or "embedding" in line.lower() or "file" in line.lower()
]
# Check for continuation evidence
continuation_logs = [
line for line in logs.split("\n") if "continuation" in line.lower() or continuation_id[:8] in line
]
# Check for both files mentioned
dummy_file_mentioned = any("dummy_code.py" in line for line in logs.split("\n"))
new_file_mentioned = any("new_feature.py" in line for line in logs.split("\n"))
# Print diagnostic information
self.logger.info(f" 📊 Conversation logs found: {len(conversation_logs)}")
self.logger.info(f" 📊 File embedding logs found: {len(embedding_logs)}")
self.logger.info(f" 📊 Continuation logs found: {len(continuation_logs)}")
self.logger.info(f" 📊 Dummy file mentioned: {dummy_file_mentioned}")
self.logger.info(f" 📊 New file mentioned: {new_file_mentioned}")
if self.verbose:
self.logger.debug(" 📋 Sample embedding logs:")
for log in embedding_logs[:5]: # Show first 5
if log.strip():
self.logger.debug(f" {log.strip()}")
self.logger.debug(" 📋 Sample continuation logs:")
for log in continuation_logs[:3]: # Show first 3
if log.strip():
self.logger.debug(f" {log.strip()}")
# Determine success criteria
success_criteria = [
len(embedding_logs) > 0, # File embedding occurred
len(continuation_logs) > 0, # Continuation worked
dummy_file_mentioned, # Original file processed
new_file_mentioned, # New file processed
]
passed_criteria = sum(success_criteria)
total_criteria = len(success_criteria)
self.logger.info(f" 📊 Success criteria met: {passed_criteria}/{total_criteria}")
if passed_criteria >= 3: # At least 3 out of 4 criteria
self.logger.info(" ✅ File deduplication workflow test: PASSED")
return True
else:
self.logger.warning(" ⚠️ File deduplication workflow test: FAILED")
self.logger.warning(" 💡 Check docker logs for detailed file embedding and continuation activity")
return False
except Exception as e:
self.logger.error(f"File deduplication workflow test failed: {e}")
return False
finally:
self.cleanup_test_files()


@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""
Redis Conversation Memory Validation Test
Validates that conversation memory is working via Redis by checking
for stored conversation threads and their content.
"""
import json
from .base_test import BaseSimulatorTest
class RedisValidationTest(BaseSimulatorTest):
"""Validate that conversation memory is working via Redis"""
@property
def test_name(self) -> str:
return "redis_validation"
@property
def test_description(self) -> str:
return "Redis conversation memory validation"
def run_test(self) -> bool:
"""Validate that conversation memory is working via Redis"""
try:
self.logger.info("💾 Test: Validating conversation memory via Redis...")
# First, test Redis connectivity
ping_result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "ping"], capture_output=True
)
if ping_result.returncode != 0:
self.logger.error("Failed to connect to Redis")
return False
if "PONG" not in ping_result.stdout.decode():
self.logger.error("Redis ping failed")
return False
self.logger.info("✅ Redis connectivity confirmed")
# Check Redis for stored conversations
result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "KEYS", "thread:*"], capture_output=True
)
if result.returncode != 0:
self.logger.error("Failed to query Redis")
return False
keys = result.stdout.decode().strip().split("\n")
thread_keys = [k for k in keys if k.startswith("thread:") and k != "thread:*"]
if thread_keys:
self.logger.info(f"✅ Found {len(thread_keys)} conversation threads in Redis")
# Get details of first thread
thread_key = thread_keys[0]
result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "GET", thread_key], capture_output=True
)
if result.returncode == 0:
thread_data = result.stdout.decode()
try:
parsed = json.loads(thread_data)
turns = parsed.get("turns", [])
self.logger.info(f"✅ Thread has {len(turns)} turns")
return True
except json.JSONDecodeError:
self.logger.warning("Could not parse thread data")
return True
else:
# If no existing threads, create a test thread to validate Redis functionality
self.logger.info("📝 No existing threads found, creating test thread to validate Redis...")
test_thread_id = "test_thread_validation"
test_data = {
"thread_id": test_thread_id,
"turns": [
{"tool": "chat", "timestamp": "2025-06-11T16:30:00Z", "prompt": "Test validation prompt"}
],
}
# Store test data
store_result = self.run_command(
[
"docker",
"exec",
self.redis_container,
"redis-cli",
"SET",
f"thread:{test_thread_id}",
json.dumps(test_data),
],
capture_output=True,
)
if store_result.returncode != 0:
self.logger.error("Failed to store test data in Redis")
return False
# Retrieve test data
retrieve_result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "GET", f"thread:{test_thread_id}"],
capture_output=True,
)
if retrieve_result.returncode != 0:
self.logger.error("Failed to retrieve test data from Redis")
return False
retrieved_data = retrieve_result.stdout.decode()
try:
parsed = json.loads(retrieved_data)
if parsed.get("thread_id") == test_thread_id:
self.logger.info("✅ Redis read/write validation successful")
# Clean up test data
self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "DEL", f"thread:{test_thread_id}"],
capture_output=True,
)
return True
else:
self.logger.error("Retrieved data doesn't match stored data")
return False
except json.JSONDecodeError:
self.logger.error("Could not parse retrieved test data")
return False
except Exception as e:
self.logger.error(f"Conversation memory validation failed: {e}")
return False
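The same round trip can be done with redis-py instead of shelling out to redis-cli; a sketch, assuming Redis is reachable at the default REDIS_URL and using the thread:* key convention validated above:

import json
import redis

client = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
thread_id = "test_thread_validation"
client.set(f"thread:{thread_id}", json.dumps({"thread_id": thread_id, "turns": []}), ex=3600)
stored = json.loads(client.get(f"thread:{thread_id}"))
assert stored["thread_id"] == thread_id
client.delete(f"thread:{thread_id}")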


@@ -196,8 +196,8 @@ class TestClaudeContinuationOffers:
assert response_data.get("continuation_offer") is None
@patch("utils.conversation_memory.get_redis_client")
async def test_threaded_conversation_no_continuation_offer(self, mock_redis):
"""Test that threaded conversations don't get continuation offers"""
async def test_threaded_conversation_with_continuation_offer(self, mock_redis):
"""Test that threaded conversations still get continuation offers when turns remain"""
mock_client = Mock()
mock_redis.return_value = mock_client
@@ -234,9 +234,10 @@ class TestClaudeContinuationOffers:
# Parse response
response_data = json.loads(response[0].text)
# Should be regular success, not continuation offer
assert response_data["status"] == "success"
assert response_data.get("continuation_offer") is None
# Should offer continuation since there are remaining turns (9 remaining: 10 max - 0 current - 1)
assert response_data["status"] == "continuation_available"
assert response_data.get("continuation_offer") is not None
assert response_data["continuation_offer"]["remaining_turns"] == 9
def test_max_turns_reached_no_continuation_offer(self):
"""Test that no continuation is offered when max turns would be exceeded"""
@@ -404,9 +405,11 @@ class TestContinuationIntegration:
# Step 3: Claude uses continuation_id
request2 = ToolRequest(prompt="Now analyze the performance aspects", continuation_id=thread_id)
# This should NOT offer another continuation (already threaded)
# Should still offer continuation if there are remaining turns
continuation_data2 = self.tool._check_continuation_opportunity(request2)
assert continuation_data2 is None
assert continuation_data2 is not None
assert continuation_data2["remaining_turns"] == 8 # MAX_CONVERSATION_TURNS(10) - current_turns(1) - 1
assert continuation_data2["tool_name"] == "test_continuation"
if __name__ == "__main__":
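The new assertions all encode the same budget: remaining_turns = MAX_CONVERSATION_TURNS - current_turns - 1, where the final -1 reserves the turn being offered. A worked check of the two values asserted above:

MAX_CONVERSATION_TURNS = 10  # assumed value used throughout these tests

def remaining_turns(current_turns: int) -> int:
    return MAX_CONVERSATION_TURNS - current_turns - 1

assert remaining_turns(0) == 9  # freshly threaded conversation
assert remaining_turns(1) == 8  # after one recorded turn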


@@ -3,7 +3,7 @@ Tests for configuration
"""
from config import (
GEMINI_MODEL,
DEFAULT_MODEL,
MAX_CONTEXT_TOKENS,
TEMPERATURE_ANALYTICAL,
TEMPERATURE_BALANCED,
@@ -31,7 +31,7 @@ class TestConfig:
def test_model_config(self):
"""Test model configuration"""
assert GEMINI_MODEL == "gemini-2.5-pro-preview-06-05"
assert DEFAULT_MODEL == "gemini-2.5-pro-preview-06-05"
assert MAX_CONTEXT_TOKENS == 1_000_000
def test_temperature_defaults(self):


@@ -166,7 +166,7 @@ class TestConversationMemory:
initial_context={},
)
history = build_conversation_history(context)
history, tokens = build_conversation_history(context)
# Test basic structure
assert "CONVERSATION HISTORY" in history
@@ -207,8 +207,9 @@ class TestConversationMemory:
initial_context={},
)
history = build_conversation_history(context)
history, tokens = build_conversation_history(context)
assert history == ""
assert tokens == 0
class TestConversationFlow:
@@ -373,7 +374,7 @@ class TestConversationFlow:
initial_context={},
)
history = build_conversation_history(context)
history, tokens = build_conversation_history(context)
expected_turn_text = f"Turn {test_max}/{MAX_CONVERSATION_TURNS}"
assert expected_turn_text in history
@@ -595,7 +596,7 @@ class TestConversationFlow:
initial_context={"prompt": "Analyze this codebase", "files": ["/project/src/"]},
)
history = build_conversation_history(final_context)
history, tokens = build_conversation_history(final_context)
# Verify chronological order and speaker identification
assert "--- Turn 1 (Gemini using analyze) ---" in history
@@ -670,7 +671,7 @@ class TestConversationFlow:
mock_client.get.return_value = context_with_followup.model_dump_json()
# Build history to verify follow-up is preserved
history = build_conversation_history(context_with_followup)
history, tokens = build_conversation_history(context_with_followup)
assert "Found potential issue in authentication" in history
assert "[Gemini's Follow-up: Should I examine the authentication middleware?]" in history
@@ -762,7 +763,7 @@ class TestConversationFlow:
)
# Build conversation history (should handle token limits gracefully)
history = build_conversation_history(context)
history, tokens = build_conversation_history(context)
# Verify the history was built successfully
assert "=== CONVERSATION HISTORY ===" in history


@@ -186,8 +186,8 @@ class TestCrossToolContinuation:
response = await self.review_tool.execute(arguments)
response_data = json.loads(response[0].text)
# Should successfully continue the conversation
assert response_data["status"] == "success"
# Should offer continuation since there are remaining turns available
assert response_data["status"] == "continuation_available"
assert "Critical security vulnerability confirmed" in response_data["content"]
# Step 4: Verify the cross-tool continuation worked
@@ -247,7 +247,7 @@ class TestCrossToolContinuation:
# Build conversation history
from utils.conversation_memory import build_conversation_history
history = build_conversation_history(thread_context)
history, tokens = build_conversation_history(thread_context)
# Verify tool names are included in the history
assert "Turn 1 (Gemini using test_analysis)" in history
@@ -307,7 +307,7 @@ class TestCrossToolContinuation:
response = await self.review_tool.execute(arguments)
response_data = json.loads(response[0].text)
assert response_data["status"] == "success"
assert response_data["status"] == "continuation_available"
# Verify files from both tools are tracked in Redis calls
setex_calls = mock_client.setex.call_args_list


@@ -214,15 +214,15 @@ class TestLargePromptHandling:
mock_model.generate_content.return_value = mock_response
mock_create_model.return_value = mock_model
# Mock read_files to avoid file system access
with patch("tools.chat.read_files") as mock_read_files:
mock_read_files.return_value = "File content"
# Mock the centralized file preparation method to avoid file system access
with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files:
mock_prepare_files.return_value = "File content"
await tool.execute({"prompt": "", "files": [temp_prompt_file, other_file]})
# Verify prompt.txt was removed from files list
mock_read_files.assert_called_once()
files_arg = mock_read_files.call_args[0][0]
mock_prepare_files.assert_called_once()
files_arg = mock_prepare_files.call_args[0][0]
assert len(files_arg) == 1
assert files_arg[0] == other_file


@@ -228,10 +228,8 @@ class TestPrecommitTool:
@patch("tools.precommit.find_git_repositories")
@patch("tools.precommit.get_git_status")
@patch("tools.precommit.run_git_command")
@patch("tools.precommit.read_files")
async def test_files_parameter_with_context(
self,
mock_read_files,
mock_run_git,
mock_status,
mock_find_repos,
@@ -254,14 +252,15 @@ class TestPrecommitTool:
(True, ""), # unstaged files list (empty)
]
# Mock read_files
mock_read_files.return_value = "=== FILE: config.py ===\nCONFIG_VALUE = 42\n=== END FILE ==="
# Mock the centralized file preparation method
with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files:
mock_prepare_files.return_value = "=== FILE: config.py ===\nCONFIG_VALUE = 42\n=== END FILE ==="
request = PrecommitRequest(
path="/absolute/repo/path",
files=["/absolute/repo/path/config.py"],
)
result = await tool.prepare_prompt(request)
request = PrecommitRequest(
path="/absolute/repo/path",
files=["/absolute/repo/path/config.py"],
)
result = await tool.prepare_prompt(request)
# Verify context files are included
assert "## Context Files Summary" in result
@@ -316,9 +315,9 @@ class TestPrecommitTool:
(True, ""), # unstaged files (empty)
]
# Mock read_files to return empty (file not found)
with patch("tools.precommit.read_files") as mock_read:
mock_read.return_value = ""
# Mock the centralized file preparation method to return empty (file not found)
with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files:
mock_prepare_files.return_value = ""
result_with_files = await tool.prepare_prompt(request_with_files)
assert "If you need additional context files" not in result_with_files


@@ -0,0 +1,269 @@
"""
Enhanced tests for precommit tool using mock storage to test real logic
"""
import os
import tempfile
from pathlib import Path
from typing import Optional
from unittest.mock import patch
import pytest
from tools.precommit import Precommit, PrecommitRequest
class MockRedisClient:
"""Mock Redis client that uses in-memory dictionary storage"""
def __init__(self):
self.data: dict[str, str] = {}
self.ttl_data: dict[str, int] = {}
def get(self, key: str) -> Optional[str]:
return self.data.get(key)
def set(self, key: str, value: str, ex: Optional[int] = None) -> bool:
self.data[key] = value
if ex:
self.ttl_data[key] = ex
return True
def delete(self, key: str) -> int:
if key in self.data:
del self.data[key]
self.ttl_data.pop(key, None)
return 1
return 0
def exists(self, key: str) -> int:
return 1 if key in self.data else 0
def setex(self, key: str, time: int, value: str) -> bool:
"""Set key to hold string value and set key to timeout after given seconds"""
self.data[key] = value
self.ttl_data[key] = time
return True
class TestPrecommitToolWithMockStore:
"""Test precommit tool with mock storage to validate actual logic"""
@pytest.fixture
def mock_redis(self):
"""Create mock Redis client"""
return MockRedisClient()
@pytest.fixture
def tool(self, mock_redis, temp_repo):
"""Create tool instance with mocked Redis"""
temp_dir, _ = temp_repo
tool = Precommit()
# Mock the Redis client getter and PROJECT_ROOT to allow access to temp files
with (
patch("utils.conversation_memory.get_redis_client", return_value=mock_redis),
patch("utils.file_utils.PROJECT_ROOT", Path(temp_dir).resolve()),
):
yield tool
@pytest.fixture
def temp_repo(self):
"""Create a temporary git repository with test files"""
import subprocess
temp_dir = tempfile.mkdtemp()
# Initialize git repo
subprocess.run(["git", "init"], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@example.com"], cwd=temp_dir, capture_output=True)
# Create test config file
config_content = '''"""Test configuration file"""
# Version and metadata
__version__ = "1.0.0"
__author__ = "Test"
# Configuration
MAX_CONTENT_TOKENS = 800_000 # 800K tokens for content
TEMPERATURE_ANALYTICAL = 0.2 # For code review, debugging
'''
config_path = os.path.join(temp_dir, "config.py")
with open(config_path, "w") as f:
f.write(config_content)
# Add and commit initial version
subprocess.run(["git", "add", "."], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial commit"], cwd=temp_dir, capture_output=True)
# Modify config to create a diff
modified_content = config_content + '\nNEW_SETTING = "test" # Added setting\n'
with open(config_path, "w") as f:
f.write(modified_content)
yield temp_dir, config_path
# Cleanup
import shutil
shutil.rmtree(temp_dir)
@pytest.mark.asyncio
async def test_no_duplicate_file_content_in_prompt(self, tool, temp_repo, mock_redis):
"""Test that file content appears in expected locations
This test validates our design decision that files can legitimately appear in both:
1. Git Diffs section: Shows only changed lines + limited context (wrapped with BEGIN DIFF markers)
2. Additional Context section: Shows complete file content (wrapped with BEGIN FILE markers)
This is intentional, not a bug - the AI needs both perspectives for comprehensive analysis.
"""
temp_dir, config_path = temp_repo
# Create request with files parameter
request = PrecommitRequest(path=temp_dir, files=[config_path], original_request="Test configuration changes")
# Generate the prompt
prompt = await tool.prepare_prompt(request)
# Verify expected sections are present
assert "## Original Request" in prompt
assert "Test configuration changes" in prompt
assert "## Additional Context Files" in prompt
assert "## Git Diffs" in prompt
# Verify the file appears in the git diff
assert "config.py" in prompt
assert "NEW_SETTING" in prompt
# Note: Files can legitimately appear in both git diff AND additional context:
# - Git diff shows only changed lines + limited context
# - Additional context provides complete file content for full understanding
# This is intentional and provides comprehensive context to the AI
@pytest.mark.asyncio
async def test_conversation_memory_integration(self, tool, temp_repo, mock_redis):
"""Test that conversation memory works with mock storage"""
temp_dir, config_path = temp_repo
# Mock conversation memory functions to use our mock redis
with patch("utils.conversation_memory.get_redis_client", return_value=mock_redis):
# First request - should embed file content
PrecommitRequest(path=temp_dir, files=[config_path], original_request="First review")
# Simulate conversation thread creation
from utils.conversation_memory import add_turn, create_thread
thread_id = create_thread("precommit", {"files": [config_path]})
# Test that file embedding works
files_to_embed = tool.filter_new_files([config_path], None)
assert config_path in files_to_embed, "New conversation should embed all files"
# Add a turn to the conversation
add_turn(thread_id, "assistant", "First response", files=[config_path], tool_name="precommit")
# Second request with continuation - should skip already embedded files
PrecommitRequest(
path=temp_dir, files=[config_path], continuation_id=thread_id, original_request="Follow-up review"
)
files_to_embed_2 = tool.filter_new_files([config_path], thread_id)
assert len(files_to_embed_2) == 0, "Continuation should skip already embedded files"
@pytest.mark.asyncio
async def test_prompt_structure_integrity(self, tool, temp_repo, mock_redis):
"""Test that the prompt structure is well-formed and doesn't have content duplication"""
temp_dir, config_path = temp_repo
request = PrecommitRequest(
path=temp_dir,
files=[config_path],
original_request="Validate prompt structure",
review_type="full",
severity_filter="high",
)
prompt = await tool.prepare_prompt(request)
# Split prompt into sections
sections = {
"original_request": "## Original Request",
"review_parameters": "## Review Parameters",
"repo_summary": "## Repository Changes Summary",
"context_files_summary": "## Context Files Summary",
"git_diffs": "## Git Diffs",
"additional_context": "## Additional Context Files",
"review_instructions": "## Review Instructions",
}
section_indices = {}
for name, header in sections.items():
index = prompt.find(header)
if index != -1:
section_indices[name] = index
# Verify sections appear in logical order
assert section_indices["original_request"] < section_indices["review_parameters"]
assert section_indices["review_parameters"] < section_indices["repo_summary"]
assert section_indices["git_diffs"] < section_indices["additional_context"]
assert section_indices["additional_context"] < section_indices["review_instructions"]
# Test that file content only appears in Additional Context section
file_content_start = section_indices["additional_context"]
file_content_end = section_indices["review_instructions"]
file_section = prompt[file_content_start:file_content_end]
after_file_section = prompt[file_content_end:]
# File content should appear in the file section
assert "MAX_CONTENT_TOKENS = 800_000" in file_section
# Check that configuration content appears in the file section
assert "# Configuration" in file_section
# The complete file content should not appear in the review instructions
assert '__version__ = "1.0.0"' in file_section
assert '__version__ = "1.0.0"' not in after_file_section
@pytest.mark.asyncio
async def test_file_content_formatting(self, tool, temp_repo, mock_redis):
"""Test that file content is properly formatted without duplication"""
temp_dir, config_path = temp_repo
# Test the centralized file preparation method directly
file_content = tool._prepare_file_content_for_prompt(
[config_path], None, "Test files", max_tokens=100000, reserve_tokens=1000 # No continuation
)
# Should contain file markers
assert "--- BEGIN FILE:" in file_content
assert "--- END FILE:" in file_content
assert "config.py" in file_content
# Should contain actual file content
assert "MAX_CONTENT_TOKENS = 800_000" in file_content
assert '__version__ = "1.0.0"' in file_content
# Content should appear only once
assert file_content.count("MAX_CONTENT_TOKENS = 800_000") == 1
assert file_content.count('__version__ = "1.0.0"') == 1
def test_mock_redis_basic_operations():
"""Test that our mock Redis implementation works correctly"""
mock_redis = MockRedisClient()
# Test basic operations
assert mock_redis.get("nonexistent") is None
assert mock_redis.exists("nonexistent") == 0
mock_redis.set("test_key", "test_value")
assert mock_redis.get("test_key") == "test_value"
assert mock_redis.exists("test_key") == 1
assert mock_redis.delete("test_key") == 1
assert mock_redis.get("test_key") is None
assert mock_redis.delete("test_key") == 0 # Already deleted


@@ -67,16 +67,16 @@ class TestPromptRegression:
mock_model.generate_content.return_value = mock_model_response()
mock_create_model.return_value = mock_model
# Mock file reading
with patch("tools.chat.read_files") as mock_read_files:
mock_read_files.return_value = "File content here"
# Mock file reading through the centralized method
with patch.object(tool, "_prepare_file_content_for_prompt") as mock_prepare_files:
mock_prepare_files.return_value = "File content here"
result = await tool.execute({"prompt": "Analyze this code", "files": ["/path/to/file.py"]})
assert len(result) == 1
output = json.loads(result[0].text)
assert output["status"] == "success"
mock_read_files.assert_called_once_with(["/path/to/file.py"])
mock_prepare_files.assert_called_once_with(["/path/to/file.py"], None, "Context files")
@pytest.mark.asyncio
async def test_thinkdeep_normal_analysis(self, mock_model_response):


@@ -42,6 +42,8 @@ class AnalyzeTool(BaseTool):
)
def get_input_schema(self) -> dict[str, Any]:
from config import DEFAULT_MODEL
return {
"type": "object",
"properties": {
@@ -50,6 +52,10 @@ class AnalyzeTool(BaseTool):
"items": {"type": "string"},
"description": "Files or directories to analyze (must be absolute paths)",
},
"model": {
"type": "string",
"description": f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
},
"question": {
"type": "string",
"description": "What to analyze or look for",


@@ -25,7 +25,7 @@ from google.genai import types
from mcp.types import TextContent
from pydantic import BaseModel, Field
from config import GEMINI_MODEL, MAX_CONTEXT_TOKENS, MCP_PROMPT_SIZE_LIMIT
from config import DEFAULT_MODEL, MAX_CONTEXT_TOKENS, MCP_PROMPT_SIZE_LIMIT
from utils import check_token_limit
from utils.conversation_memory import (
MAX_CONVERSATION_TURNS,
@@ -50,7 +50,10 @@ class ToolRequest(BaseModel):
these common fields.
"""
model: Optional[str] = Field(None, description="Model to use (defaults to Gemini 2.5 Pro)")
model: Optional[str] = Field(
None,
description=f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
)
temperature: Optional[float] = Field(None, description="Temperature for response (tool-specific defaults)")
# Thinking mode controls how much computational budget the model uses for reasoning
# Higher values allow for more complex reasoning but increase latency and cost
@@ -189,15 +192,18 @@ class BaseTool(ABC):
# Thread not found, no files embedded
return []
return get_conversation_file_list(thread_context)
embedded_files = get_conversation_file_list(thread_context)
logger.debug(f"[FILES] {self.name}: Found {len(embedded_files)} embedded files")
return embedded_files
def filter_new_files(self, requested_files: list[str], continuation_id: Optional[str]) -> list[str]:
"""
Filter out files that are already embedded in conversation history.
This method takes a list of requested files and removes any that have
already been embedded in the conversation history, preventing duplicate
file embeddings and optimizing token usage.
This method prevents duplicate file embeddings by filtering out files that have
already been embedded in the conversation history. This optimizes token usage
while ensuring tools still have logical access to all requested files through
conversation history references.
Args:
requested_files: List of files requested for current tool execution
@@ -206,19 +212,64 @@ class BaseTool(ABC):
Returns:
list[str]: List of files that need to be embedded (not already in history)
"""
logger.debug(f"[FILES] {self.name}: Filtering {len(requested_files)} requested files")
if not continuation_id:
# New conversation, all files are new
logger.debug(f"[FILES] {self.name}: New conversation, all {len(requested_files)} files are new")
return requested_files
embedded_files = set(self.get_conversation_embedded_files(continuation_id))
try:
embedded_files = set(self.get_conversation_embedded_files(continuation_id))
logger.debug(f"[FILES] {self.name}: Found {len(embedded_files)} embedded files in conversation")
# Return only files that haven't been embedded yet
new_files = [f for f in requested_files if f not in embedded_files]
# Safety check: If no files are marked as embedded but we have a continuation_id,
# this might indicate an issue with conversation history. Be conservative.
if not embedded_files:
logger.debug(
f"📁 {self.name} tool: No files found in conversation history for thread {continuation_id}"
)
logger.debug(
f"[FILES] {self.name}: No embedded files found, returning all {len(requested_files)} requested files"
)
return requested_files
return new_files
# Return only files that haven't been embedded yet
new_files = [f for f in requested_files if f not in embedded_files]
logger.debug(
f"[FILES] {self.name}: After filtering: {len(new_files)} new files, {len(requested_files) - len(new_files)} already embedded"
)
logger.debug(f"[FILES] {self.name}: New files to embed: {new_files}")
# Log filtering results for debugging
if len(new_files) < len(requested_files):
skipped = [f for f in requested_files if f in embedded_files]
logger.debug(
f"📁 {self.name} tool: Filtering {len(skipped)} files already in conversation history: {', '.join(skipped)}"
)
logger.debug(f"[FILES] {self.name}: Skipped (already embedded): {skipped}")
return new_files
except Exception as e:
# If there's any issue with conversation history lookup, be conservative
# and include all files rather than risk losing access to needed files
logger.warning(f"📁 {self.name} tool: Error checking conversation history for {continuation_id}: {e}")
logger.warning(f"📁 {self.name} tool: Including all requested files as fallback")
logger.debug(
f"[FILES] {self.name}: Exception in filter_new_files, returning all {len(requested_files)} files as fallback"
)
return requested_files
def _prepare_file_content_for_prompt(
self, request_files: list[str], continuation_id: Optional[str], context_description: str = "New files"
self,
request_files: list[str],
continuation_id: Optional[str],
context_description: str = "New files",
max_tokens: Optional[int] = None,
reserve_tokens: int = 1_000,
remaining_budget: Optional[int] = None,
arguments: Optional[dict] = None,
) -> str:
"""
Centralized file processing for tool prompts.
@@ -232,6 +283,10 @@ class BaseTool(ABC):
request_files: List of files requested for current tool execution
continuation_id: Thread continuation ID, or None for new conversations
context_description: Description for token limit validation (e.g. "Code", "New files")
max_tokens: Maximum tokens to use (defaults to remaining budget or MAX_CONTENT_TOKENS)
reserve_tokens: Tokens to reserve for additional prompt content (default 1K)
remaining_budget: Remaining token budget after conversation history (from server.py)
arguments: Original tool arguments (used to extract _remaining_tokens if available)
Returns:
str: Formatted file content string ready for prompt inclusion
@@ -239,15 +294,40 @@ class BaseTool(ABC):
if not request_files:
return ""
# Extract remaining budget from arguments if available
if remaining_budget is None:
# Use provided arguments or fall back to stored arguments from execute()
args_to_use = arguments or getattr(self, "_current_arguments", {})
remaining_budget = args_to_use.get("_remaining_tokens")
# Use remaining budget if provided, otherwise fall back to max_tokens or default
if remaining_budget is not None:
effective_max_tokens = remaining_budget - reserve_tokens
elif max_tokens is not None:
effective_max_tokens = max_tokens - reserve_tokens
else:
from config import MAX_CONTENT_TOKENS
effective_max_tokens = MAX_CONTENT_TOKENS - reserve_tokens
# Ensure we have a reasonable minimum budget
effective_max_tokens = max(1000, effective_max_tokens)
files_to_embed = self.filter_new_files(request_files, continuation_id)
logger.debug(f"[FILES] {self.name}: Will embed {len(files_to_embed)} files after filtering")
content_parts = []
# Read content of new files only
if files_to_embed:
logger.debug(f"📁 {self.name} tool embedding {len(files_to_embed)} new files: {', '.join(files_to_embed)}")
logger.debug(
f"[FILES] {self.name}: Starting file embedding with token budget {effective_max_tokens + reserve_tokens:,}"
)
try:
file_content = read_files(files_to_embed)
file_content = read_files(
files_to_embed, max_tokens=effective_max_tokens + reserve_tokens, reserve_tokens=reserve_tokens
)
self._validate_token_limit(file_content, context_description)
content_parts.append(file_content)
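For reference, the budget resolution in this hunk reduces to the following precedence; a minimal sketch, with resolve_file_token_budget as a hypothetical stand-in for the inline logic above:

def resolve_file_token_budget(remaining_budget=None, max_tokens=None, reserve_tokens=1_000):
    # Hypothetical helper mirroring the inline logic: the server-provided remaining
    # budget wins, then an explicit max_tokens, then the global default.
    if remaining_budget is not None:
        effective = remaining_budget - reserve_tokens
    elif max_tokens is not None:
        effective = max_tokens - reserve_tokens
    else:
        from config import MAX_CONTENT_TOKENS
        effective = MAX_CONTENT_TOKENS - reserve_tokens
    # Keep a usable floor so tiny budgets do not zero out file embedding
    return max(1000, effective)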
@@ -258,9 +338,13 @@ class BaseTool(ABC):
logger.debug(
f"📁 {self.name} tool successfully embedded {len(files_to_embed)} files ({content_tokens:,} tokens)"
)
logger.debug(f"[FILES] {self.name}: Successfully embedded files - {content_tokens:,} tokens used")
except Exception as e:
logger.error(f"📁 {self.name} tool failed to embed files {files_to_embed}: {type(e).__name__}: {e}")
logger.debug(f"[FILES] {self.name}: File embedding failed - {type(e).__name__}: {e}")
raise
else:
logger.debug(f"[FILES] {self.name}: No files to embed after filtering")
# Generate note about files already in conversation history
if continuation_id and len(files_to_embed) < len(request_files):
@@ -270,6 +354,7 @@ class BaseTool(ABC):
logger.debug(
f"📁 {self.name} tool skipping {len(skipped_files)} files already in conversation history: {', '.join(skipped_files)}"
)
logger.debug(f"[FILES] {self.name}: Adding note about {len(skipped_files)} skipped files")
if content_parts:
content_parts.append("\n\n")
note_lines = [
@@ -279,8 +364,12 @@ class BaseTool(ABC):
"--- END NOTE ---",
]
content_parts.append("\n".join(note_lines))
else:
logger.debug(f"[FILES] {self.name}: No skipped files to note")
return "".join(content_parts) if content_parts else ""
result = "".join(content_parts) if content_parts else ""
logger.debug(f"[FILES] {self.name}: _prepare_file_content_for_prompt returning {len(result)} chars")
return result
def get_websearch_instruction(self, use_websearch: bool, tool_specific: Optional[str] = None) -> str:
"""
@@ -488,6 +577,9 @@ If any of these would strengthen your analysis, specify what Claude should searc
List[TextContent]: Formatted response as MCP TextContent objects
"""
try:
# Store arguments for access by helper methods (like _prepare_file_content_for_prompt)
self._current_arguments = arguments
# Set up logger for this tool execution
logger = logging.getLogger(f"tools.{self.name}")
logger.info(f"Starting {self.name} tool execution with arguments: {list(arguments.keys())}")
@@ -536,7 +628,7 @@ If any of these would strengthen your analysis, specify what Claude should searc
# No need to rebuild it here - prompt already contains conversation history
# Extract model configuration from request or use defaults
model_name = getattr(request, "model", None) or GEMINI_MODEL
model_name = getattr(request, "model", None) or DEFAULT_MODEL
temperature = getattr(request, "temperature", None)
if temperature is None:
temperature = self.get_default_temperature()
@@ -580,11 +672,29 @@ If any of these would strengthen your analysis, specify what Claude should searc
# Catch all exceptions to prevent server crashes
# Return error information in standardized format
logger = logging.getLogger(f"tools.{self.name}")
logger.error(f"Error in {self.name} tool execution: {str(e)}", exc_info=True)
error_msg = str(e)
# Check if this is a 500 INTERNAL error that asks for retry
if "500 INTERNAL" in error_msg and "Please retry" in error_msg:
logger.warning(f"500 INTERNAL error in {self.name} - attempting retry")
try:
# Single retry attempt
model = self._get_model_wrapper(request)
raw_response = await model.generate_content(prompt)
response = raw_response.text
# If successful, process normally
return [TextContent(type="text", text=self._process_response(response, request).model_dump_json())]
except Exception as retry_e:
logger.error(f"Retry failed for {self.name} tool: {str(retry_e)}")
error_msg = f"Tool failed after retry: {str(retry_e)}"
logger.error(f"Error in {self.name} tool execution: {error_msg}", exc_info=True)
error_output = ToolOutput(
status="error",
content=f"Error in {self.name}: {str(e)}",
content=f"Error in {self.name}: {error_msg}",
content_type="text",
)
return [TextContent(type="text", text=error_output.model_dump_json())]
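The retry branch above amounts to a single guarded re-attempt; a minimal sketch of that control flow, with generate_with_retry as a hypothetical wrapper (the real code retries inline inside execute()):

async def generate_with_retry(model, prompt):
    # Hypothetical wrapper: retry exactly once when Gemini returns a transient
    # 500 INTERNAL error that asks for a retry; anything else propagates.
    try:
        return await model.generate_content(prompt)
    except Exception as e:
        if "500 INTERNAL" in str(e) and "Please retry" in str(e):
            return await model.generate_content(prompt)
        raise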
@@ -811,18 +921,24 @@ If any of these would strengthen your analysis, specify what Claude should searc
Returns:
Dict with continuation data if opportunity should be offered, None otherwise
"""
# Only offer continuation for new conversations (not already threaded)
continuation_id = getattr(request, "continuation_id", None)
if continuation_id:
# This is already a threaded conversation, don't offer continuation
# (either Gemini will ask follow-up or conversation naturally ends)
return None
# Only offer if we haven't reached conversation limits
try:
# For new conversations, we have MAX_CONVERSATION_TURNS - 1 remaining
# (since this response will be turn 1)
remaining_turns = MAX_CONVERSATION_TURNS - 1
if continuation_id:
# Check remaining turns in existing thread
from utils.conversation_memory import get_thread
context = get_thread(continuation_id)
if context:
current_turns = len(context.turns)
remaining_turns = MAX_CONVERSATION_TURNS - current_turns - 1 # -1 for this response
else:
# Thread not found, don't offer continuation
return None
else:
# New conversation, we have MAX_CONVERSATION_TURNS - 1 remaining
# (since this response will be turn 1)
remaining_turns = MAX_CONVERSATION_TURNS - 1
if remaining_turns <= 0:
return None
@@ -951,13 +1067,22 @@ If any of these would strengthen your analysis, specify what Claude should searc
temperature and thinking budget configuration for models that support it.
Args:
model_name: Name of the Gemini model to use
model_name: Name of the Gemini model to use (or shorthand like 'flash', 'pro')
temperature: Temperature setting for response generation
thinking_mode: Thinking depth mode (affects computational budget)
Returns:
Model instance configured and ready for generation
"""
# Define model shorthands for user convenience
model_shorthands = {
"pro": "gemini-2.5-pro-preview-06-05",
"flash": "gemini-2.0-flash-exp",
}
# Resolve shorthand to full model name
resolved_model_name = model_shorthands.get(model_name.lower(), model_name)
# Map thinking modes to computational budget values
# Higher budgets allow for more complex reasoning but increase latency
thinking_budgets = {
@@ -972,7 +1097,7 @@ If any of these would strengthen your analysis, specify what Claude should searc
# Gemini 2.5 models support thinking configuration for enhanced reasoning
# Skip special handling in test environment to allow mocking
if "2.5" in model_name and not os.environ.get("PYTEST_CURRENT_TEST"):
if "2.5" in resolved_model_name and not os.environ.get("PYTEST_CURRENT_TEST"):
try:
# Retrieve API key for Gemini client creation
api_key = os.environ.get("GEMINI_API_KEY")
@@ -1031,7 +1156,7 @@ If any of these would strengthen your analysis, specify what Claude should searc
return ResponseWrapper(response.text)
return ModelWrapper(client, model_name, temperature, thinking_budget)
return ModelWrapper(client, resolved_model_name, temperature, thinking_budget)
except Exception:
# Fall back to regular API if thinking configuration fails
@@ -1084,4 +1209,4 @@ If any of these would strengthen your analysis, specify what Claude should searc
return ResponseWrapper(response.text)
return SimpleModelWrapper(client, model_name, temperature)
return SimpleModelWrapper(client, resolved_model_name, temperature)
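The shorthand mapping introduced above means both aliases and full model IDs are accepted; a small sketch of the resolution behaviour (resolve_model_name is a hypothetical standalone version of the inline lookup):

def resolve_model_name(name: str) -> str:
    # Hypothetical helper; mirrors model_shorthands.get(model_name.lower(), model_name)
    shorthands = {
        "pro": "gemini-2.5-pro-preview-06-05",
        "flash": "gemini-2.0-flash-exp",
    }
    return shorthands.get(name.lower(), name)

resolve_model_name("flash")                         # -> "gemini-2.0-flash-exp"
resolve_model_name("gemini-2.5-pro-preview-06-05")  # unknown keys pass through unchanged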

View File

@@ -9,7 +9,6 @@ from pydantic import Field
from config import TEMPERATURE_BALANCED
from prompts import CHAT_PROMPT
from utils import read_files
from .base import BaseTool, ToolRequest
from .models import ToolOutput
@@ -45,6 +44,8 @@ class ChatTool(BaseTool):
)
def get_input_schema(self) -> dict[str, Any]:
from config import DEFAULT_MODEL
return {
"type": "object",
"properties": {
@@ -57,6 +58,10 @@ class ChatTool(BaseTool):
"items": {"type": "string"},
"description": "Optional files for context (must be absolute paths)",
},
"model": {
"type": "string",
"description": f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
},
"temperature": {
"type": "number",
"description": "Response creativity (0-1, default 0.5)",
@@ -116,10 +121,13 @@ class ChatTool(BaseTool):
if updated_files is not None:
request.files = updated_files
# Add context files if provided
# Add context files if provided (using centralized file handling with filtering)
if request.files:
file_content = read_files(request.files)
user_content = f"{user_content}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ===="
file_content = self._prepare_file_content_for_prompt(
request.files, request.continuation_id, "Context files"
)
if file_content:
user_content = f"{user_content}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ===="
# Check token limits
self._validate_token_limit(user_content, "Content")
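The practical effect of routing chat files through the centralized handler is that files already embedded in a threaded conversation are not re-embedded; a rough usage sketch (tool and thread_id are placeholder names):

# New conversation: every requested file is embedded in full
content = tool._prepare_file_content_for_prompt(["/abs/path/app.py"], None, "Context files")

# Continued conversation: files already present in the thread are filtered out
# and referenced via a history note instead of being embedded again
content = tool._prepare_file_content_for_prompt(["/abs/path/app.py"], thread_id, "Context files")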

View File

@@ -79,6 +79,8 @@ class CodeReviewTool(BaseTool):
)
def get_input_schema(self) -> dict[str, Any]:
from config import DEFAULT_MODEL
return {
"type": "object",
"properties": {
@@ -87,6 +89,10 @@ class CodeReviewTool(BaseTool):
"items": {"type": "string"},
"description": "Code files or directories to review (must be absolute paths)",
},
"model": {
"type": "string",
"description": f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
},
"context": {
"type": "string",
"description": "User's summary of what the code does, expected behavior, constraints, and review objectives",

View File

@@ -50,6 +50,8 @@ class DebugIssueTool(BaseTool):
)
def get_input_schema(self) -> dict[str, Any]:
from config import DEFAULT_MODEL
return {
"type": "object",
"properties": {
@@ -57,6 +59,10 @@ class DebugIssueTool(BaseTool):
"type": "string",
"description": "Error message, symptoms, or issue description",
},
"model": {
"type": "string",
"description": f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
},
"error_context": {
"type": "string",
"description": "Stack trace, logs, or additional error context",

View File

@@ -1,5 +1,11 @@
"""
Tool for pre-commit validation of git changes across multiple repositories.
Design Note - File Content in Multiple Sections:
Files may legitimately appear in both "Git Diffs" and "Additional Context Files" sections:
- Git Diffs: Shows changed lines + limited context (marked with "BEGIN DIFF" / "END DIFF")
- Additional Context: Shows complete file content (marked with "BEGIN FILE" / "END FILE")
This provides comprehensive context for AI analysis - not a duplication bug.
"""
import os
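Rendered into the prompt, the two marker styles from the design note look roughly like this (repository and file names are placeholders):

--- BEGIN DIFF: my-repo / src/app.py (staged) ---
...changed lines plus limited surrounding context...
--- END DIFF: my-repo / src/app.py ---

--- BEGIN FILE: /workspace/src/app.py ---
...complete file content...
--- END FILE: /workspace/src/app.py ---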
@@ -10,7 +16,7 @@ from pydantic import Field
from config import MAX_CONTEXT_TOKENS
from prompts.tool_prompts import PRECOMMIT_PROMPT
from utils.file_utils import read_files, translate_file_paths, translate_path_for_environment
from utils.file_utils import translate_file_paths, translate_path_for_environment
from utils.git_utils import find_git_repositories, get_git_status, run_git_command
from utils.token_utils import estimate_tokens
@@ -92,7 +98,15 @@ class Precommit(BaseTool):
)
def get_input_schema(self) -> dict[str, Any]:
from config import DEFAULT_MODEL
schema = self.get_request_model().model_json_schema()
# Ensure model parameter has enhanced description
if "properties" in schema and "model" in schema["properties"]:
schema["properties"]["model"] = {
"type": "string",
"description": f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
}
# Ensure use_websearch is in the schema with proper description
if "properties" in schema and "use_websearch" not in schema["properties"]:
schema["properties"]["use_websearch"] = {
@@ -239,9 +253,12 @@ class Precommit(BaseTool):
staged_files = [f for f in files_output.strip().split("\n") if f]
# Generate per-file diffs for staged changes
# Each diff is wrapped with clear markers to distinguish from full file content
for file_path in staged_files:
success, diff = run_git_command(repo_path, ["diff", "--cached", "--", file_path])
if success and diff.strip():
# Use "BEGIN DIFF" markers (distinct from "BEGIN FILE" markers in utils/file_utils.py)
# This allows AI to distinguish between diff context vs complete file content
diff_header = f"\n--- BEGIN DIFF: {repo_name} / {file_path} (staged) ---\n"
diff_footer = f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
formatted_diff = diff_header + diff + diff_footer
@@ -258,6 +275,7 @@ class Precommit(BaseTool):
unstaged_files = [f for f in files_output.strip().split("\n") if f]
# Generate per-file diffs for unstaged changes
# Same clear marker pattern as staged changes above
for file_path in unstaged_files:
success, diff = run_git_command(repo_path, ["diff", "--", file_path])
if success and diff.strip():
@@ -298,10 +316,12 @@ class Precommit(BaseTool):
if translated_files:
remaining_tokens = max_tokens - total_tokens
# Use standardized file reading with token budget
file_content = read_files(
# Use centralized file handling with filtering for duplicate prevention
file_content = self._prepare_file_content_for_prompt(
translated_files,
max_tokens=remaining_tokens,
request.continuation_id,
"Context files",
max_tokens=remaining_tokens + 1000, # Add back the reserve that was calculated
reserve_tokens=1000, # Small reserve for formatting
)
@@ -370,7 +390,8 @@ class Precommit(BaseTool):
if total_tokens > 0:
prompt_parts.append(f"\nTotal context tokens used: ~{total_tokens:,}")
# Add the diff contents
# Add the diff contents with clear section markers
# Each diff is wrapped with "--- BEGIN DIFF: ... ---" and "--- END DIFF: ... ---"
prompt_parts.append("\n## Git Diffs\n")
if all_diffs:
prompt_parts.extend(all_diffs)
@@ -378,6 +399,11 @@ class Precommit(BaseTool):
prompt_parts.append("--- NO DIFFS FOUND ---")
# Add context files content if provided
# IMPORTANT: Files may legitimately appear in BOTH sections:
# - Git Diffs: Show only changed lines + limited context (what changed)
# - Additional Context: Show complete file content (full understanding)
# This is intentional design for comprehensive AI analysis, not a duplication bug.
# Each file in this section is wrapped with "--- BEGIN FILE: ... ---" and "--- END FILE: ... ---"
if context_files_content:
prompt_parts.append("\n## Additional Context Files")
prompt_parts.append(

View File

@@ -48,6 +48,8 @@ class ThinkDeepTool(BaseTool):
)
def get_input_schema(self) -> dict[str, Any]:
from config import DEFAULT_MODEL
return {
"type": "object",
"properties": {
@@ -55,6 +57,10 @@ class ThinkDeepTool(BaseTool):
"type": "string",
"description": "Your current thinking/analysis to extend and validate",
},
"model": {
"type": "string",
"description": f"Model to use: 'pro' (Gemini 2.5 Pro with extended thinking) or 'flash' (Gemini 2.0 Flash - faster). Defaults to '{DEFAULT_MODEL}' if not specified.",
},
"problem_context": {
"type": "string",
"description": "Additional context about the problem or goal",
@@ -78,8 +84,7 @@ class ThinkDeepTool(BaseTool):
"thinking_mode": {
"type": "string",
"enum": ["minimal", "low", "medium", "high", "max"],
"description": "Thinking depth: minimal (128), low (2048), medium (8192), high (16384), max (32768)",
"default": "high",
"description": f"Thinking depth: minimal (128), low (2048), medium (8192), high (16384), max (32768). Defaults to '{self.get_default_thinking_mode()}' if not specified.",
},
"use_websearch": {
"type": "boolean",
@@ -101,8 +106,10 @@ class ThinkDeepTool(BaseTool):
return TEMPERATURE_CREATIVE
def get_default_thinking_mode(self) -> str:
"""ThinkDeep uses high thinking by default"""
return "high"
"""ThinkDeep uses configurable thinking mode, defaults to high"""
from config import DEFAULT_THINKING_MODE_THINKDEEP
return DEFAULT_THINKING_MODE_THINKDEEP
def get_request_model(self):
return ThinkDeepRequest
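A plausible config.py definition backing DEFAULT_THINKING_MODE_THINKDEEP, assuming it is read from the environment (the fallback matches the previous hard-coded ThinkDeep default):

import os

# Assumed definition; falls back to "high", the old hard-coded default,
# when the environment variable is not set
DEFAULT_THINKING_MODE_THINKDEEP = os.getenv("DEFAULT_THINKING_MODE_THINKDEEP", "high")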

View File

@@ -250,12 +250,16 @@ def add_turn(
- Turn limits prevent runaway conversations
- File references are preserved for cross-tool access
"""
logger.debug(f"[FLOW] Adding {role} turn to {thread_id} ({tool_name})")
context = get_thread(thread_id)
if not context:
logger.debug(f"[FLOW] Thread {thread_id} not found for turn addition")
return False
# Check turn limit to prevent runaway conversations
if len(context.turns) >= MAX_CONVERSATION_TURNS:
logger.debug(f"[FLOW] Thread {thread_id} at max turns ({MAX_CONVERSATION_TURNS})")
return False
# Create new turn with complete metadata
@@ -277,7 +281,8 @@ def add_turn(
key = f"thread:{thread_id}"
client.setex(key, 3600, context.model_dump_json()) # Refresh TTL to 1 hour
return True
except Exception:
except Exception as e:
logger.debug(f"[FLOW] Failed to save turn to Redis: {type(e).__name__}")
return False
@@ -296,23 +301,33 @@ def get_conversation_file_list(context: ThreadContext) -> list[str]:
list[str]: Deduplicated list of file paths referenced in the conversation
"""
if not context.turns:
logger.debug("[FILES] No turns found, returning empty file list")
return []
# Collect all unique files from all turns, preserving order of first appearance
seen_files = set()
unique_files = []
for turn in context.turns:
logger.debug(f"[FILES] Collecting files from {len(context.turns)} turns")
for i, turn in enumerate(context.turns):
if turn.files:
logger.debug(f"[FILES] Turn {i+1} has {len(turn.files)} files: {turn.files}")
for file_path in turn.files:
if file_path not in seen_files:
seen_files.add(file_path)
unique_files.append(file_path)
logger.debug(f"[FILES] Added new file: {file_path}")
else:
logger.debug(f"[FILES] Duplicate file skipped: {file_path}")
else:
logger.debug(f"[FILES] Turn {i+1} has no files")
logger.debug(f"[FILES] Final unique file list ({len(unique_files)}): {unique_files}")
return unique_files
def build_conversation_history(context: ThreadContext, read_files_func=None) -> str:
def build_conversation_history(context: ThreadContext, read_files_func=None) -> tuple[str, int]:
"""
Build formatted conversation history for tool prompts with embedded file contents.
@@ -325,8 +340,8 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
context: ThreadContext containing the complete conversation
Returns:
str: Formatted conversation history with embedded files ready for inclusion in prompts
Empty string if no conversation turns exist
tuple[str, int]: (formatted_conversation_history, total_tokens_used)
Returns ("", 0) if no conversation turns exist
Format:
- Header with thread metadata and turn count
@@ -341,10 +356,11 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
while preventing duplicate file embeddings.
"""
if not context.turns:
return ""
return "", 0
# Get all unique files referenced in this conversation
all_files = get_conversation_file_list(context)
logger.debug(f"[FILES] Found {len(all_files)} unique files in conversation history")
history_parts = [
"=== CONVERSATION HISTORY ===",
@@ -356,6 +372,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
# Embed all files referenced in this conversation once at the start
if all_files:
logger.debug(f"[FILES] Starting embedding for {len(all_files)} files")
history_parts.extend(
[
"=== FILES REFERENCED IN THIS CONVERSATION ===",
@@ -366,7 +383,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
)
# Import required functions
from config import MAX_CONTEXT_TOKENS
from config import MAX_CONTENT_TOKENS
if read_files_func is None:
from utils.file_utils import read_file_content
@@ -379,32 +396,41 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
for file_path in all_files:
try:
logger.debug(f"[FILES] Processing file {file_path}")
# Correctly unpack the tuple returned by read_file_content
formatted_content, content_tokens = read_file_content(file_path)
if formatted_content:
# read_file_content already returns formatted content, use it directly
# Check if adding this file would exceed the limit
if total_tokens + content_tokens <= MAX_CONTEXT_TOKENS:
if total_tokens + content_tokens <= MAX_CONTENT_TOKENS:
file_contents.append(formatted_content)
total_tokens += content_tokens
files_included += 1
logger.debug(
f"📄 File embedded in conversation history: {file_path} ({content_tokens:,} tokens)"
)
logger.debug(
f"[FILES] Successfully embedded {file_path} - {content_tokens:,} tokens (total: {total_tokens:,})"
)
else:
files_truncated += 1
logger.debug(
f"📄 File truncated due to token limit: {file_path} ({content_tokens:,} tokens, would exceed {MAX_CONTEXT_TOKENS:,} limit)"
f"📄 File truncated due to token limit: {file_path} ({content_tokens:,} tokens, would exceed {MAX_CONTENT_TOKENS:,} limit)"
)
logger.debug(
f"[FILES] File {file_path} would exceed token limit - skipping (would be {total_tokens + content_tokens:,} tokens)"
)
# Stop processing more files
break
else:
logger.debug(f"📄 File skipped (empty content): {file_path}")
logger.debug(f"[FILES] File {file_path} has empty content - skipping")
except Exception as e:
# Skip files that can't be read but log the failure
logger.warning(
f"📄 Failed to embed file in conversation history: {file_path} - {type(e).__name__}: {e}"
)
logger.debug(f"[FILES] Failed to read file {file_path} - {type(e).__name__}: {e}")
continue
if file_contents:
@@ -417,11 +443,15 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
logger.debug(
f"📄 Conversation history file embedding complete: {files_included} files embedded, {files_truncated} truncated, {total_tokens:,} total tokens"
)
logger.debug(
f"[FILES] File embedding summary - {files_included} embedded, {files_truncated} truncated, {total_tokens:,} tokens total"
)
else:
history_parts.append("(No accessible files found)")
logger.debug(
f"📄 Conversation history file embedding: no accessible files found from {len(all_files)} requested"
)
logger.debug(f"[FILES] No accessible files found from {len(all_files)} requested files")
else:
# Fallback to original read_files function for backward compatibility
files_content = read_files_func(all_files)
@@ -434,7 +464,7 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
history_parts.append(files_content)
else:
# Handle token limit exceeded for conversation files
error_message = f"ERROR: The total size of files referenced in this conversation has exceeded the context limit and cannot be displayed.\nEstimated tokens: {estimated_tokens}, but limit is {MAX_CONTEXT_TOKENS}."
error_message = f"ERROR: The total size of files referenced in this conversation has exceeded the context limit and cannot be displayed.\nEstimated tokens: {estimated_tokens}, but limit is {MAX_CONTENT_TOKENS}."
history_parts.append(error_message)
else:
history_parts.append("(No accessible files found)")
@@ -476,7 +506,20 @@ def build_conversation_history(context: ThreadContext, read_files_func=None) ->
["", "=== END CONVERSATION HISTORY ===", "", "Continue this conversation by building on the previous context."]
)
return "\n".join(history_parts)
# Calculate total tokens for the complete conversation history
complete_history = "\n".join(history_parts)
from utils.token_utils import estimate_tokens
total_conversation_tokens = estimate_tokens(complete_history)
# Summary log of what was built
user_turns = len([t for t in context.turns if t.role == "user"])
assistant_turns = len([t for t in context.turns if t.role == "assistant"])
logger.debug(
f"[FLOW] Built conversation history: {user_turns} user + {assistant_turns} assistant turns, {len(all_files)} files, {total_conversation_tokens:,} tokens"
)
return complete_history, total_conversation_tokens
def _is_valid_uuid(val: str) -> bool:
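Since build_conversation_history now returns a (history, tokens) tuple, call sites have to unpack it; a minimal sketch with hypothetical caller-side names:

history, history_tokens = build_conversation_history(thread_context)
# Hypothetical caller-side bookkeeping: whatever the history does not consume
# can be forwarded to tools as the _remaining_tokens budget for file embedding
remaining_tokens = MAX_CONTEXT_TOKENS - history_tokens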

View File

@@ -422,11 +422,14 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
Tuple of (formatted_content, estimated_tokens)
Content is wrapped with clear delimiters for AI parsing
"""
logger.debug(f"[FILES] read_file_content called for: {file_path}")
try:
# Validate path security before any file operations
path = resolve_and_validate_path(file_path)
logger.debug(f"[FILES] Path validated and resolved: {path}")
except (ValueError, PermissionError) as e:
# Return error in a format that provides context to the AI
logger.debug(f"[FILES] Path validation failed for {file_path}: {type(e).__name__}: {e}")
error_msg = str(e)
# Add Docker-specific help if we're in Docker and path is inaccessible
if WORKSPACE_ROOT and CONTAINER_WORKSPACE.exists():
@@ -438,37 +441,54 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
f"To access files in a different directory, please run Claude from that directory."
)
content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {error_msg}\n--- END FILE ---\n"
return content, estimate_tokens(content)
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
try:
# Validate file existence and type
if not path.exists():
logger.debug(f"[FILES] File does not exist: {file_path}")
content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
return content, estimate_tokens(content)
if not path.is_file():
logger.debug(f"[FILES] Path is not a file: {file_path}")
content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Check file size to prevent memory exhaustion
file_size = path.stat().st_size
logger.debug(f"[FILES] File size for {file_path}: {file_size:,} bytes")
if file_size > max_size:
logger.debug(f"[FILES] File too large: {file_path} ({file_size:,} > {max_size:,} bytes)")
content = f"\n--- FILE TOO LARGE: {file_path} ---\nFile size: {file_size:,} bytes (max: {max_size:,})\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Read the file with UTF-8 encoding, replacing invalid characters
# This ensures we can handle files with mixed encodings
logger.debug(f"[FILES] Reading file content for {file_path}")
with open(path, encoding="utf-8", errors="replace") as f:
file_content = f.read()
logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
# Format with clear delimiters that help the AI understand file boundaries
# Using consistent markers makes it easier for the model to parse
# NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
# ("--- BEGIN DIFF: ... ---") to allow AI to distinguish between complete file content
# vs. partial diff content when files appear in both sections
formatted = f"\n--- BEGIN FILE: {file_path} ---\n{file_content}\n--- END FILE: {file_path} ---\n"
return formatted, estimate_tokens(formatted)
tokens = estimate_tokens(formatted)
logger.debug(f"[FILES] Formatted content for {file_path}: {len(formatted)} chars, {tokens} tokens")
return formatted, tokens
except Exception as e:
logger.debug(f"[FILES] Exception reading file {file_path}: {type(e).__name__}: {e}")
content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
return content, estimate_tokens(content)
tokens = estimate_tokens(content)
logger.debug(f"[FILES] Returning error content for {file_path}: {tokens} tokens")
return content, tokens
def read_files(
@@ -497,6 +517,11 @@ def read_files(
if max_tokens is None:
max_tokens = MAX_CONTEXT_TOKENS
logger.debug(f"[FILES] read_files called with {len(file_paths)} paths")
logger.debug(
f"[FILES] Token budget: max={max_tokens:,}, reserve={reserve_tokens:,}, available={max_tokens - reserve_tokens:,}"
)
content_parts = []
total_tokens = 0
available_tokens = max_tokens - reserve_tokens
@@ -517,31 +542,42 @@ def read_files(
# Priority 2: Process file paths
if file_paths:
# Expand directories to get all individual files
logger.debug(f"[FILES] Expanding {len(file_paths)} file paths")
all_files = expand_paths(file_paths)
logger.debug(f"[FILES] After expansion: {len(all_files)} individual files")
if not all_files and file_paths:
# No files found but paths were provided
logger.debug("[FILES] No files found from provided paths")
content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
else:
# Read files sequentially until token limit is reached
for file_path in all_files:
logger.debug(f"[FILES] Reading {len(all_files)} files with token budget {available_tokens:,}")
for i, file_path in enumerate(all_files):
if total_tokens >= available_tokens:
files_skipped.append(file_path)
continue
logger.debug(f"[FILES] Token budget exhausted, skipping remaining {len(all_files) - i} files")
files_skipped.extend(all_files[i:])
break
file_content, file_tokens = read_file_content(file_path)
logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
# Check if adding this file would exceed limit
if total_tokens + file_tokens <= available_tokens:
content_parts.append(file_content)
total_tokens += file_tokens
logger.debug(f"[FILES] Added file {file_path}, total tokens: {total_tokens:,}")
else:
# File too large for remaining budget
logger.debug(
f"[FILES] File {file_path} too large for remaining budget ({file_tokens:,} tokens, {available_tokens - total_tokens:,} remaining)"
)
files_skipped.append(file_path)
# Add informative note about skipped files to help users understand
# what was omitted and why
if files_skipped:
logger.debug(f"[FILES] {len(files_skipped)} files skipped due to token limits")
skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
skip_note += f"Total skipped: {len(files_skipped)}\n"
# Show first 10 skipped files as examples
@@ -552,4 +588,6 @@ def read_files(
skip_note += "--- END SKIPPED FILES ---\n"
content_parts.append(skip_note)
return "\n\n".join(content_parts) if content_parts else ""
result = "\n\n".join(content_parts) if content_parts else ""
logger.debug(f"[FILES] read_files complete: {len(result)} chars, {total_tokens:,} tokens used")
return result
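A caller-side sketch of the budgeted read path; the paths and numbers here are illustrative only:

# Embed as much requested content as fits within the budget; anything that
# does not fit is reported in the SKIPPED FILES note appended to the output
content = read_files(
    ["/workspace/src/", "/workspace/README.md"],
    max_tokens=50_000,
    reserve_tokens=1_000,
)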