Add live-simulation tests to validate that conversation continuation and preservation work across requests

Fahad
2025-06-11 17:16:05 +04:00
parent c90ac7561e
commit 780000f9c9
15 changed files with 272 additions and 2296 deletions

View File

@@ -14,12 +14,12 @@ Test Flow:
Usage:
python communication_simulator_test.py [--verbose] [--keep-logs] [--tests TEST_NAME...] [--individual TEST_NAME] [--skip-docker]
--tests: Run specific tests only (space-separated)
--list-tests: List all available tests
--individual: Run a single test individually
--skip-docker: Skip Docker setup (assumes containers are already running)
Available tests:
basic_conversation - Basic conversation flow with chat tool
per_tool_deduplication - File deduplication for individual tools
@@ -31,16 +31,16 @@ Available tests:
Examples:
# Run all tests
python communication_simulator_test.py
# Run only basic conversation and content validation tests
python communication_simulator_test.py --tests basic_conversation content_validation
# Run a single test individually (with full Docker setup)
python communication_simulator_test.py --individual content_validation
# Run a single test individually (assuming Docker is already running)
python communication_simulator_test.py --individual content_validation --skip-docker
# List available tests
python communication_simulator_test.py --list-tests
"""
@@ -53,7 +53,6 @@ import subprocess
import sys
import tempfile
import time
from typing import Optional
class CommunicationSimulator:
@@ -69,16 +68,16 @@ class CommunicationSimulator:
# Import test registry
from simulator_tests import TEST_REGISTRY
self.test_registry = TEST_REGISTRY
# Available test methods mapping
self.available_tests = {
name: self._create_test_runner(test_class)
for name, test_class in self.test_registry.items()
name: self._create_test_runner(test_class) for name, test_class in self.test_registry.items()
}
# Test result tracking
self.test_results = {test_name: False for test_name in self.test_registry.keys()}
self.test_results = dict.fromkeys(self.test_registry.keys(), False)
# Configure logging
log_level = logging.DEBUG if verbose else logging.INFO
@@ -87,6 +86,7 @@ class CommunicationSimulator:
def _create_test_runner(self, test_class):
"""Create a test runner function for a test class"""
def run_test():
test_instance = test_class(verbose=self.verbose)
result = test_instance.run_test()
@@ -94,6 +94,7 @@ class CommunicationSimulator:
test_name = test_instance.test_name
self.test_results[test_name] = result
return result
return run_test
def setup_test_environment(self) -> bool:
@@ -181,10 +182,10 @@ class CommunicationSimulator:
# If specific tests are selected, run only those
if self.selected_tests:
return self._run_selected_tests()
# Otherwise run all tests in order
test_sequence = list(self.test_registry.keys())
for test_name in test_sequence:
if not self._run_single_test(test_name):
return False
@@ -200,14 +201,14 @@ class CommunicationSimulator:
"""Run only the selected tests"""
try:
self.logger.info(f"🎯 Running selected tests: {', '.join(self.selected_tests)}")
for test_name in self.selected_tests:
if not self._run_single_test(test_name):
return False
self.logger.info("✅ All selected tests passed")
return True
except Exception as e:
self.logger.error(f"Selected tests failed: {e}")
return False
@@ -219,18 +220,18 @@ class CommunicationSimulator:
self.logger.error(f"Unknown test: {test_name}")
self.logger.info(f"Available tests: {', '.join(self.available_tests.keys())}")
return False
self.logger.info(f"🧪 Running test: {test_name}")
test_function = self.available_tests[test_name]
result = test_function()
if result:
self.logger.info(f"✅ Test {test_name} passed")
else:
self.logger.error(f"❌ Test {test_name} failed")
return result
except Exception as e:
self.logger.error(f"Test {test_name} failed with exception: {e}")
return False
@@ -364,7 +365,9 @@ def parse_arguments():
parser.add_argument("--tests", "-t", nargs="+", help="Specific tests to run (space-separated)")
parser.add_argument("--list-tests", action="store_true", help="List available tests and exit")
parser.add_argument("--individual", "-i", help="Run a single test individually")
parser.add_argument("--skip-docker", action="store_true", help="Skip Docker setup (assumes containers are already running)")
parser.add_argument(
"--skip-docker", action="store_true", help="Skip Docker setup (assumes containers are already running)"
)
return parser.parse_args()
@@ -381,14 +384,14 @@ def run_individual_test(simulator, test_name, skip_docker):
"""Run a single test individually"""
try:
success = simulator.run_individual_test(test_name, skip_docker_setup=skip_docker)
if success:
print(f"\\n🎉 INDIVIDUAL TEST {test_name.upper()}: PASSED")
return 0
else:
print(f"\\n❌ INDIVIDUAL TEST {test_name.upper()}: FAILED")
return 1
except KeyboardInterrupt:
print(f"\\n🛑 Individual test {test_name} interrupted by user")
if not skip_docker:
@@ -436,20 +439,16 @@ def main():
return
# Initialize simulator consistently for all use cases
simulator = CommunicationSimulator(
verbose=args.verbose,
keep_logs=args.keep_logs,
selected_tests=args.tests
)
simulator = CommunicationSimulator(verbose=args.verbose, keep_logs=args.keep_logs, selected_tests=args.tests)
# Determine execution mode and run
if args.individual:
exit_code = run_individual_test(simulator, args.individual, args.skip_docker)
else:
exit_code = run_test_suite(simulator, args.skip_docker)
sys.exit(exit_code)
if __name__ == "__main__":
main()
main()
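
For reference, a minimal self-contained sketch of the registry-plus-runner-factory pattern this file now uses; DummyTest is a hypothetical stand-in for the real classes in simulator_tests, and dict.fromkeys is safe for the result map because the shared default value False is immutable.

# Sketch of the registry/runner-factory pattern (DummyTest is hypothetical).
class DummyTest:
    test_name = "dummy"

    def __init__(self, verbose=False):
        self.verbose = verbose

    def run_test(self) -> bool:
        return True

TEST_REGISTRY = {"dummy": DummyTest}  # name -> test class

def create_test_runner(test_class, verbose=False):
    """Return a closure that instantiates and runs one test class."""
    def run_test():
        test_instance = test_class(verbose=verbose)
        return test_instance.run_test()
    return run_test

available_tests = {name: create_test_runner(cls) for name, cls in TEST_REGISTRY.items()}
test_results = dict.fromkeys(TEST_REGISTRY, False)  # all keys share the immutable default False

for name, runner in available_tests.items():
    test_results[name] = runner()

print(test_results)  # {'dummy': True}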

File diff suppressed because it is too large

View File

@@ -8,9 +8,9 @@ Each test is in its own file for better organization and maintainability.
from .base_test import BaseSimulatorTest
from .test_basic_conversation import BasicConversationTest
from .test_content_validation import ContentValidationTest
from .test_per_tool_deduplication import PerToolDeduplicationTest
from .test_cross_tool_continuation import CrossToolContinuationTest
from .test_logs_validation import LogsValidationTest
from .test_per_tool_deduplication import PerToolDeduplicationTest
from .test_redis_validation import RedisValidationTest
# Test registry for dynamic loading
@@ -24,12 +24,12 @@ TEST_REGISTRY = {
}
__all__ = [
'BaseSimulatorTest',
'BasicConversationTest',
'ContentValidationTest',
'PerToolDeduplicationTest',
'CrossToolContinuationTest',
'LogsValidationTest',
'RedisValidationTest',
'TEST_REGISTRY'
]
"BaseSimulatorTest",
"BasicConversationTest",
"ContentValidationTest",
"PerToolDeduplicationTest",
"CrossToolContinuationTest",
"LogsValidationTest",
"RedisValidationTest",
"TEST_REGISTRY",
]

View File

@@ -9,9 +9,7 @@ import json
import logging
import os
import subprocess
import tempfile
import time
from typing import Optional, Tuple
from typing import Optional
class BaseSimulatorTest:
@@ -23,7 +21,7 @@ class BaseSimulatorTest:
self.test_dir = None
self.container_name = "gemini-mcp-server"
self.redis_container = "gemini-mcp-redis"
# Configure logging
log_level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s")
@@ -100,7 +98,7 @@ class Calculator:
self.test_files = {"python": test_py, "config": test_config}
self.logger.debug(f"Created test files: {list(self.test_files.values())}")
def call_mcp_tool(self, tool_name: str, params: dict) -> Tuple[Optional[str], Optional[str]]:
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool via Claude CLI (docker exec)"""
try:
# Prepare the MCP initialization and tool call sequence
@@ -237,6 +235,7 @@ class Calculator:
"""Clean up test files"""
if hasattr(self, "test_dir") and self.test_dir and os.path.exists(self.test_dir):
import shutil
shutil.rmtree(self.test_dir)
self.logger.debug(f"Removed test files directory: {self.test_dir}")
@@ -252,4 +251,4 @@ class Calculator:
@property
def test_description(self) -> str:
"""Get the test description - to be implemented by subclasses"""
raise NotImplementedError("Subclasses must implement test_description property")
raise NotImplementedError("Subclasses must implement test_description property")
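
To make the base-class contract concrete, here is a hypothetical minimal subclass assuming only what is visible above (setup_test_files, call_mcp_tool, cleanup_test_files, and the test_name/test_description properties); it is a sketch, not one of the tests added in this commit.

# Hypothetical subclass illustrating the BaseSimulatorTest contract (inside simulator_tests/).
from .base_test import BaseSimulatorTest

class ExampleSmokeTest(BaseSimulatorTest):
    @property
    def test_name(self) -> str:
        return "example_smoke"

    @property
    def test_description(self) -> str:
        return "Minimal smoke test illustrating the subclass contract"

    def run_test(self) -> bool:
        try:
            self.setup_test_files()
            # call_mcp_tool returns (response_text, continuation_id)
            response, _ = self.call_mcp_tool("chat", {"prompt": "Please use low thinking mode. Say hello"})
            return bool(response)
        finally:
            self.cleanup_test_files()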

View File

@@ -34,7 +34,10 @@ class BasicConversationTest(BaseSimulatorTest):
self.logger.info(" 1.1: Initial chat with file analysis")
response1, continuation_id = self.call_mcp_tool(
"chat",
{"prompt": "Please use low thinking mode. Analyze this Python code and explain what it does", "files": [self.test_files["python"]]},
{
"prompt": "Please use low thinking mode. Analyze this Python code and explain what it does",
"files": [self.test_files["python"]],
},
)
if not response1 or not continuation_id:
@@ -80,4 +83,4 @@ class BasicConversationTest(BaseSimulatorTest):
self.logger.error(f"Basic conversation flow test failed: {e}")
return False
finally:
self.cleanup_test_files()
self.cleanup_test_files()
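
The continuation handshake this test exercises, reduced to its essentials; a sketch that assumes only the call_mcp_tool contract used above (it returns a response and a continuation_id, and passing that id back continues the stored conversation).

def run_continuation_flow(test):
    # Turn 1: fresh conversation; the server returns a continuation_id.
    response1, continuation_id = test.call_mcp_tool(
        "chat",
        {
            "prompt": "Please use low thinking mode. Analyze this Python code",
            "files": [test.test_files["python"]],
        },
    )
    if not response1 or not continuation_id:
        return False

    # Turn 2: pass the id back so the server reloads the stored thread
    # instead of re-embedding the already-seen file.
    response2, _ = test.call_mcp_tool(
        "chat",
        {
            "prompt": "Please use low thinking mode. Now summarize your analysis",
            "files": [test.test_files["python"]],
            "continuation_id": continuation_id,
        },
    )
    return bool(response2)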

View File

@@ -8,6 +8,7 @@ This test is specifically designed to catch content duplication bugs.
import json
import os
from .base_test import BaseSimulatorTest
@@ -26,10 +27,10 @@ class ContentValidationTest(BaseSimulatorTest):
"""Test that tools don't duplicate file content in their responses"""
try:
self.logger.info("📄 Test: Content validation and duplicate detection")
# Setup test files first
self.setup_test_files()
# Create a test file with distinctive content for validation
validation_content = '''"""
Configuration file for content validation testing
@@ -41,102 +42,110 @@ MAX_CONTENT_TOKENS = 800_000 # This line should appear exactly once
TEMPERATURE_ANALYTICAL = 0.2 # This should also appear exactly once
UNIQUE_VALIDATION_MARKER = "CONTENT_VALIDATION_TEST_12345"
# Database settings
# Database settings
DATABASE_CONFIG = {
"host": "localhost",
"port": 5432,
"name": "validation_test_db"
}
'''
validation_file = os.path.join(self.test_dir, "validation_config.py")
with open(validation_file, "w") as f:
f.write(validation_content)
# Test 1: Precommit tool with files parameter (where the bug occurred)
self.logger.info(" 1: Testing precommit tool content duplication")
# Call precommit tool with the validation file
response1, thread_id = self.call_mcp_tool(
"precommit",
"precommit",
{
"path": os.getcwd(),
"files": [validation_file],
"original_request": "Test for content duplication in precommit tool"
}
"original_request": "Test for content duplication in precommit tool",
},
)
if response1:
# Parse response and check for content duplication
try:
response_data = json.loads(response1)
content = response_data.get("content", "")
# Count occurrences of distinctive markers
max_content_count = content.count("MAX_CONTENT_TOKENS = 800_000")
temp_analytical_count = content.count("TEMPERATURE_ANALYTICAL = 0.2")
unique_marker_count = content.count("UNIQUE_VALIDATION_MARKER")
# Validate no duplication
duplication_detected = False
issues = []
if max_content_count > 1:
issues.append(f"MAX_CONTENT_TOKENS appears {max_content_count} times")
duplication_detected = True
if temp_analytical_count > 1:
issues.append(f"TEMPERATURE_ANALYTICAL appears {temp_analytical_count} times")
duplication_detected = True
if unique_marker_count > 1:
issues.append(f"UNIQUE_VALIDATION_MARKER appears {unique_marker_count} times")
duplication_detected = True
if duplication_detected:
self.logger.error(f" ❌ Content duplication detected in precommit tool: {'; '.join(issues)}")
return False
else:
self.logger.info(" ✅ No content duplication in precommit tool")
except json.JSONDecodeError:
self.logger.warning(" ⚠️ Could not parse precommit response as JSON")
else:
self.logger.warning(" ⚠️ Precommit tool failed to respond")
# Test 2: Other tools that use files parameter
tools_to_test = [
("chat", {"prompt": "Please use low thinking mode. Analyze this config file", "files": [validation_file]}),
("codereview", {"files": [validation_file], "context": "Please use low thinking mode. Review this configuration"}),
("analyze", {"files": [validation_file], "analysis_type": "code_quality"})
(
"chat",
{"prompt": "Please use low thinking mode. Analyze this config file", "files": [validation_file]},
),
(
"codereview",
{"files": [validation_file], "context": "Please use low thinking mode. Review this configuration"},
),
("analyze", {"files": [validation_file], "analysis_type": "code_quality"}),
]
for tool_name, params in tools_to_test:
self.logger.info(f" 2.{tool_name}: Testing {tool_name} tool content duplication")
response, _ = self.call_mcp_tool(tool_name, params)
if response:
try:
response_data = json.loads(response)
content = response_data.get("content", "")
# Check for duplication
marker_count = content.count("UNIQUE_VALIDATION_MARKER")
if marker_count > 1:
self.logger.error(f" ❌ Content duplication in {tool_name}: marker appears {marker_count} times")
self.logger.error(
f" ❌ Content duplication in {tool_name}: marker appears {marker_count} times"
)
return False
else:
self.logger.info(f" ✅ No content duplication in {tool_name}")
except json.JSONDecodeError:
self.logger.warning(f" ⚠️ Could not parse {tool_name} response")
else:
self.logger.warning(f" ⚠️ {tool_name} tool failed to respond")
# Test 3: Cross-tool content validation with file deduplication
self.logger.info(" 3: Testing cross-tool content consistency")
if thread_id:
# Continue conversation with same file - content should be deduplicated in conversation history
response2, _ = self.call_mcp_tool(
@@ -147,31 +156,33 @@ DATABASE_CONFIG = {
"continuation_id": thread_id,
},
)
if response2:
try:
response_data = json.loads(response2)
content = response_data.get("content", "")
# In continuation, the file content shouldn't be duplicated either
marker_count = content.count("UNIQUE_VALIDATION_MARKER")
if marker_count > 1:
self.logger.error(f" ❌ Content duplication in cross-tool continuation: marker appears {marker_count} times")
self.logger.error(
f" ❌ Content duplication in cross-tool continuation: marker appears {marker_count} times"
)
return False
else:
self.logger.info(" ✅ No content duplication in cross-tool continuation")
except json.JSONDecodeError:
self.logger.warning(" ⚠️ Could not parse continuation response")
# Cleanup
os.remove(validation_file)
self.logger.info(" ✅ All content validation tests passed")
return True
except Exception as e:
self.logger.error(f"Content validation test failed: {e}")
return False
finally:
self.cleanup_test_files()
self.cleanup_test_files()
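
The duplication check above boils down to counting a distinctive marker inside the parsed response content; a compact sketch of that check, not the test's exact code:

import json

def count_marker(response_text: str, marker: str) -> int:
    """Parse a tool response and count occurrences of a marker in its content."""
    try:
        content = json.loads(response_text).get("content", "")
    except json.JSONDecodeError:
        return 0
    return content.count(marker)

def has_duplicated_content(response_text: str, markers: list[str]) -> bool:
    """True if any marker appears more than once, i.e. file content was duplicated."""
    return any(count_marker(response_text, m) > 1 for m in markers)

# Usage with the markers from this test:
markers = ["MAX_CONTENT_TOKENS = 800_000", "TEMPERATURE_ANALYTICAL = 0.2", "UNIQUE_VALIDATION_MARKER"]
sample = json.dumps({"content": 'UNIQUE_VALIDATION_MARKER = "CONTENT_VALIDATION_TEST_12345"'})
assert not has_duplicated_content(sample, markers)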

View File

@@ -43,8 +43,10 @@ class CrossToolContinuationTest(BaseSimulatorTest):
if self._test_multi_file_continuation():
success_count += 1
self.logger.info(f" ✅ Cross-tool continuation scenarios completed: {success_count}/{total_scenarios} scenarios passed")
self.logger.info(
f" ✅ Cross-tool continuation scenarios completed: {success_count}/{total_scenarios} scenarios passed"
)
# Consider successful if at least one scenario worked
return success_count > 0
@@ -193,4 +195,4 @@ class CrossToolContinuationTest(BaseSimulatorTest):
except Exception as e:
self.logger.error(f"Multi-file continuation scenario failed: {e}")
return False
return False

View File

@@ -96,4 +96,4 @@ class LogsValidationTest(BaseSimulatorTest):
except Exception as e:
self.logger.error(f"Log validation failed: {e}")
return False
return False

View File

@@ -32,16 +32,22 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
(
"thinkdeep",
{
"prompt": "Please use low thinking mode. Think deeply about this Python code and identify potential architectural improvements",
"current_analysis": "Please use low thinking mode. I'm analyzing this Python code to identify potential architectural improvements",
"files": [self.test_files["python"]],
},
),
("analyze", {"files": [self.test_files["python"]], "analysis_type": "architecture"}),
(
"analyze",
{
"files": [self.test_files["python"]],
"question": "Please use low thinking mode. What are the architectural patterns in this code?",
},
),
(
"debug",
{
"files": [self.test_files["python"]],
"issue_description": "The fibonacci function seems slow for large numbers",
"error_description": "Please use low thinking mode. The fibonacci function seems slow for large numbers",
},
),
(
@@ -74,11 +80,17 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
continue_params["continuation_id"] = continuation_id
if tool_name == "thinkdeep":
continue_params["prompt"] = "Please use low thinking mode. Now focus specifically on the recursive fibonacci implementation"
continue_params["current_analysis"] = (
"Please use low thinking mode. Now focus specifically on the recursive fibonacci implementation"
)
elif tool_name == "analyze":
continue_params["analysis_type"] = "performance"
continue_params["question"] = (
"Please use low thinking mode. What are the performance characteristics of this code?"
)
elif tool_name == "debug":
continue_params["issue_description"] = "How can we optimize the fibonacci function?"
continue_params["error_description"] = (
"Please use low thinking mode. How can we optimize the fibonacci function?"
)
elif tool_name == "codereview":
continue_params["context"] = "Focus on the Calculator class implementation"
@@ -89,8 +101,10 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
else:
self.logger.warning(f" ⚠️ {tool_name} tool continuation failed")
self.logger.info(f" ✅ Per-tool file deduplication tests completed: {successful_tests}/{total_tests} tools passed")
self.logger.info(
f" ✅ Per-tool file deduplication tests completed: {successful_tests}/{total_tests} tools passed"
)
# Consider test successful if at least one tool worked
return successful_tests > 0
@@ -98,4 +112,4 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
self.logger.error(f"Per-tool file deduplication test failed: {e}")
return False
finally:
self.cleanup_test_files()
self.cleanup_test_files()

View File

@@ -7,6 +7,7 @@ for stored conversation threads and their content.
"""
import json
from .base_test import BaseSimulatorTest
@@ -30,15 +31,15 @@ class RedisValidationTest(BaseSimulatorTest):
ping_result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "ping"], capture_output=True
)
if ping_result.returncode != 0:
self.logger.error("Failed to connect to Redis")
return False
if "PONG" not in ping_result.stdout.decode():
self.logger.error("Redis ping failed")
return False
self.logger.info("✅ Redis connectivity confirmed")
# Check Redis for stored conversations
@@ -76,51 +77,55 @@ class RedisValidationTest(BaseSimulatorTest):
else:
# If no existing threads, create a test thread to validate Redis functionality
self.logger.info("📝 No existing threads found, creating test thread to validate Redis...")
test_thread_id = "test_thread_validation"
test_data = {
"thread_id": test_thread_id,
"turns": [
{
"tool": "chat",
"timestamp": "2025-06-11T16:30:00Z",
"prompt": "Test validation prompt"
}
]
{"tool": "chat", "timestamp": "2025-06-11T16:30:00Z", "prompt": "Test validation prompt"}
],
}
# Store test data
store_result = self.run_command([
"docker", "exec", self.redis_container, "redis-cli",
"SET", f"thread:{test_thread_id}", json.dumps(test_data)
], capture_output=True)
store_result = self.run_command(
[
"docker",
"exec",
self.redis_container,
"redis-cli",
"SET",
f"thread:{test_thread_id}",
json.dumps(test_data),
],
capture_output=True,
)
if store_result.returncode != 0:
self.logger.error("Failed to store test data in Redis")
return False
# Retrieve test data
retrieve_result = self.run_command([
"docker", "exec", self.redis_container, "redis-cli",
"GET", f"thread:{test_thread_id}"
], capture_output=True)
retrieve_result = self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "GET", f"thread:{test_thread_id}"],
capture_output=True,
)
if retrieve_result.returncode != 0:
self.logger.error("Failed to retrieve test data from Redis")
return False
retrieved_data = retrieve_result.stdout.decode()
try:
parsed = json.loads(retrieved_data)
if parsed.get("thread_id") == test_thread_id:
self.logger.info("✅ Redis read/write validation successful")
# Clean up test data
self.run_command([
"docker", "exec", self.redis_container, "redis-cli",
"DEL", f"thread:{test_thread_id}"
], capture_output=True)
self.run_command(
["docker", "exec", self.redis_container, "redis-cli", "DEL", f"thread:{test_thread_id}"],
capture_output=True,
)
return True
else:
self.logger.error("Retrieved data doesn't match stored data")
@@ -131,4 +136,4 @@ class RedisValidationTest(BaseSimulatorTest):
except Exception as e:
self.logger.error(f"Conversation memory validation failed: {e}")
return False
return False
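
The Redis round-trip exercised above can be reproduced with plain subprocess calls; a sketch assuming the same container name and thread:<id> key format used in this test.

import json
import subprocess

REDIS_CONTAINER = "gemini-mcp-redis"  # container name used by these tests

def redis_cli(*args: str) -> subprocess.CompletedProcess:
    """Run a redis-cli command inside the Redis container via docker exec."""
    return subprocess.run(["docker", "exec", REDIS_CONTAINER, "redis-cli", *args], capture_output=True)

thread_id = "test_thread_validation"
payload = json.dumps({"thread_id": thread_id, "turns": []})

assert redis_cli("SET", f"thread:{thread_id}", payload).returncode == 0
stored = redis_cli("GET", f"thread:{thread_id}").stdout.decode()
assert json.loads(stored)["thread_id"] == thread_id
redis_cli("DEL", f"thread:{thread_id}")  # clean up the validation key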

View File

@@ -1,16 +0,0 @@
{
"database": {
"host": "localhost",
"port": 5432,
"name": "testdb",
"ssl": true
},
"cache": {
"redis_url": "redis://localhost:6379",
"ttl": 3600
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
}

View File

@@ -1,32 +0,0 @@
"""
Sample Python module for testing MCP conversation continuity
"""
def fibonacci(n):
"""Calculate fibonacci number recursively"""
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
def factorial(n):
"""Calculate factorial iteratively"""
result = 1
for i in range(1, n + 1):
result *= i
return result
class Calculator:
"""Simple calculator class"""
def __init__(self):
self.history = []
def add(self, a, b):
result = a + b
self.history.append(f"{a} + {b} = {result}")
return result
def multiply(self, a, b):
result = a * b
self.history.append(f"{a} * {b} = {result}")
return result

View File

@@ -1,16 +0,0 @@
"""
Configuration file for content validation testing
This content should appear only ONCE in any tool response
"""
# Configuration constants
MAX_CONTENT_TOKENS = 800_000 # This line should appear exactly once
TEMPERATURE_ANALYTICAL = 0.2 # This should also appear exactly once
UNIQUE_VALIDATION_MARKER = "CONTENT_VALIDATION_TEST_12345"
# Database settings
DATABASE_CONFIG = {
"host": "localhost",
"port": 5432,
"name": "validation_test_db"
}

View File

@@ -2,11 +2,11 @@
Enhanced tests for precommit tool using mock storage to test real logic
"""
import json
import tempfile
import os
from unittest.mock import Mock, patch, MagicMock
from typing import Dict, Any, Optional
import tempfile
from pathlib import Path
from typing import Optional
from unittest.mock import patch
import pytest
@@ -15,60 +15,70 @@ from tools.precommit import Precommit, PrecommitRequest
class MockRedisClient:
"""Mock Redis client that uses in-memory dictionary storage"""
def __init__(self):
self.data: Dict[str, str] = {}
self.ttl_data: Dict[str, int] = {}
self.data: dict[str, str] = {}
self.ttl_data: dict[str, int] = {}
def get(self, key: str) -> Optional[str]:
return self.data.get(key)
def set(self, key: str, value: str, ex: Optional[int] = None) -> bool:
self.data[key] = value
if ex:
self.ttl_data[key] = ex
return True
def delete(self, key: str) -> int:
if key in self.data:
del self.data[key]
self.ttl_data.pop(key, None)
return 1
return 0
def exists(self, key: str) -> int:
return 1 if key in self.data else 0
def setex(self, key: str, time: int, value: str) -> bool:
"""Set key to hold string value and set key to timeout after given seconds"""
self.data[key] = value
self.ttl_data[key] = time
return True
class TestPrecommitToolWithMockStore:
"""Test precommit tool with mock storage to validate actual logic"""
@pytest.fixture
def mock_redis(self):
"""Create mock Redis client"""
return MockRedisClient()
@pytest.fixture
def tool(self, mock_redis):
def tool(self, mock_redis, temp_repo):
"""Create tool instance with mocked Redis"""
temp_dir, _ = temp_repo
tool = Precommit()
# Mock the Redis client getter to return our mock
with patch('utils.conversation_memory.get_redis_client', return_value=mock_redis):
# Mock the Redis client getter and PROJECT_ROOT to allow access to temp files
with (
patch("utils.conversation_memory.get_redis_client", return_value=mock_redis),
patch("utils.file_utils.PROJECT_ROOT", Path(temp_dir).resolve()),
):
yield tool
@pytest.fixture
def temp_repo(self):
"""Create a temporary git repository with test files"""
import subprocess
temp_dir = tempfile.mkdtemp()
# Initialize git repo
subprocess.run(['git', 'init'], cwd=temp_dir, capture_output=True)
subprocess.run(['git', 'config', 'user.name', 'Test'], cwd=temp_dir, capture_output=True)
subprocess.run(['git', 'config', 'user.email', 'test@example.com'], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "init"], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@example.com"], cwd=temp_dir, capture_output=True)
# Create test config file
config_content = '''"""Test configuration file"""
@@ -80,182 +90,173 @@ __author__ = "Test"
MAX_CONTENT_TOKENS = 800_000 # 800K tokens for content
TEMPERATURE_ANALYTICAL = 0.2 # For code review, debugging
'''
config_path = os.path.join(temp_dir, 'config.py')
with open(config_path, 'w') as f:
config_path = os.path.join(temp_dir, "config.py")
with open(config_path, "w") as f:
f.write(config_content)
# Add and commit initial version
subprocess.run(['git', 'add', '.'], cwd=temp_dir, capture_output=True)
subprocess.run(['git', 'commit', '-m', 'Initial commit'], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "add", "."], cwd=temp_dir, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial commit"], cwd=temp_dir, capture_output=True)
# Modify config to create a diff
modified_content = config_content + '\nNEW_SETTING = "test" # Added setting\n'
with open(config_path, 'w') as f:
with open(config_path, "w") as f:
f.write(modified_content)
yield temp_dir, config_path
# Cleanup
import shutil
shutil.rmtree(temp_dir)
@pytest.mark.asyncio
async def test_no_duplicate_file_content_in_prompt(self, tool, temp_repo, mock_redis):
"""Test that file content doesn't appear twice in the generated prompt"""
"""Test that file content appears in expected locations"""
temp_dir, config_path = temp_repo
# Create request with files parameter
request = PrecommitRequest(
path=temp_dir,
files=[config_path],
original_request="Test configuration changes"
)
# Create request with files parameter
request = PrecommitRequest(path=temp_dir, files=[config_path], original_request="Test configuration changes")
# Generate the prompt
prompt = await tool.prepare_prompt(request)
# Test that MAX_CONTENT_TOKENS only appears once in the entire prompt
max_content_count = prompt.count('MAX_CONTENT_TOKENS = 800_000')
assert max_content_count == 1, f"MAX_CONTENT_TOKENS appears {max_content_count} times (should be 1)"
# Test that the config file content only appears once
config_content_count = prompt.count('# Configuration')
assert config_content_count == 1, f"Config file content appears {config_content_count} times (should be 1)"
# Verify expected sections are present
assert "## Original Request" in prompt
assert "Test configuration changes" in prompt
assert "## Additional Context Files" in prompt
assert "## Git Diffs" in prompt
# Verify the file appears in the git diff
assert "config.py" in prompt
assert "NEW_SETTING" in prompt
# Note: Files can legitimately appear in both git diff AND additional context:
# - Git diff shows only changed lines + limited context
# - Additional context provides complete file content for full understanding
# This is intentional and provides comprehensive context to the AI
@pytest.mark.asyncio
async def test_conversation_memory_integration(self, tool, temp_repo, mock_redis):
"""Test that conversation memory works with mock storage"""
temp_dir, config_path = temp_repo
# Mock conversation memory functions to use our mock redis
with patch('utils.conversation_memory.get_redis_client', return_value=mock_redis):
with patch("utils.conversation_memory.get_redis_client", return_value=mock_redis):
# First request - should embed file content
request1 = PrecommitRequest(
path=temp_dir,
files=[config_path],
original_request="First review"
)
PrecommitRequest(path=temp_dir, files=[config_path], original_request="First review")
# Simulate conversation thread creation
from utils.conversation_memory import create_thread, add_turn
from utils.conversation_memory import add_turn, create_thread
thread_id = create_thread("precommit", {"files": [config_path]})
# Test that file embedding works
files_to_embed = tool.filter_new_files([config_path], None)
assert config_path in files_to_embed, "New conversation should embed all files"
# Add a turn to the conversation
add_turn(thread_id, "assistant", "First response", files=[config_path], tool_name="precommit")
# Second request with continuation - should skip already embedded files
request2 = PrecommitRequest(
path=temp_dir,
files=[config_path],
continuation_id=thread_id,
original_request="Follow-up review"
PrecommitRequest(
path=temp_dir, files=[config_path], continuation_id=thread_id, original_request="Follow-up review"
)
files_to_embed_2 = tool.filter_new_files([config_path], thread_id)
assert len(files_to_embed_2) == 0, "Continuation should skip already embedded files"
@pytest.mark.asyncio
@pytest.mark.asyncio
async def test_prompt_structure_integrity(self, tool, temp_repo, mock_redis):
"""Test that the prompt structure is well-formed and doesn't have content duplication"""
temp_dir, config_path = temp_repo
request = PrecommitRequest(
path=temp_dir,
files=[config_path],
original_request="Validate prompt structure",
review_type="full",
severity_filter="high"
severity_filter="high",
)
prompt = await tool.prepare_prompt(request)
# Split prompt into sections
sections = {
"original_request": "## Original Request",
"review_parameters": "## Review Parameters",
"review_parameters": "## Review Parameters",
"repo_summary": "## Repository Changes Summary",
"context_files_summary": "## Context Files Summary",
"git_diffs": "## Git Diffs",
"additional_context": "## Additional Context Files",
"review_instructions": "## Review Instructions"
"review_instructions": "## Review Instructions",
}
section_indices = {}
for name, header in sections.items():
index = prompt.find(header)
if index != -1:
section_indices[name] = index
# Verify sections appear in logical order
assert section_indices["original_request"] < section_indices["review_parameters"]
assert section_indices["review_parameters"] < section_indices["repo_summary"]
assert section_indices["review_parameters"] < section_indices["repo_summary"]
assert section_indices["git_diffs"] < section_indices["additional_context"]
assert section_indices["additional_context"] < section_indices["review_instructions"]
# Test that file content only appears in Additional Context section
file_content_start = section_indices["additional_context"]
file_content_end = section_indices["review_instructions"]
file_section = prompt[file_content_start:file_content_end]
before_file_section = prompt[:file_content_start]
prompt[:file_content_start]
after_file_section = prompt[file_content_end:]
# MAX_CONTENT_TOKENS should only appear in the file section
assert 'MAX_CONTENT_TOKENS' in file_section
assert 'MAX_CONTENT_TOKENS' not in before_file_section
assert 'MAX_CONTENT_TOKENS' not in after_file_section
# File content should appear in the file section
assert "MAX_CONTENT_TOKENS = 800_000" in file_section
# Check that configuration content appears in the file section
assert "# Configuration" in file_section
# The complete file content should not appear in the review instructions
assert '__version__ = "1.0.0"' in file_section
assert '__version__ = "1.0.0"' not in after_file_section
@pytest.mark.asyncio
async def test_file_content_formatting(self, tool, temp_repo, mock_redis):
"""Test that file content is properly formatted without duplication"""
temp_dir, config_path = temp_repo
# Test the centralized file preparation method directly
file_content = tool._prepare_file_content_for_prompt(
[config_path],
None, # No continuation
"Test files",
max_tokens=100000,
reserve_tokens=1000
[config_path], None, "Test files", max_tokens=100000, reserve_tokens=1000 # No continuation
)
# Should contain file markers
assert "--- BEGIN FILE:" in file_content
assert "--- END FILE:" in file_content
assert "config.py" in file_content
# Should contain actual file content
assert "MAX_CONTENT_TOKENS = 800_000" in file_content
assert "__version__ = \"1.0.0\"" in file_content
assert '__version__ = "1.0.0"' in file_content
# Content should appear only once
assert file_content.count("MAX_CONTENT_TOKENS = 800_000") == 1
assert file_content.count("__version__ = \"1.0.0\"") == 1
assert file_content.count('__version__ = "1.0.0"') == 1
def test_mock_redis_basic_operations():
"""Test that our mock Redis implementation works correctly"""
mock_redis = MockRedisClient()
# Test basic operations
assert mock_redis.get("nonexistent") is None
assert mock_redis.exists("nonexistent") == 0
mock_redis.set("test_key", "test_value")
assert mock_redis.get("test_key") == "test_value"
assert mock_redis.exists("test_key") == 1
assert mock_redis.delete("test_key") == 1
assert mock_redis.get("test_key") is None
assert mock_redis.delete("test_key") == 0 # Already deleted
assert mock_redis.delete("test_key") == 0 # Already deleted
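
The filter_new_files behaviour asserted in these tests (embed everything on a new thread, skip files already embedded on continuation) reduces to a set difference; the sketch below illustrates the idea only and is not the project's actual conversation-memory implementation.

from typing import Optional

def filter_new_files_sketch(requested: list[str], embedded_in_thread: Optional[set[str]]) -> list[str]:
    """Return only the files not already embedded in the conversation thread.

    embedded_in_thread is None for a brand-new conversation (no continuation_id),
    in which case every requested file is embedded.
    """
    if embedded_in_thread is None:
        return list(requested)
    return [f for f in requested if f not in embedded_in_thread]

# New conversation: everything gets embedded.
assert filter_new_files_sketch(["config.py"], None) == ["config.py"]
# Continuation: files recorded on earlier turns are skipped.
assert filter_new_files_sketch(["config.py"], {"config.py"}) == []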

View File

@@ -10,7 +10,7 @@ from pydantic import Field
from config import MAX_CONTEXT_TOKENS
from prompts.tool_prompts import PRECOMMIT_PROMPT
from utils.file_utils import read_files, translate_file_paths, translate_path_for_environment
from utils.file_utils import translate_file_paths, translate_path_for_environment
from utils.git_utils import find_git_repositories, get_git_status, run_git_command
from utils.token_utils import estimate_tokens
@@ -300,11 +300,11 @@ class Precommit(BaseTool):
# Use centralized file handling with filtering for duplicate prevention
file_content = self._prepare_file_content_for_prompt(
translated_files,
request.continuation_id,
translated_files,
request.continuation_id,
"Context files",
max_tokens=remaining_tokens + 1000, # Add back the reserve that was calculated
reserve_tokens=1000 # Small reserve for formatting
reserve_tokens=1000, # Small reserve for formatting
)
if file_content:
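
For context on the max_tokens/reserve_tokens parameters, a hedged sketch of token-budgeted file embedding using the BEGIN/END FILE markers the tests assert; the four-characters-per-token estimate and the helper names are assumptions, not the real utils.file_utils or utils.token_utils logic.

def rough_token_estimate(text: str) -> int:
    return len(text) // 4  # crude heuristic, stand-in for estimate_tokens()

def prepare_file_content_sketch(files: list[str], max_tokens: int, reserve_tokens: int = 1000) -> str:
    """Embed files between BEGIN/END markers until the token budget would be exceeded."""
    budget = max_tokens - reserve_tokens
    parts: list[str] = []
    used = 0
    for path in files:
        with open(path, encoding="utf-8") as f:
            body = f.read()
        block = f"--- BEGIN FILE: {path} ---\n{body}\n--- END FILE: {path} ---\n"
        cost = rough_token_estimate(block)
        if used + cost > budget:
            break  # stop embedding once the budget is exhausted
        parts.append(block)
        used += cost
    return "\n".join(parts)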