More tests

This commit is contained in:
Fahad
2025-06-11 18:44:34 +04:00
parent ee3b9fdcd8
commit 898373bc22
10 changed files with 455 additions and 105 deletions

View File

@@ -8,6 +8,7 @@ Each test is in its own file for better organization and maintainability.
from .base_test import BaseSimulatorTest
from .test_basic_conversation import BasicConversationTest
from .test_content_validation import ContentValidationTest
from .test_cross_tool_comprehensive import CrossToolComprehensiveTest
from .test_cross_tool_continuation import CrossToolContinuationTest
from .test_logs_validation import LogsValidationTest
from .test_per_tool_deduplication import PerToolDeduplicationTest
@@ -19,6 +20,7 @@ TEST_REGISTRY = {
"content_validation": ContentValidationTest,
"per_tool_deduplication": PerToolDeduplicationTest,
"cross_tool_continuation": CrossToolContinuationTest,
"cross_tool_comprehensive": CrossToolComprehensiveTest,
"logs_validation": LogsValidationTest,
"redis_validation": RedisValidationTest,
}
@@ -29,6 +31,7 @@ __all__ = [
"ContentValidationTest",
"PerToolDeduplicationTest",
"CrossToolContinuationTest",
"CrossToolComprehensiveTest",
"LogsValidationTest",
"RedisValidationTest",
"TEST_REGISTRY",

View File

@@ -96,10 +96,7 @@ class Calculator:
f.write(config_content)
# Ensure absolute paths for MCP server compatibility
self.test_files = {
"python": os.path.abspath(test_py),
"config": os.path.abspath(test_config)
}
self.test_files = {"python": os.path.abspath(test_py), "config": os.path.abspath(test_config)}
self.logger.debug(f"Created test files with absolute paths: {list(self.test_files.values())}")
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
@@ -237,9 +234,9 @@ class Calculator:
def create_additional_test_file(self, filename: str, content: str) -> str:
"""Create an additional test file for mixed scenario testing"""
if not hasattr(self, 'test_dir') or not self.test_dir:
if not hasattr(self, "test_dir") or not self.test_dir:
raise RuntimeError("Test directory not initialized. Call setup_test_files() first.")
file_path = os.path.join(self.test_dir, filename)
with open(file_path, "w") as f:
f.write(content)

View File

@@ -53,7 +53,7 @@ DATABASE_CONFIG = {
validation_file = os.path.join(self.test_dir, "validation_config.py")
with open(validation_file, "w") as f:
f.write(validation_content)
# Ensure absolute path for MCP server compatibility
validation_file = os.path.abspath(validation_file)
@@ -113,11 +113,17 @@ DATABASE_CONFIG = {
tools_to_test = [
(
"chat",
{"prompt": "Please use low thinking mode. Analyze this config file", "files": [validation_file]}, # Using absolute path
{
"prompt": "Please use low thinking mode. Analyze this config file",
"files": [validation_file],
}, # Using absolute path
),
(
"codereview",
{"files": [validation_file], "context": "Please use low thinking mode. Review this configuration"}, # Using absolute path
{
"files": [validation_file],
"context": "Please use low thinking mode. Review this configuration",
}, # Using absolute path
),
("analyze", {"files": [validation_file], "analysis_type": "code_quality"}), # Using absolute path
]

View File

@@ -0,0 +1,306 @@
#!/usr/bin/env python3
"""
Comprehensive Cross-Tool Test
Tests file deduplication, conversation continuation, and file handling
across all available MCP tools using realistic workflows with low thinking mode.
Validates:
1. Cross-tool conversation continuation
2. File deduplication across different tools
3. Mixed file scenarios (old + new files)
4. Conversation history preservation
5. Proper tool chaining with context
"""
import subprocess
from .base_test import BaseSimulatorTest
class CrossToolComprehensiveTest(BaseSimulatorTest):
    """Comprehensive test across all MCP tools.

    Drives the workflow chat -> analyze -> chat (continued) -> debug ->
    debug (continued) -> codereview -> precommit through ``call_mcp_tool``,
    then scans the Docker logs produced during the run for evidence of
    conversation continuation and file handling.
    """

    # Second container whose logs are merged with the main server's logs.
    LOG_MONITOR_CONTAINER = "gemini-mcp-log-monitor"

    # Tool names searched for in log lines as evidence of tool activity.
    TOOL_NAMES = ("chat", "analyze", "debug", "codereview", "precommit")

    # Number of success criteria (out of 8) that must hold for a pass.
    MIN_PASSED_CRITERIA = 6

    @property
    def test_name(self) -> str:
        """Registry key identifying this test."""
        return "cross_tool_comprehensive"

    @property
    def test_description(self) -> str:
        """One-line human-readable summary of this test."""
        return "Comprehensive cross-tool file deduplication and continuation"

    def get_docker_logs_since(self, since_time: str) -> str:
        """Get docker logs since a specific timestamp.

        Args:
            since_time: Value passed verbatim to ``docker logs --since``.

        Returns:
            Logs of the main server container followed by the logs of the log
            monitor container, joined by a newline. An empty string is
            returned if the docker command could not be executed at all.
        """
        try:
            # Check both main server and log monitor for comprehensive logs
            containers = (self.container_name, self.LOG_MONITOR_CONTAINER)
            return "\n".join(self._get_container_logs(name, since_time) for name in containers)
        except Exception as e:
            self.logger.error(f"Failed to get docker logs: {e}")
            return ""

    def _get_container_logs(self, container: str, since_time: str) -> str:
        """Return what ``docker logs --since`` printed for a single container."""
        result = subprocess.run(
            ["docker", "logs", "--since", since_time, container],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # On failure stderr holds docker's own error message, not container
            # logs, so report it instead of mixing it into the log text.
            self.logger.warning(
                f"docker logs for {container} exited with {result.returncode}: {result.stderr.strip()}"
            )
            return result.stdout

        # `docker logs` replays the container's stderr stream on its own
        # stderr; keep it so log lines written to stderr are not lost.
        parts = [result.stdout]
        if result.stderr:
            parts.append(result.stderr)
        return "\n".join(parts)

    @staticmethod
    def _short_id(continuation_id) -> str:
        """Return the first 8 characters of an ID, or 'None' when it is missing."""
        return continuation_id[:8] if continuation_id else "None"

    def run_test(self) -> bool:
        """Comprehensive cross-tool test with all MCP tools.

        Returns:
            True when every mandatory step returned a response and at least
            ``MIN_PASSED_CRITERIA`` of the log-based criteria hold. False
            otherwise, including when any exception is raised. Test files are
            cleaned up in every case.
        """
        try:
            self.logger.info("📄 Test: Comprehensive cross-tool file deduplication and continuation")

            # Setup test files
            self.setup_test_files()

            # Create short test files for quick testing
            python_code = '''def login(user, pwd):
    # Security issue: plain text password
    if user == "admin" and pwd == "123":
        return True
    return False
def hash_pwd(pwd):
    # Weak hashing
    return str(hash(pwd))
'''
            config_file = """{
"db_password": "weak123",
"debug": true,
"secret_key": "test"
}"""
            auth_file = self.create_additional_test_file("auth.py", python_code)
            config_file_path = self.create_additional_test_file("config.json", config_file)

            # Get timestamp for log filtering
            import datetime

            start_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

            # Tool chain: chat → analyze → debug → codereview → precommit
            # Each entry is (step label, tool name, response, continuation id).
            steps = []

            # Step 1: Start with chat tool to understand the codebase
            self.logger.info(" Step 1: chat tool - Initial codebase exploration")
            chat_params = {
                "prompt": "Please give me a quick one line reply. I have an authentication module that needs review. Can you help me understand potential issues?",
                "files": [auth_file],
                "thinking_mode": "low",
            }
            response1, continuation_id1 = self.call_mcp_tool("chat", chat_params)
            # The chat thread is resumed in step 3, so its ID is mandatory.
            if not response1 or not continuation_id1:
                self.logger.error(" ❌ Step 1: chat tool failed")
                return False
            self.logger.info(f" ✅ Step 1: chat completed with continuation_id: {continuation_id1[:8]}...")
            steps.append(("chat", "chat", response1, continuation_id1))

            # Step 2: Use analyze tool to do deeper analysis (fresh conversation)
            self.logger.info(" Step 2: analyze tool - Deep code analysis (fresh)")
            analyze_params = {
                "files": [auth_file],
                "question": "Please give me a quick one line reply. What are the security vulnerabilities and architectural issues in this authentication code?",
                "thinking_mode": "low",
            }
            response2, continuation_id2 = self.call_mcp_tool("analyze", analyze_params)
            if not response2:
                self.logger.error(" ❌ Step 2: analyze tool failed")
                return False
            self.logger.info(
                f" ✅ Step 2: analyze completed with continuation_id: {self._short_id(continuation_id2)}..."
            )
            steps.append(("analyze", "analyze", response2, continuation_id2))

            # Step 3: Continue chat conversation with config file
            self.logger.info(" Step 3: chat continuation - Add config file context")
            chat_continue_params = {
                "continuation_id": continuation_id1,
                "prompt": "Please give me a quick one line reply. I also have this configuration file. Can you analyze it alongside the authentication code?",
                "files": [auth_file, config_file_path],  # Old + new file
                "thinking_mode": "low",
            }
            response3, _ = self.call_mcp_tool("chat", chat_continue_params)
            if not response3:
                self.logger.error(" ❌ Step 3: chat continuation failed")
                return False
            self.logger.info(" ✅ Step 3: chat continuation completed")
            steps.append(("chat_continue", "chat", response3, continuation_id1))

            # Step 4: Use debug tool to identify specific issues
            self.logger.info(" Step 4: debug tool - Identify specific problems")
            debug_params = {
                "files": [auth_file, config_file_path],
                "error_description": "Please give me a quick one line reply. The authentication system has security vulnerabilities. Help me identify and fix the main issues.",
                "thinking_mode": "low",
            }
            response4, continuation_id4 = self.call_mcp_tool("debug", debug_params)
            if not response4:
                self.logger.error(" ❌ Step 4: debug tool failed")
                return False
            self.logger.info(
                f" ✅ Step 4: debug completed with continuation_id: {self._short_id(continuation_id4)}..."
            )
            steps.append(("debug", "debug", response4, continuation_id4))

            # Step 5: Continue the debug thread started in step 4.
            # This step is optional: problems are logged but do not fail the test.
            if continuation_id4:
                self.logger.info(" Step 5: debug continuation - Additional analysis")
                debug_continue_params = {
                    "continuation_id": continuation_id4,
                    "files": [auth_file, config_file_path],
                    "error_description": "Please give me a quick one line reply. What specific code changes would you recommend to fix the password hashing vulnerability?",
                    "thinking_mode": "low",
                }
                response5, _ = self.call_mcp_tool("debug", debug_continue_params)
                if response5:
                    self.logger.info(" ✅ Step 5: debug continuation completed")
                    steps.append(("debug_continue", "debug", response5, continuation_id4))
                else:
                    self.logger.warning(" ⚠️ Step 5: debug continuation returned no response")
            else:
                self.logger.warning(" ⚠️ Step 5: skipped, debug tool returned no continuation_id")

            # Step 6: Use codereview for comprehensive review
            self.logger.info(" Step 6: codereview tool - Comprehensive code review")
            codereview_params = {
                "files": [auth_file, config_file_path],
                "context": "Please give me a quick one line reply. Comprehensive security-focused code review for production readiness",
                "thinking_mode": "low",
            }
            response6, continuation_id6 = self.call_mcp_tool("codereview", codereview_params)
            if not response6:
                self.logger.error(" ❌ Step 6: codereview tool failed")
                return False
            self.logger.info(
                f" ✅ Step 6: codereview completed with continuation_id: {self._short_id(continuation_id6)}..."
            )
            steps.append(("codereview", "codereview", response6, continuation_id6))

            # Step 7: Create improved version and use precommit
            self.logger.info(" Step 7: precommit tool - Pre-commit validation")

            # Create a short improved version
            improved_code = '''import hashlib
def secure_login(user, pwd):
    # Better: hashed password check
    hashed = hashlib.sha256(pwd.encode()).hexdigest()
    if user == "admin" and hashed == "expected_hash":
        return True
    return False
'''
            improved_file = self.create_additional_test_file("auth_improved.py", improved_code)
            precommit_params = {
                "path": self.test_dir,
                "files": [auth_file, config_file_path, improved_file],
                "original_request": "Please give me a quick one line reply. Ready to commit security improvements to authentication module",
                "thinking_mode": "low",
            }
            response7, continuation_id7 = self.call_mcp_tool("precommit", precommit_params)
            if not response7:
                self.logger.error(" ❌ Step 7: precommit tool failed")
                return False
            self.logger.info(
                f" ✅ Step 7: precommit completed with continuation_id: {self._short_id(continuation_id7)}..."
            )
            steps.append(("precommit", "precommit", response7, continuation_id7))

            # Validate comprehensive results
            self.logger.info(" 📋 Validating comprehensive cross-tool results...")
            logs = self.get_docker_logs_since(start_time)
            return self._validate_results(steps, logs)
        except Exception as e:
            self.logger.error(f"Comprehensive cross-tool test failed: {e}")
            return False
        finally:
            self.cleanup_test_files()

    def _validate_results(self, steps: list, logs: str) -> bool:
        """Score the recorded steps and captured logs against the success criteria.

        Args:
            steps: ``(label, tool, response, continuation_id)`` tuples, one per
                completed step, in execution order.
            logs: Combined Docker log text captured for the run.

        Returns:
            True when at least ``MIN_PASSED_CRITERIA`` criteria are met.
        """
        # Split once; every check below scans the same list of lines.
        log_lines = logs.split("\n")

        # Count distinct tools and distinct threads: continuation steps reuse
        # both the tool and the ID of the step they continue.
        tools_used = list(dict.fromkeys(tool for _, tool, _, _ in steps))
        continuation_ids_created = {cid for _, _, _, cid in steps if cid}

        # Check for various log patterns
        conversation_logs = [
            line for line in log_lines if "conversation" in line.lower() or "history" in line.lower()
        ]
        embedding_logs = [
            line for line in log_lines if "📁" in line or "embedding" in line.lower() or "file" in line.lower()
        ]
        continuation_logs = [
            line for line in log_lines if "continuation" in line.lower() or "resuming" in line.lower()
        ]
        cross_tool_logs = [line for line in log_lines if any(tool in line.lower() for tool in self.TOOL_NAMES)]

        # File mentions
        auth_file_mentioned = any("auth.py" in line for line in log_lines)
        config_file_mentioned = any("config.json" in line for line in log_lines)
        improved_file_mentioned = any("auth_improved.py" in line for line in log_lines)

        # Print comprehensive diagnostics
        self.logger.info(f" 📊 Tools used: {len(tools_used)} ({', '.join(tools_used)})")
        self.logger.info(f" 📊 Continuation IDs created: {len(continuation_ids_created)}")
        self.logger.info(f" 📊 Conversation logs found: {len(conversation_logs)}")
        self.logger.info(f" 📊 File embedding logs found: {len(embedding_logs)}")
        self.logger.info(f" 📊 Continuation logs found: {len(continuation_logs)}")
        self.logger.info(f" 📊 Cross-tool activity logs: {len(cross_tool_logs)}")
        self.logger.info(f" 📊 Auth file mentioned: {auth_file_mentioned}")
        self.logger.info(f" 📊 Config file mentioned: {config_file_mentioned}")
        self.logger.info(f" 📊 Improved file mentioned: {improved_file_mentioned}")

        if self.verbose:
            self.logger.debug(" 📋 Sample tool activity logs:")
            for log in cross_tool_logs[:10]:  # Show first 10
                if log.strip():
                    self.logger.debug(f" {log.strip()}")
            self.logger.debug(" 📋 Sample continuation logs:")
            for log in continuation_logs[:5]:  # Show first 5
                if log.strip():
                    self.logger.debug(f" {log.strip()}")

        # Comprehensive success criteria
        success_criteria = [
            len(tools_used) >= 5,  # Used multiple distinct tools
            len(continuation_ids_created) >= 3,  # Created multiple continuation threads
            len(embedding_logs) > 10,  # Significant file embedding activity
            len(continuation_logs) > 0,  # Evidence of continuation
            auth_file_mentioned,  # Original file processed
            config_file_mentioned,  # Additional file processed
            improved_file_mentioned,  # New file processed
            len(conversation_logs) > 5,  # Conversation history activity
        ]
        passed_criteria = sum(success_criteria)
        total_criteria = len(success_criteria)
        self.logger.info(f" 📊 Success criteria met: {passed_criteria}/{total_criteria}")

        if passed_criteria >= self.MIN_PASSED_CRITERIA:
            self.logger.info(" ✅ Comprehensive cross-tool test: PASSED")
            return True

        self.logger.warning(" ⚠️ Comprehensive cross-tool test: FAILED")
        self.logger.warning(" 💡 Check logs for detailed cross-tool activity")
        return False

View File

@@ -11,10 +11,8 @@ Validates that:
4. Docker logs show deduplication behavior
"""
import json
import os
import subprocess
import tempfile
from .base_test import BaseSimulatorTest
@@ -35,10 +33,10 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
# Check both main server and log monitor for comprehensive logs
cmd_server = ["docker", "logs", "--since", since_time, self.container_name]
cmd_monitor = ["docker", "logs", "--since", since_time, "gemini-mcp-log-monitor"]
result_server = subprocess.run(cmd_server, capture_output=True, text=True)
result_monitor = subprocess.run(cmd_monitor, capture_output=True, text=True)
# Combine logs from both containers
combined_logs = result_server.stdout + "\n" + result_monitor.stdout
return combined_logs
@@ -51,14 +49,20 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
def validate_file_deduplication_in_logs(self, logs: str, tool_name: str, test_file: str) -> bool:
"""Validate that logs show file deduplication behavior"""
# Look for file embedding messages
embedding_messages = [line for line in logs.split('\n') if '📁' in line and 'embedding' in line and tool_name in line]
# Look for deduplication/filtering messages
filtering_messages = [line for line in logs.split('\n') if '📁' in line and 'Filtering' in line and tool_name in line]
skipping_messages = [line for line in logs.split('\n') if '📁' in line and 'skipping' in line and tool_name in line]
embedding_messages = [
line for line in logs.split("\n") if "📁" in line and "embedding" in line and tool_name in line
]
# Look for deduplication/filtering messages
filtering_messages = [
line for line in logs.split("\n") if "📁" in line and "Filtering" in line and tool_name in line
]
skipping_messages = [
line for line in logs.split("\n") if "📁" in line and "skipping" in line and tool_name in line
]
deduplication_found = len(filtering_messages) > 0 or len(skipping_messages) > 0
if deduplication_found:
self.logger.info(f"{tool_name}: Found deduplication evidence in logs")
for msg in filtering_messages + skipping_messages:
@@ -66,7 +70,7 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
else:
self.logger.warning(f" ⚠️ {tool_name}: No deduplication evidence found in logs")
self.logger.debug(f" 📁 All embedding messages: {embedding_messages}")
return deduplication_found
def run_test(self) -> bool:
@@ -76,21 +80,19 @@ class PerToolDeduplicationTest(BaseSimulatorTest):
# Setup test files
self.setup_test_files()
# Create a dummy file for precommit testing
dummy_content = '''def hello_world():
"""A simple hello world function with a bug"""
print("Hello world!")
return "hello"
# TODO: Fix the inconsistent return type
def calculate_sum(a, b):
# Create a short dummy file for quick testing
dummy_content = '''def add(a, b):
return a + b # Missing type hints
def divide(x, y):
return x / y # No zero check
'''
dummy_file_path = self.create_additional_test_file("dummy_code.py", dummy_content)
# Get timestamp for log filtering
import datetime
start_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
# Step 1: precommit tool with dummy file (low thinking mode)
@@ -98,98 +100,105 @@ def calculate_sum(a, b):
precommit_params = {
"path": self.test_dir, # Required path parameter
"files": [dummy_file_path],
"original_request": "Please use low thinking mode. Review this code for commit readiness",
"thinking_mode": "low"
"original_request": "Please give me a quick one line reply. Review this code for commit readiness",
"thinking_mode": "low",
}
response1, continuation_id = self.call_mcp_tool("precommit", precommit_params)
if not response1:
self.logger.error(" ❌ Step 1: precommit tool failed")
return False
if not continuation_id:
self.logger.error(" ❌ Step 1: precommit tool didn't provide continuation_id")
return False
# Validate continuation_id format (should be UUID)
if len(continuation_id) < 32:
self.logger.error(f" ❌ Step 1: Invalid continuation_id format: {continuation_id}")
return False
self.logger.info(f" ✅ Step 1: precommit completed with continuation_id: {continuation_id[:8]}...")
# Step 2: codereview tool with same file (NO continuation - fresh conversation)
self.logger.info(" Step 2: codereview tool with same file (fresh conversation)")
codereview_params = {
"files": [dummy_file_path],
"context": "Please use low thinking mode. General code review for quality and best practices"
"context": "Please give me a quick one line reply. General code review for quality and best practices",
"thinking_mode": "low"
}
response2, _ = self.call_mcp_tool("codereview", codereview_params)
if not response2:
self.logger.error(" ❌ Step 2: codereview tool failed")
return False
self.logger.info(" ✅ Step 2: codereview completed (fresh conversation)")
# Step 3: Create new file and continue with precommit
self.logger.info(" Step 3: precommit continuation with old + new file")
new_file_content = '''def new_feature():
"""A new feature function"""
return {"status": "implemented", "version": "1.0"}
new_file_content = '''def multiply(x, y):
return x * y
class NewUtility:
"""A new utility class"""
def __init__(self):
self.initialized = True
def process_data(self, data):
return f"Processed: {data}"
def subtract(a, b):
return a - b
'''
new_file_path = self.create_additional_test_file("new_feature.py", new_file_content)
# Continue precommit with both files
continue_params = {
"continuation_id": continuation_id,
"path": self.test_dir, # Required path parameter
"files": [dummy_file_path, new_file_path], # Old + new file
"original_request": "Please use low thinking mode. Now also review the new feature file along with the previous one",
"thinking_mode": "low"
"original_request": "Please give me a quick one line reply. Now also review the new feature file along with the previous one",
"thinking_mode": "low",
}
response3, _ = self.call_mcp_tool("precommit", continue_params)
if not response3:
self.logger.error(" ❌ Step 3: precommit continuation failed")
return False
self.logger.info(" ✅ Step 3: precommit continuation completed")
# Validate results in docker logs
self.logger.info(" 📋 Validating conversation history and file deduplication...")
logs = self.get_docker_logs_since(start_time)
# Check for conversation history building
conversation_logs = [line for line in logs.split('\n') if 'conversation' in line.lower() or 'history' in line.lower()]
conversation_logs = [
line for line in logs.split("\n") if "conversation" in line.lower() or "history" in line.lower()
]
# Check for file embedding/deduplication
embedding_logs = [line for line in logs.split('\n') if '📁' in line or 'embedding' in line.lower() or 'file' in line.lower()]
embedding_logs = [
line
for line in logs.split("\n")
if "📁" in line or "embedding" in line.lower() or "file" in line.lower()
]
# Check for continuation evidence
continuation_logs = [line for line in logs.split('\n') if 'continuation' in line.lower() or continuation_id[:8] in line]
continuation_logs = [
line for line in logs.split("\n") if "continuation" in line.lower() or continuation_id[:8] in line
]
# Check for both files mentioned
dummy_file_mentioned = any("dummy_code.py" in line for line in logs.split('\n'))
new_file_mentioned = any("new_feature.py" in line for line in logs.split('\n'))
dummy_file_mentioned = any("dummy_code.py" in line for line in logs.split("\n"))
new_file_mentioned = any("new_feature.py" in line for line in logs.split("\n"))
# Print diagnostic information
self.logger.info(f" 📊 Conversation logs found: {len(conversation_logs)}")
self.logger.info(f" 📊 File embedding logs found: {len(embedding_logs)}")
self.logger.info(f" 📊 Continuation logs found: {len(continuation_logs)}")
self.logger.info(f" 📊 Dummy file mentioned: {dummy_file_mentioned}")
self.logger.info(f" 📊 New file mentioned: {new_file_mentioned}")
if self.verbose:
self.logger.debug(" 📋 Sample embedding logs:")
for log in embedding_logs[:5]: # Show first 5
if log.strip():
self.logger.debug(f" {log.strip()}")
self.logger.debug(" 📋 Sample continuation logs:")
for log in continuation_logs[:3]: # Show first 3
if log.strip():
@@ -200,14 +209,14 @@ class NewUtility:
len(embedding_logs) > 0, # File embedding occurred
len(continuation_logs) > 0, # Continuation worked
dummy_file_mentioned, # Original file processed
new_file_mentioned # New file processed
new_file_mentioned, # New file processed
]
passed_criteria = sum(success_criteria)
total_criteria = len(success_criteria)
self.logger.info(f" 📊 Success criteria met: {passed_criteria}/{total_criteria}")
if passed_criteria >= 3: # At least 3 out of 4 criteria
self.logger.info(" ✅ File deduplication workflow test: PASSED")
return True