fix: Minor tweaks to prompts fix: Improved support for smaller models that struggle with strict structured JSON output Rearranged reasons to use the MCP above quick start (collapsed)
1027 lines
46 KiB
Python
1027 lines
46 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
CodeReview Tool Validation Test
|
|
|
|
Tests the codereview tool's capabilities using the new workflow architecture.
|
|
This validates that the workflow-based code review provides step-by-step
|
|
analysis with proper investigation guidance and expert analysis integration.
|
|
"""
|
|
|
|
import json
|
|
from typing import Optional
|
|
|
|
from .conversation_base_test import ConversationBaseTest
|
|
|
|
|
|
class CodeReviewValidationTest(ConversationBaseTest):
|
|
"""Test codereview tool with new workflow architecture"""
|
|
|
|
@property
|
|
def test_name(self) -> str:
|
|
return "codereview_validation"
|
|
|
|
@property
|
|
def test_description(self) -> str:
|
|
return "CodeReview tool validation with new workflow architecture"
|
|
|
|
def run_test(self) -> bool:
|
|
"""Test codereview tool capabilities"""
|
|
# Set up the test environment
|
|
self.setUp()
|
|
|
|
try:
|
|
self.logger.info("Test: CodeReviewWorkflow tool validation (new architecture)")
|
|
|
|
# Create test code with various issues for review
|
|
self._create_test_code_for_review()
|
|
|
|
# Test 1: Single review session with multiple steps
|
|
if not self._test_single_review_session():
|
|
return False
|
|
|
|
# Test 2: Review with backtracking
|
|
if not self._test_review_with_backtracking():
|
|
return False
|
|
|
|
# Test 3: Complete review with expert analysis
|
|
if not self._test_complete_review_with_analysis():
|
|
return False
|
|
|
|
# Test 4: Certain confidence behavior
|
|
if not self._test_certain_confidence():
|
|
return False
|
|
|
|
# Test 5: Context-aware file embedding
|
|
if not self._test_context_aware_file_embedding():
|
|
return False
|
|
|
|
# Test 6: Multi-step file context optimization
|
|
if not self._test_multi_step_file_context():
|
|
return False
|
|
|
|
self.logger.info(" ✅ All codereview validation tests passed")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"CodeReviewWorkflow validation test failed: {e}")
|
|
return False
|
|
|
|
def _create_test_code_for_review(self):
|
|
"""Create test files with various code quality issues for review"""
|
|
# Create a payment processing module with multiple issues
|
|
payment_code = """#!/usr/bin/env python3
|
|
import hashlib
|
|
import requests
|
|
import json
|
|
from datetime import datetime
|
|
|
|
class PaymentProcessor:
|
|
def __init__(self, api_key):
|
|
self.api_key = api_key # Security issue: API key stored in plain text
|
|
self.base_url = "https://payment-gateway.example.com"
|
|
self.session = requests.Session()
|
|
self.failed_payments = [] # Performance issue: unbounded list
|
|
|
|
def process_payment(self, amount, card_number, cvv, user_id):
|
|
\"\"\"Process a payment transaction\"\"\"
|
|
# Security issue: No input validation
|
|
# Performance issue: Inefficient nested loops
|
|
for attempt in range(3):
|
|
for retry in range(5):
|
|
try:
|
|
# Security issue: Logging sensitive data
|
|
print(f"Processing payment: {card_number}, CVV: {cvv}")
|
|
|
|
# Over-engineering: Complex hashing that's not needed
|
|
payment_hash = self._generate_complex_hash(amount, card_number, cvv, user_id, datetime.now())
|
|
|
|
# Security issue: Insecure HTTP request construction
|
|
url = f"{self.base_url}/charge?amount={amount}&card={card_number}&api_key={self.api_key}"
|
|
|
|
response = self.session.get(url) # Security issue: using GET for sensitive data
|
|
|
|
if response.status_code == 200:
|
|
return {"status": "success", "hash": payment_hash}
|
|
else:
|
|
# Code smell: Generic exception handling without specific error types
|
|
self.failed_payments.append({"amount": amount, "timestamp": datetime.now()})
|
|
|
|
except Exception as e:
|
|
# Code smell: Bare except clause and poor error handling
|
|
print(f"Payment failed: {e}")
|
|
continue
|
|
|
|
return {"status": "failed"}
|
|
|
|
def _generate_complex_hash(self, amount, card_number, cvv, user_id, timestamp):
|
|
\"\"\"Over-engineered hash generation with unnecessary complexity\"\"\"
|
|
# Over-engineering: Overly complex for no clear benefit
|
|
combined = f"{amount}-{card_number}-{cvv}-{user_id}-{timestamp}"
|
|
|
|
# Security issue: Weak hashing algorithm
|
|
hash1 = hashlib.md5(combined.encode()).hexdigest()
|
|
hash2 = hashlib.sha1(hash1.encode()).hexdigest()
|
|
hash3 = hashlib.md5(hash2.encode()).hexdigest()
|
|
|
|
# Performance issue: Unnecessary string operations in loop
|
|
result = ""
|
|
for i in range(len(hash3)):
|
|
for j in range(3): # Arbitrary nested loop
|
|
result += hash3[i] if i % 2 == 0 else hash3[i].upper()
|
|
|
|
return result[:32] # Arbitrary truncation
|
|
|
|
def get_payment_history(self, user_id):
|
|
\"\"\"Get payment history - has scalability issues\"\"\"
|
|
# Performance issue: No pagination, could return massive datasets
|
|
# Performance issue: Inefficient algorithm O(n²)
|
|
all_payments = self._fetch_all_payments() # Could be millions of records
|
|
user_payments = []
|
|
|
|
for payment in all_payments:
|
|
for field in payment: # Unnecessary nested iteration
|
|
if field == "user_id" and payment[field] == user_id:
|
|
user_payments.append(payment)
|
|
break
|
|
|
|
return user_payments
|
|
|
|
def _fetch_all_payments(self):
|
|
\"\"\"Simulated method that would fetch all payments\"\"\"
|
|
# Maintainability issue: Hard-coded test data
|
|
return [
|
|
{"user_id": 1, "amount": 100, "status": "success"},
|
|
{"user_id": 2, "amount": 200, "status": "failed"},
|
|
{"user_id": 1, "amount": 150, "status": "success"},
|
|
]
|
|
"""
|
|
|
|
# Create test file with multiple issues
|
|
self.payment_file = self.create_additional_test_file("payment_processor.py", payment_code)
|
|
self.logger.info(f" ✅ Created test file with code issues: {self.payment_file}")
|
|
|
|
# Create configuration file with additional issues
|
|
config_code = """#!/usr/bin/env python3
|
|
import os
|
|
|
|
# Security issue: Hardcoded secrets
|
|
DATABASE_PASSWORD = "admin123"
|
|
SECRET_KEY = "my-secret-key-12345"
|
|
|
|
# Over-engineering: Unnecessarily complex configuration class
|
|
class ConfigurationManager:
|
|
def __init__(self):
|
|
self.config_cache = {}
|
|
self.config_hierarchy = {}
|
|
self.config_validators = {}
|
|
self.config_transformers = {}
|
|
self.config_listeners = []
|
|
|
|
def get_config(self, key, default=None):
|
|
# Over-engineering: Complex caching for simple config lookup
|
|
if key in self.config_cache:
|
|
cached_value = self.config_cache[key]
|
|
if self._validate_cached_value(cached_value):
|
|
return self._transform_value(key, cached_value)
|
|
|
|
# Code smell: Complex nested conditionals
|
|
if key in self.config_hierarchy:
|
|
hierarchy = self.config_hierarchy[key]
|
|
for level in hierarchy:
|
|
if level == "env":
|
|
value = os.getenv(key.upper(), default)
|
|
elif level == "file":
|
|
value = self._read_from_file(key, default)
|
|
elif level == "database":
|
|
value = self._read_from_database(key, default)
|
|
else:
|
|
value = default
|
|
|
|
if value is not None:
|
|
self.config_cache[key] = value
|
|
return self._transform_value(key, value)
|
|
|
|
return default
|
|
|
|
def _validate_cached_value(self, value):
|
|
# Maintainability issue: Unclear validation logic
|
|
if isinstance(value, str) and len(value) > 1000:
|
|
return False
|
|
return True
|
|
|
|
def _transform_value(self, key, value):
|
|
# Code smell: Unnecessary abstraction
|
|
if key in self.config_transformers:
|
|
transformer = self.config_transformers[key]
|
|
return transformer(value)
|
|
return value
|
|
|
|
def _read_from_file(self, key, default):
|
|
# Maintainability issue: No error handling for file operations
|
|
with open(f"/etc/app/{key}.conf") as f:
|
|
return f.read().strip()
|
|
|
|
def _read_from_database(self, key, default):
|
|
# Performance issue: Database query for every config read
|
|
# No connection pooling or caching
|
|
import sqlite3
|
|
conn = sqlite3.connect("config.db")
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT value FROM config WHERE key = ?", (key,))
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
return result[0] if result else default
|
|
"""
|
|
|
|
self.config_file = self.create_additional_test_file("config.py", config_code)
|
|
self.logger.info(f" ✅ Created configuration file with issues: {self.config_file}")
|
|
|
|
def _test_single_review_session(self) -> bool:
|
|
"""Test a complete code review session with multiple steps"""
|
|
try:
|
|
self.logger.info(" 1.1: Testing single code review session")
|
|
|
|
# Step 1: Start review
|
|
self.logger.info(" 1.1.1: Step 1 - Initial review")
|
|
response1, continuation_id = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "I need to perform a comprehensive code review of the payment processing module. Let me start by examining the code structure and identifying potential issues.",
|
|
"step_number": 1,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"findings": "Initial examination reveals a payment processing class with potential security and performance concerns.",
|
|
"files_checked": [self.payment_file],
|
|
"relevant_files": [self.payment_file],
|
|
"files": [self.payment_file], # Required for step 1
|
|
"review_type": "full",
|
|
"severity_filter": "all",
|
|
},
|
|
)
|
|
|
|
if not response1 or not continuation_id:
|
|
self.logger.error("Failed to get initial review response")
|
|
return False
|
|
|
|
# Parse and validate JSON response
|
|
response1_data = self._parse_review_response(response1)
|
|
if not response1_data:
|
|
return False
|
|
|
|
# Validate step 1 response structure - expect pause_for_code_review for next_step_required=True
|
|
if not self._validate_step_response(response1_data, 1, 4, True, "pause_for_code_review"):
|
|
return False
|
|
|
|
self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
|
|
|
|
# Step 2: Detailed analysis
|
|
self.logger.info(" 1.1.2: Step 2 - Detailed security analysis")
|
|
response2, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Now performing detailed security analysis of the payment processor code to identify vulnerabilities and code quality issues.",
|
|
"step_number": 2,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"findings": "Found multiple security issues: API key stored in plain text, sensitive data logging, insecure HTTP methods, and weak hashing algorithms.",
|
|
"files_checked": [self.payment_file],
|
|
"relevant_files": [self.payment_file],
|
|
"relevant_context": ["PaymentProcessor.__init__", "PaymentProcessor.process_payment"],
|
|
"issues_found": [
|
|
{"severity": "critical", "description": "API key stored in plain text in memory"},
|
|
{"severity": "critical", "description": "Credit card and CVV logged in plain text"},
|
|
{"severity": "high", "description": "Using GET method for sensitive payment data"},
|
|
{"severity": "medium", "description": "Weak MD5 hashing algorithm used"},
|
|
],
|
|
"confidence": "high",
|
|
"continuation_id": continuation_id,
|
|
},
|
|
)
|
|
|
|
if not response2:
|
|
self.logger.error("Failed to continue review to step 2")
|
|
return False
|
|
|
|
response2_data = self._parse_review_response(response2)
|
|
if not self._validate_step_response(response2_data, 2, 4, True, "pause_for_code_review"):
|
|
return False
|
|
|
|
# Check review status tracking
|
|
review_status = response2_data.get("code_review_status", {})
|
|
if review_status.get("files_checked", 0) < 1:
|
|
self.logger.error("Files checked count not properly tracked")
|
|
return False
|
|
|
|
if review_status.get("relevant_context", 0) != 2:
|
|
self.logger.error("Relevant context not properly tracked")
|
|
return False
|
|
|
|
# Check issues by severity
|
|
issues_by_severity = review_status.get("issues_by_severity", {})
|
|
if issues_by_severity.get("critical", 0) != 2:
|
|
self.logger.error("Critical issues not properly tracked")
|
|
return False
|
|
|
|
if issues_by_severity.get("high", 0) != 1:
|
|
self.logger.error("High severity issues not properly tracked")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Step 2 successful with proper issue tracking")
|
|
|
|
# Store continuation_id for next test
|
|
self.review_continuation_id = continuation_id
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Single review session test failed: {e}")
|
|
return False
|
|
|
|
def _test_review_with_backtracking(self) -> bool:
|
|
"""Test code review with backtracking to revise findings"""
|
|
try:
|
|
self.logger.info(" 1.2: Testing code review with backtracking")
|
|
|
|
# Start a new review for testing backtracking
|
|
self.logger.info(" 1.2.1: Start review for backtracking test")
|
|
response1, continuation_id = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Reviewing configuration management code for best practices",
|
|
"step_number": 1,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"findings": "Initial analysis shows complex configuration class",
|
|
"files_checked": [self.config_file],
|
|
"relevant_files": [self.config_file],
|
|
"files": [self.config_file],
|
|
"review_type": "full",
|
|
},
|
|
)
|
|
|
|
if not response1 or not continuation_id:
|
|
self.logger.error("Failed to start backtracking test review")
|
|
return False
|
|
|
|
# Step 2: Initial direction
|
|
self.logger.info(" 1.2.2: Step 2 - Initial analysis direction")
|
|
response2, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Focusing on configuration architecture patterns",
|
|
"step_number": 2,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"findings": "Architecture seems overly complex, but need to look more carefully at security issues",
|
|
"files_checked": [self.config_file],
|
|
"relevant_files": [self.config_file],
|
|
"issues_found": [
|
|
{"severity": "medium", "description": "Complex configuration hierarchy"},
|
|
],
|
|
"confidence": "low",
|
|
"continuation_id": continuation_id,
|
|
},
|
|
)
|
|
|
|
if not response2:
|
|
self.logger.error("Failed to continue to step 2")
|
|
return False
|
|
|
|
# Step 3: Backtrack and focus on security
|
|
self.logger.info(" 1.2.3: Step 3 - Backtrack to focus on security issues")
|
|
response3, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Backtracking - need to focus on the critical security issues I initially missed. Found hardcoded secrets and credentials in plain text.",
|
|
"step_number": 3,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"findings": "Found critical security vulnerabilities: hardcoded DATABASE_PASSWORD and SECRET_KEY in plain text",
|
|
"files_checked": [self.config_file],
|
|
"relevant_files": [self.config_file],
|
|
"relevant_context": ["ConfigurationManager.__init__"],
|
|
"issues_found": [
|
|
{"severity": "critical", "description": "Hardcoded database password in source code"},
|
|
{"severity": "critical", "description": "Hardcoded secret key in source code"},
|
|
{"severity": "high", "description": "Over-engineered configuration system"},
|
|
],
|
|
"confidence": "high",
|
|
"backtrack_from_step": 2, # Backtrack from step 2
|
|
"continuation_id": continuation_id,
|
|
},
|
|
)
|
|
|
|
if not response3:
|
|
self.logger.error("Failed to backtrack")
|
|
return False
|
|
|
|
response3_data = self._parse_review_response(response3)
|
|
if not self._validate_step_response(response3_data, 3, 4, True, "pause_for_code_review"):
|
|
return False
|
|
|
|
self.logger.info(" ✅ Backtracking working correctly")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Backtracking test failed: {e}")
|
|
return False
|
|
|
|
def _test_complete_review_with_analysis(self) -> bool:
|
|
"""Test complete code review ending with expert analysis"""
|
|
try:
|
|
self.logger.info(" 1.3: Testing complete review with expert analysis")
|
|
|
|
# Use the continuation from first test
|
|
continuation_id = getattr(self, "review_continuation_id", None)
|
|
if not continuation_id:
|
|
# Start fresh if no continuation available
|
|
self.logger.info(" 1.3.0: Starting fresh review")
|
|
response0, continuation_id = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Reviewing payment processor for security and quality issues",
|
|
"step_number": 1,
|
|
"total_steps": 2,
|
|
"next_step_required": True,
|
|
"findings": "Found multiple security and performance issues",
|
|
"files_checked": [self.payment_file],
|
|
"relevant_files": [self.payment_file],
|
|
"files": [self.payment_file],
|
|
"relevant_context": ["PaymentProcessor.process_payment"],
|
|
},
|
|
)
|
|
if not response0 or not continuation_id:
|
|
self.logger.error("Failed to start fresh review")
|
|
return False
|
|
|
|
# Final step - trigger expert analysis
|
|
self.logger.info(" 1.3.1: Final step - complete review")
|
|
response_final, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Code review complete. Identified comprehensive security, performance, and maintainability issues throughout the payment processing module.",
|
|
"step_number": 2,
|
|
"total_steps": 2,
|
|
"next_step_required": False, # Final step - triggers expert analysis
|
|
"findings": "Complete analysis reveals critical security vulnerabilities, performance bottlenecks, over-engineering patterns, and maintainability concerns. All issues documented with severity levels.",
|
|
"files_checked": [self.payment_file],
|
|
"relevant_files": [self.payment_file],
|
|
"relevant_context": [
|
|
"PaymentProcessor.process_payment",
|
|
"PaymentProcessor._generate_complex_hash",
|
|
"PaymentProcessor.get_payment_history",
|
|
],
|
|
"issues_found": [
|
|
{"severity": "critical", "description": "API key stored in plain text"},
|
|
{"severity": "critical", "description": "Sensitive payment data logged"},
|
|
{"severity": "high", "description": "SQL injection vulnerability potential"},
|
|
{"severity": "medium", "description": "Over-engineered hash generation"},
|
|
{"severity": "low", "description": "Poor error handling patterns"},
|
|
],
|
|
"confidence": "high",
|
|
"continuation_id": continuation_id,
|
|
"model": "flash", # Use flash for expert analysis
|
|
},
|
|
)
|
|
|
|
if not response_final:
|
|
self.logger.error("Failed to complete review")
|
|
return False
|
|
|
|
response_final_data = self._parse_review_response(response_final)
|
|
if not response_final_data:
|
|
return False
|
|
|
|
# Validate final response structure - expect calling_expert_analysis for next_step_required=False
|
|
if response_final_data.get("status") != "calling_expert_analysis":
|
|
self.logger.error(
|
|
f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
|
|
)
|
|
return False
|
|
|
|
if not response_final_data.get("code_review_complete"):
|
|
self.logger.error("Expected code_review_complete=true for final step")
|
|
return False
|
|
|
|
# Check for expert analysis
|
|
if "expert_analysis" not in response_final_data:
|
|
self.logger.error("Missing expert_analysis in final response")
|
|
return False
|
|
|
|
expert_analysis = response_final_data.get("expert_analysis", {})
|
|
|
|
# Check for expected analysis content (checking common patterns)
|
|
analysis_text = json.dumps(expert_analysis, ensure_ascii=False).lower()
|
|
|
|
# Look for code review identification
|
|
review_indicators = ["security", "vulnerability", "performance", "critical", "api", "key"]
|
|
found_indicators = sum(1 for indicator in review_indicators if indicator in analysis_text)
|
|
|
|
if found_indicators >= 3:
|
|
self.logger.info(" ✅ Expert analysis identified the issues correctly")
|
|
else:
|
|
self.logger.warning(
|
|
f" ⚠️ Expert analysis may not have fully identified the issues (found {found_indicators}/6 indicators)"
|
|
)
|
|
|
|
# Check complete review summary
|
|
if "complete_code_review" not in response_final_data:
|
|
self.logger.error("Missing complete_code_review in final response")
|
|
return False
|
|
|
|
complete_review = response_final_data["complete_code_review"]
|
|
if not complete_review.get("relevant_context"):
|
|
self.logger.error("Missing relevant context in complete review")
|
|
return False
|
|
|
|
if "PaymentProcessor.process_payment" not in complete_review["relevant_context"]:
|
|
self.logger.error("Expected method not found in review summary")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Complete review with expert analysis successful")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Complete review test failed: {e}")
|
|
return False
|
|
|
|
def _test_certain_confidence(self) -> bool:
|
|
"""Test certain confidence behavior - should skip expert analysis"""
|
|
try:
|
|
self.logger.info(" 1.4: Testing certain confidence behavior")
|
|
|
|
# Test certain confidence - should skip expert analysis
|
|
self.logger.info(" 1.4.1: Certain confidence review")
|
|
response_certain, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "I have completed a thorough code review with 100% certainty of all issues identified.",
|
|
"step_number": 1,
|
|
"total_steps": 1,
|
|
"next_step_required": False, # Final step
|
|
"findings": "Complete review identified all critical security issues, performance problems, and code quality concerns. All issues are documented with clear severity levels and specific recommendations.",
|
|
"files_checked": [self.payment_file],
|
|
"relevant_files": [self.payment_file],
|
|
"files": [self.payment_file],
|
|
"relevant_context": ["PaymentProcessor.process_payment"],
|
|
"issues_found": [
|
|
{"severity": "critical", "description": "Hardcoded API key security vulnerability"},
|
|
{"severity": "high", "description": "Performance bottleneck in payment history"},
|
|
],
|
|
"review_validation_type": "internal", # This should skip expert analysis
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response_certain:
|
|
self.logger.error("Failed to test certain confidence")
|
|
return False
|
|
|
|
response_certain_data = self._parse_review_response(response_certain)
|
|
if not response_certain_data:
|
|
return False
|
|
|
|
# Validate certain confidence response - should skip expert analysis
|
|
if response_certain_data.get("status") != "code_review_complete_ready_for_implementation":
|
|
self.logger.error(
|
|
f"Expected status 'code_review_complete_ready_for_implementation', got '{response_certain_data.get('status')}'"
|
|
)
|
|
return False
|
|
|
|
if not response_certain_data.get("skip_expert_analysis"):
|
|
self.logger.error("Expected skip_expert_analysis=true for certain confidence")
|
|
return False
|
|
|
|
expert_analysis = response_certain_data.get("expert_analysis", {})
|
|
if expert_analysis.get("status") not in [
|
|
"skipped_due_to_certain_review_confidence",
|
|
"skipped_due_to_internal_analysis_type",
|
|
]:
|
|
self.logger.error("Expert analysis should be skipped for certain confidence")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Certain confidence behavior working correctly")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Certain confidence test failed: {e}")
|
|
return False
|
|
|
|
def _test_context_aware_file_embedding(self) -> bool:
|
|
"""Test context-aware file embedding optimization"""
|
|
try:
|
|
self.logger.info(" 1.5: Testing context-aware file embedding")
|
|
|
|
# Create multiple test files for context testing
|
|
utils_content = """#!/usr/bin/env python3
|
|
def calculate_discount(price, discount_percent):
|
|
\"\"\"Calculate discount amount\"\"\"
|
|
if discount_percent < 0 or discount_percent > 100:
|
|
raise ValueError("Invalid discount percentage")
|
|
|
|
return price * (discount_percent / 100)
|
|
|
|
def format_currency(amount):
|
|
\"\"\"Format amount as currency\"\"\"
|
|
return f"${amount:.2f}"
|
|
"""
|
|
|
|
validator_content = """#!/usr/bin/env python3
|
|
import re
|
|
|
|
def validate_email(email):
|
|
\"\"\"Validate email format\"\"\"
|
|
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'
|
|
return re.match(pattern, email) is not None
|
|
|
|
def validate_credit_card(card_number):
|
|
\"\"\"Basic credit card validation\"\"\"
|
|
# Remove spaces and dashes
|
|
card_number = re.sub(r'[\\s-]', '', card_number)
|
|
|
|
# Check if all digits
|
|
if not card_number.isdigit():
|
|
return False
|
|
|
|
# Basic length check
|
|
return len(card_number) in [13, 14, 15, 16]
|
|
"""
|
|
|
|
# Create test files
|
|
utils_file = self.create_additional_test_file("utils.py", utils_content)
|
|
validator_file = self.create_additional_test_file("validator.py", validator_content)
|
|
|
|
# Test 1: New conversation, intermediate step - should only reference files
|
|
self.logger.info(" 1.5.1: New conversation intermediate step (should reference only)")
|
|
response1, continuation_id = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Starting comprehensive code review of utility modules",
|
|
"step_number": 1,
|
|
"total_steps": 3,
|
|
"next_step_required": True, # Intermediate step
|
|
"findings": "Initial analysis of utility and validation functions",
|
|
"files_checked": [utils_file, validator_file],
|
|
"relevant_files": [utils_file], # This should be referenced, not embedded
|
|
"files": [utils_file, validator_file], # Required for step 1
|
|
"relevant_context": ["calculate_discount"],
|
|
"confidence": "low",
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response1 or not continuation_id:
|
|
self.logger.error("Failed to start context-aware file embedding test")
|
|
return False
|
|
|
|
response1_data = self._parse_review_response(response1)
|
|
if not response1_data:
|
|
return False
|
|
|
|
# Check file context - should be reference_only for intermediate step
|
|
file_context = response1_data.get("file_context", {})
|
|
if file_context.get("type") != "reference_only":
|
|
self.logger.error(f"Expected reference_only file context, got: {file_context.get('type')}")
|
|
return False
|
|
|
|
if "Files referenced but not embedded" not in file_context.get("context_optimization", ""):
|
|
self.logger.error("Expected context optimization message for reference_only")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Intermediate step correctly uses reference_only file context")
|
|
|
|
# Test 2: Final step - should embed files for expert analysis
|
|
self.logger.info(" 1.5.2: Final step (should embed files)")
|
|
response3, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Code review complete - identified all issues and recommendations",
|
|
"step_number": 3,
|
|
"total_steps": 3,
|
|
"next_step_required": False, # Final step - should embed files
|
|
"continuation_id": continuation_id,
|
|
"findings": "Complete review: utility functions have proper error handling, validation functions are robust",
|
|
"files_checked": [utils_file, validator_file],
|
|
"relevant_files": [utils_file, validator_file], # Should be fully embedded
|
|
"relevant_context": ["calculate_discount", "validate_email", "validate_credit_card"],
|
|
"issues_found": [
|
|
{"severity": "low", "description": "Could add more comprehensive email validation"},
|
|
{"severity": "medium", "description": "Credit card validation logic could be more robust"},
|
|
],
|
|
"confidence": "medium",
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response3:
|
|
self.logger.error("Failed to complete to final step")
|
|
return False
|
|
|
|
response3_data = self._parse_review_response(response3)
|
|
if not response3_data:
|
|
return False
|
|
|
|
# Check file context - should be fully_embedded for final step
|
|
file_context3 = response3_data.get("file_context", {})
|
|
if file_context3.get("type") != "fully_embedded":
|
|
self.logger.error(
|
|
f"Expected fully_embedded file context for final step, got: {file_context3.get('type')}"
|
|
)
|
|
return False
|
|
|
|
if "Full file content embedded for expert analysis" not in file_context3.get("context_optimization", ""):
|
|
self.logger.error("Expected expert analysis optimization message for fully_embedded")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Final step correctly uses fully_embedded file context")
|
|
|
|
# Verify expert analysis was called for final step
|
|
if response3_data.get("status") != "calling_expert_analysis":
|
|
self.logger.error("Final step should trigger expert analysis")
|
|
return False
|
|
|
|
if "expert_analysis" not in response3_data:
|
|
self.logger.error("Expert analysis should be present in final step")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Context-aware file embedding test completed successfully")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Context-aware file embedding test failed: {e}")
|
|
return False
|
|
|
|
def _test_multi_step_file_context(self) -> bool:
|
|
"""Test multi-step workflow with proper file context transitions"""
|
|
try:
|
|
self.logger.info(" 1.6: Testing multi-step file context optimization")
|
|
|
|
# Use existing payment and config files for multi-step test
|
|
files_to_review = [self.payment_file, self.config_file]
|
|
|
|
# Step 1: Start review (new conversation)
|
|
self.logger.info(" 1.6.1: Step 1 - Start comprehensive review")
|
|
response1, continuation_id = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Starting comprehensive security and quality review of payment system components",
|
|
"step_number": 1,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"findings": "Initial review of payment processor and configuration management modules",
|
|
"files_checked": files_to_review,
|
|
"relevant_files": [self.payment_file],
|
|
"files": files_to_review,
|
|
"relevant_context": [],
|
|
"confidence": "low",
|
|
"review_type": "security",
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response1 or not continuation_id:
|
|
self.logger.error("Failed to start multi-step file context test")
|
|
return False
|
|
|
|
response1_data = self._parse_review_response(response1)
|
|
|
|
# Validate step 1 - should use reference_only
|
|
file_context1 = response1_data.get("file_context", {})
|
|
if file_context1.get("type") != "reference_only":
|
|
self.logger.error("Step 1 should use reference_only file context")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Step 1: reference_only file context")
|
|
|
|
# Step 2: Security analysis
|
|
self.logger.info(" 1.6.2: Step 2 - Security analysis")
|
|
response2, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Focusing on critical security vulnerabilities across both modules",
|
|
"step_number": 2,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"continuation_id": continuation_id,
|
|
"findings": "Found critical security issues: hardcoded secrets in config, API key exposure in payment processor",
|
|
"files_checked": files_to_review,
|
|
"relevant_files": files_to_review,
|
|
"relevant_context": ["PaymentProcessor.__init__", "ConfigurationManager"],
|
|
"issues_found": [
|
|
{"severity": "critical", "description": "Hardcoded database password"},
|
|
{"severity": "critical", "description": "API key stored in plain text"},
|
|
],
|
|
"confidence": "medium",
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response2:
|
|
self.logger.error("Failed to continue to step 2")
|
|
return False
|
|
|
|
response2_data = self._parse_review_response(response2)
|
|
|
|
# Validate step 2 - should still use reference_only
|
|
file_context2 = response2_data.get("file_context", {})
|
|
if file_context2.get("type") != "reference_only":
|
|
self.logger.error("Step 2 should use reference_only file context")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Step 2: reference_only file context")
|
|
|
|
# Step 3: Performance and architecture analysis
|
|
self.logger.info(" 1.6.3: Step 3 - Performance and architecture analysis")
|
|
response3, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Analyzing performance bottlenecks and architectural concerns",
|
|
"step_number": 3,
|
|
"total_steps": 4,
|
|
"next_step_required": True,
|
|
"continuation_id": continuation_id,
|
|
"findings": "Performance issues: unbounded lists, inefficient algorithms, over-engineered patterns",
|
|
"files_checked": files_to_review,
|
|
"relevant_files": files_to_review,
|
|
"relevant_context": [
|
|
"PaymentProcessor.get_payment_history",
|
|
"PaymentProcessor._generate_complex_hash",
|
|
],
|
|
"issues_found": [
|
|
{"severity": "high", "description": "O(n²) algorithm in payment history"},
|
|
{"severity": "medium", "description": "Over-engineered hash generation"},
|
|
{"severity": "medium", "description": "Unbounded failed_payments list"},
|
|
],
|
|
"confidence": "high",
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response3:
|
|
self.logger.error("Failed to continue to step 3")
|
|
return False
|
|
|
|
response3_data = self._parse_review_response(response3)
|
|
|
|
# Validate step 3 - should still use reference_only
|
|
file_context3 = response3_data.get("file_context", {})
|
|
if file_context3.get("type") != "reference_only":
|
|
self.logger.error("Step 3 should use reference_only file context")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Step 3: reference_only file context")
|
|
|
|
# Step 4: Final comprehensive analysis
|
|
self.logger.info(" 1.6.4: Step 4 - Final comprehensive analysis")
|
|
response4, _ = self.call_mcp_tool(
|
|
"codereview",
|
|
{
|
|
"step": "Code review complete - comprehensive analysis of all security, performance, and quality issues",
|
|
"step_number": 4,
|
|
"total_steps": 4,
|
|
"next_step_required": False, # Final step - should embed files
|
|
"continuation_id": continuation_id,
|
|
"findings": "Complete review: identified critical security vulnerabilities, performance bottlenecks, over-engineering patterns, and maintainability concerns across payment and configuration modules.",
|
|
"files_checked": files_to_review,
|
|
"relevant_files": files_to_review,
|
|
"relevant_context": ["PaymentProcessor.process_payment", "ConfigurationManager.get_config"],
|
|
"issues_found": [
|
|
{"severity": "critical", "description": "Multiple hardcoded secrets"},
|
|
{"severity": "high", "description": "Performance and security issues in payment processing"},
|
|
{"severity": "medium", "description": "Over-engineered architecture patterns"},
|
|
],
|
|
"confidence": "high",
|
|
"model": "flash",
|
|
},
|
|
)
|
|
|
|
if not response4:
|
|
self.logger.error("Failed to complete to final step")
|
|
return False
|
|
|
|
response4_data = self._parse_review_response(response4)
|
|
|
|
# Validate step 4 - should use fully_embedded for expert analysis
|
|
file_context4 = response4_data.get("file_context", {})
|
|
if file_context4.get("type") != "fully_embedded":
|
|
self.logger.error("Step 4 (final) should use fully_embedded file context")
|
|
return False
|
|
|
|
if "expert analysis" not in file_context4.get("context_optimization", "").lower():
|
|
self.logger.error("Final step should mention expert analysis in context optimization")
|
|
return False
|
|
|
|
# Verify expert analysis was triggered
|
|
if response4_data.get("status") != "calling_expert_analysis":
|
|
self.logger.error("Final step should trigger expert analysis")
|
|
return False
|
|
|
|
# Check that expert analysis has content
|
|
expert_analysis = response4_data.get("expert_analysis", {})
|
|
if not expert_analysis:
|
|
self.logger.error("Expert analysis should be present in final step")
|
|
return False
|
|
|
|
self.logger.info(" ✅ Step 4: fully_embedded file context with expert analysis")
|
|
|
|
# Validate the complete workflow progression
|
|
progression_summary = {
|
|
"step_1": "reference_only (new conversation, intermediate)",
|
|
"step_2": "reference_only (continuation, intermediate)",
|
|
"step_3": "reference_only (continuation, intermediate)",
|
|
"step_4": "fully_embedded (continuation, final)",
|
|
}
|
|
|
|
self.logger.info(" 📋 File context progression:")
|
|
for step, context_type in progression_summary.items():
|
|
self.logger.info(f" {step}: {context_type}")
|
|
|
|
self.logger.info(" ✅ Multi-step file context optimization test completed successfully")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Multi-step file context test failed: {e}")
|
|
return False
|
|
|
|
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
|
|
"""Call an MCP tool in-process - override for codereview-specific response handling"""
|
|
# Use in-process implementation to maintain conversation memory
|
|
response_text, _ = self.call_mcp_tool_direct(tool_name, params)
|
|
|
|
if not response_text:
|
|
return None, None
|
|
|
|
# Extract continuation_id from codereview response specifically
|
|
continuation_id = self._extract_review_continuation_id(response_text)
|
|
|
|
return response_text, continuation_id
|
|
|
|
def _extract_review_continuation_id(self, response_text: str) -> Optional[str]:
|
|
"""Extract continuation_id from codereview response"""
|
|
try:
|
|
# Parse the response
|
|
response_data = json.loads(response_text)
|
|
return response_data.get("continuation_id")
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.debug(f"Failed to parse response for review continuation_id: {e}")
|
|
return None
|
|
|
|
def _parse_review_response(self, response_text: str) -> dict:
|
|
"""Parse codereview tool JSON response"""
|
|
try:
|
|
# Parse the response - it should be direct JSON
|
|
return json.loads(response_text)
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Failed to parse review response as JSON: {e}")
|
|
self.logger.error(f"Response text: {response_text[:500]}...")
|
|
return {}
|
|
|
|
def _validate_step_response(
|
|
self,
|
|
response_data: dict,
|
|
expected_step: int,
|
|
expected_total: int,
|
|
expected_next_required: bool,
|
|
expected_status: str,
|
|
) -> bool:
|
|
"""Validate a codereview step response structure"""
|
|
try:
|
|
# Check status
|
|
if response_data.get("status") != expected_status:
|
|
self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
|
|
return False
|
|
|
|
# Check step number
|
|
if response_data.get("step_number") != expected_step:
|
|
self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
|
|
return False
|
|
|
|
# Check total steps
|
|
if response_data.get("total_steps") != expected_total:
|
|
self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
|
|
return False
|
|
|
|
# Check next_step_required
|
|
if response_data.get("next_step_required") != expected_next_required:
|
|
self.logger.error(
|
|
f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
|
|
)
|
|
return False
|
|
|
|
# Check code_review_status exists
|
|
if "code_review_status" not in response_data:
|
|
self.logger.error("Missing code_review_status in response")
|
|
return False
|
|
|
|
# Check next_steps guidance
|
|
if not response_data.get("next_steps"):
|
|
self.logger.error("Missing next_steps guidance in response")
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error validating step response: {e}")
|
|
return False
|