Re-imagined and re-written Debug tool. Instead of prompting Claude to perform initial analysis (and hoping it did), the tool now works through the debug process as an 'investigation', encouraging Claude to gather 'findings' and form 'hypotheses', step back and revise as needed, and keep track of the files it has gone through and which of them are relevant to the issue at hand. This structured investigation is then passed to the other model with far greater insight than the original debug tool ever could provide.

Improved prompts; guard against overengineering and flag it as an antipattern
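
For illustration, each investigation step is driven by a structured request shaped roughly like the sketch below. The field names are the ones the updated simulator test exercises; the values are illustrative and the authoritative schema lives in the tool itself:

    # Sketch of one investigation step request (illustrative, not the schema definition):
    step_request = {
        "step": "Examine cleanup_expired_sessions for the source of the RuntimeError.",
        "step_number": 2,
        "total_steps": 4,
        "next_step_required": True,  # False on the final step triggers expert analysis
        "findings": "Dictionary modified while iterating in cleanup_expired_sessions.",
        "files_checked": ["session_manager.py"],
        "relevant_files": ["session_manager.py"],
        "relevant_methods": ["SessionManager.cleanup_expired_sessions"],
        "hypothesis": "del during .items() iteration raises RuntimeError",
        "confidence": "high",
        "continuation_id": "<id-from-step-1>",
        # "backtrack_from_step": 2,  # optional: revise course from an earlier step
    }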
This commit is contained in:
Fahad
2025-06-19 10:22:30 +04:00
parent 2641c78f8d
commit fccfb0d999
16 changed files with 2243 additions and 707 deletions


@@ -1,21 +1,23 @@
#!/usr/bin/env python3
"""
Debug Tool Validation Test
Debug Tool Self-Investigation Validation Test
Tests the debug tool with real bugs to validate:
- Proper execution with flash model
- Actual bug identification and analysis
- Hypothesis generation for root causes
- Log validation for tool execution
Tests the debug tool's systematic self-investigation capabilities including:
- Step-by-step investigation with proper JSON responses
- Progressive tracking of findings, files, and methods
- Hypothesis formation and confidence tracking
- Backtracking and revision capabilities
- Final expert analysis after investigation completion
"""
import json
from typing import Optional
from .base_test import BaseSimulatorTest
class DebugValidationTest(BaseSimulatorTest):
"""Test debug tool with actual bug scenarios"""
"""Test debug tool's self-investigation and expert analysis features"""
@property
def test_name(self) -> str:
@@ -23,23 +25,48 @@ class DebugValidationTest(BaseSimulatorTest):
@property
def test_description(self) -> str:
return "Debug tool validation with actual bugs"
return "Debug tool self-investigation pattern validation"
def run_test(self) -> bool:
"""Test debug tool with real bugs"""
"""Test debug tool self-investigation capabilities"""
try:
self.logger.info("Test: Debug tool validation")
self.logger.info("Test: Debug tool self-investigation validation")
# Setup test files directory first
self.setup_test_files()
# Create a Python file with a subtle but realistic bug
buggy_code = """#!/usr/bin/env python3
self._create_buggy_code()
# Test 1: Single investigation session with multiple steps
if not self._test_single_investigation_session():
return False
# Test 2: Investigation with backtracking
if not self._test_investigation_with_backtracking():
return False
# Test 3: Complete investigation with expert analysis
if not self._test_complete_investigation_with_analysis():
return False
self.logger.info(" ✅ All debug validation tests passed")
return True
except Exception as e:
self.logger.error(f"Debug validation test failed: {e}")
return False
finally:
self.cleanup_test_files()
def _create_buggy_code(self):
"""Create test files with a subtle bug for debugging"""
# Create a Python file with dictionary iteration bug
buggy_code = """#!/usr/bin/env python3
import json
import requests
from datetime import datetime, timedelta
class UserSessionManager:
class SessionManager:
def __init__(self):
self.active_sessions = {}
self.session_timeout = 30 * 60 # 30 minutes in seconds
@@ -52,7 +79,6 @@ class UserSessionManager:
'user_id': user_id,
'user_data': user_data,
'created_at': datetime.now(),
'last_activity': datetime.now(),
'expires_at': datetime.now() + timedelta(seconds=self.session_timeout)
}
@@ -72,322 +98,356 @@ class UserSessionManager:
del self.active_sessions[session_id]
return False
# Update last activity
session['last_activity'] = current_time
return True
def cleanup_expired_sessions(self):
\"\"\"Remove expired sessions from memory\"\"\"
current_time = datetime.now()
expired_sessions = []
expired_count = 0
# BUG: Modifying dictionary while iterating over it
for session_id, session in self.active_sessions.items():
if current_time > session['expires_at']:
expired_sessions.append(session_id)
del self.active_sessions[session_id] # This causes RuntimeError
expired_count += 1
for session_id in expired_sessions:
del self.active_sessions[session_id]
return len(expired_sessions)
class APIHandler:
def __init__(self):
self.session_manager = UserSessionManager()
self.request_count = 0
def authenticate_user(self, username, password):
\"\"\"Authenticate user and create session\"\"\"
# Simulate API call to auth service
auth_response = self._call_auth_service(username, password)
if auth_response.get('success'):
user_data = auth_response.get('user_data', {})
session_id = self.session_manager.create_session(
user_data['id'], user_data
)
return {'success': True, 'session_id': session_id}
return {'success': False, 'error': 'Authentication failed'}
def process_request(self, session_id, request_data):
\"\"\"Process an API request with session validation\"\"\"
self.request_count += 1
# Validate session before processing
if not self.session_manager.validate_session(session_id):
return {'error': 'Invalid or expired session', 'code': 401}
# Simulate request processing
try:
result = self._process_business_logic(request_data)
return {'success': True, 'data': result}
except Exception as e:
return {'error': str(e), 'code': 500}
def _call_auth_service(self, username, password):
\"\"\"Simulate external authentication service call\"\"\"
# Simulate network delay and response
import time
time.sleep(0.1)
# Mock successful authentication
if username and password:
return {
'success': True,
'user_data': {
'id': hash(username) % 10000,
'username': username,
'roles': ['user']
}
}
return {'success': False}
def _process_business_logic(self, request_data):
\"\"\"Simulate business logic processing\"\"\"
if not request_data:
raise ValueError("Invalid request data")
# Simulate some processing
return {
'processed_at': datetime.now().isoformat(),
'request_id': self.request_count,
'status': 'completed'
}
# Global API handler instance
api_handler = APIHandler()
def handle_api_request(session_id, request_data):
\"\"\"Main API request handler\"\"\"
return api_handler.process_request(session_id, request_data)
return expired_count
"""
# Create test file with subtle bug
test_file = self.create_additional_test_file("session_manager.py", buggy_code)
self.logger.info(f" ✅ Created test file with subtle bug: {test_file}")
# Create test file with subtle bug
self.buggy_file = self.create_additional_test_file("session_manager.py", buggy_code)
self.logger.info(f" ✅ Created test file with subtle bug: {self.buggy_file}")
# Create a realistic problem description with subtle symptoms
error_description = """ISSUE DESCRIPTION:
Our API service is experiencing intermittent session validation failures in production.
# Create error description
error_description = """ISSUE DESCRIPTION:
Our session management system is experiencing intermittent failures during cleanup operations.
SYMPTOMS OBSERVED:
- Users randomly get "Invalid or expired session" errors even with valid sessions
- The issue happens more frequently during high-traffic periods
- Sessions that should still be valid (created < 30 minutes ago) are being rejected
- The problem occurs maybe 2-3% of requests but is hard to reproduce consistently
- Server logs show session validation failing but no clear pattern
SYMPTOMS:
- Random RuntimeError: dictionary changed size during iteration
- Occurs during high load when many sessions expire simultaneously
- Error happens in cleanup_expired_sessions method
- Affects about 5% of cleanup operations
ENVIRONMENT:
- Python 3.13 API service
- Running in production with multiple concurrent users
- Redis not used for session storage (in-memory only)
- Load balancer distributes requests across multiple instances
ERROR LOG:
RuntimeError: dictionary changed size during iteration
File "session_manager.py", line 44, in cleanup_expired_sessions
for session_id, session in self.active_sessions.items():
"""
RECENT CHANGES:
- Increased session timeout from 15 to 30 minutes last week
- Added cleanup routine to remove expired sessions
- No major code changes to session management
self.error_file = self.create_additional_test_file("error_description.txt", error_description)
self.logger.info(f" ✅ Created error description file: {self.error_file}")
USER IMPACT:
- Users have to re-authenticate randomly
- Affects user experience and causes complaints
- Seems to happen more on busy days
def _test_single_investigation_session(self) -> bool:
"""Test a complete investigation session with multiple steps"""
try:
self.logger.info(" 1.1: Testing single investigation session")
The code looks correct to me, but something is causing valid sessions to be treated as expired or invalid. I'm not sure what's causing this intermittent behavior."""
error_file = self.create_additional_test_file("error_description.txt", error_description)
self.logger.info(f" ✅ Created error description file: {error_file}")
# Call debug tool with flash model and realistic problem description
self.logger.info(" 🔍 Calling debug tool to investigate session validation issues...")
response, continuation_id = self.call_mcp_tool(
# Step 1: Start investigation
self.logger.info(" 1.1.1: Step 1 - Initial investigation")
response1, continuation_id = self.call_mcp_tool(
"debug",
{
"prompt": "Investigate why our API is experiencing intermittent session validation failures in production",
"files": [test_file, error_file],
"findings": "Users getting 'Invalid or expired session' errors randomly, occurs more during high traffic, sessions should still be valid",
"error_context": "Sessions created < 30 minutes ago being rejected, happens ~2-3% of requests, load balanced environment",
"systematic_investigation": True,
"model": "flash",
"thinking_mode": "medium",
"step": "I need to investigate intermittent RuntimeError during session cleanup. Let me start by examining the error description and understanding the symptoms.",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "RuntimeError occurs during dictionary iteration in cleanup_expired_sessions method. Error happens intermittently during high load.",
"files_checked": [self.error_file],
"relevant_files": [self.error_file],
},
)
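# While the investigation is open, the tool is expected to answer with JSON
# roughly of this shape (fields inferred from _validate_step_response and the
# checks below; values are illustrative):
# {
#   "status": "investigation_in_progress",
#   "step_number": 1,
#   "total_steps": 4,
#   "next_step_required": true,
#   "investigation_status": {"files_checked": 1, "relevant_methods": 0, "current_confidence": "..."},
#   "output": "...",
#   "next_steps": "...",
#   "continuation_id": "..."
# }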
if not response:
self.logger.error("Failed to get debug response")
if not response1 or not continuation_id:
self.logger.error("Failed to get initial investigation response")
return False
self.logger.info(" ✅ Got debug response")
# Parse and validate JSON response
response1_data = self._parse_debug_response(response1)
if not response1_data:
return False
# Parse response to validate bug identification
try:
response_data = json.loads(response)
self.logger.debug(f"Response keys: {list(response_data.keys())}")
# Validate step 1 response structure
if not self._validate_step_response(response1_data, 1, 4, True, "investigation_in_progress"):
return False
# Extract the actual content if it's wrapped
if "content" in response_data:
content = response_data["content"]
# Handle markdown JSON blocks
if content.startswith("```json"):
content = content[7:]
if content.endswith("```"):
content = content[:-3]
content = content.strip()
self.logger.info(f" ✅ Step 1 successful, continuation_id: {continuation_id}")
# Parse the inner JSON
inner_data = json.loads(content)
self.logger.debug(f"Inner data keys: {list(inner_data.keys())}")
else:
inner_data = response_data
# Step 2: Examine the code
self.logger.info(" 1.1.2: Step 2 - Code examination")
response2, _ = self.call_mcp_tool(
"debug",
{
"step": "Now examining the session_manager.py file to understand the cleanup_expired_sessions implementation and identify the root cause.",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"findings": "Found the issue: cleanup_expired_sessions modifies self.active_sessions dictionary while iterating over it with .items(). This causes RuntimeError when del is called during iteration.",
"files_checked": [self.error_file, self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_methods": ["SessionManager.cleanup_expired_sessions"],
"hypothesis": "Dictionary is being modified during iteration causing RuntimeError",
"confidence": "high",
"continuation_id": continuation_id,
},
)
# Check for structured debug analysis (should have analysis_complete status)
if inner_data.get("status") == "analysis_complete":
self.logger.info(" ✅ Got structured debug analysis")
if not response2:
self.logger.error("Failed to continue investigation to step 2")
return False
# Validate hypothesis generation
hypotheses = inner_data.get("hypotheses", [])
if not hypotheses:
self.logger.error("No hypotheses found in debug analysis")
return False
response2_data = self._parse_debug_response(response2)
if not self._validate_step_response(response2_data, 2, 4, True, "investigation_in_progress"):
return False
self.logger.info(f" 🧠 Found {len(hypotheses)} hypotheses")
# Check investigation status tracking
investigation_status = response2_data.get("investigation_status", {})
if investigation_status.get("files_checked", 0) < 2:
self.logger.error("Files checked count not properly tracked")
return False
# Check if the model identified the real bug: dictionary modification during iteration
analysis_text = json.dumps(inner_data).lower()
if investigation_status.get("relevant_methods", 0) != 1:
self.logger.error("Relevant methods not properly tracked")
return False
# Look for the actual bug - modifying dictionary while iterating
bug_indicators = [
"dictionary",
"iteration",
"modify",
"concurrent",
"runtime error",
"dictionary changed size during iteration",
"cleanup_expired_sessions",
"active_sessions",
"del",
"removing while iterating",
]
if investigation_status.get("current_confidence") != "high":
self.logger.error("Confidence level not properly tracked")
return False
found_indicators = [indicator for indicator in bug_indicators if indicator in analysis_text]
self.logger.info(" ✅ Step 2 successful with proper tracking")
# Check for specific mentions of the problematic pattern
dictionary_bug_patterns = [
"modifying dictionary while iterating",
"dictionary changed size",
"concurrent modification",
"iterating over dictionary",
"del.*active_sessions",
"cleanup.*iteration",
]
# Step 3: Validate hypothesis
self.logger.info(" 1.1.3: Step 3 - Hypothesis validation")
response3, _ = self.call_mcp_tool(
"debug",
{
"step": "Confirming the bug pattern: the for loop iterates over self.active_sessions.items() while del self.active_sessions[session_id] modifies the dictionary inside the loop.",
"step_number": 3,
"total_steps": 4,
"next_step_required": True,
"findings": "Confirmed: Line 44-47 shows classic dictionary modification during iteration bug. The fix would be to collect expired session IDs first, then delete them after iteration completes.",
"files_checked": [self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_methods": ["SessionManager.cleanup_expired_sessions"],
"hypothesis": "Dictionary modification during iteration in cleanup_expired_sessions causes RuntimeError",
"confidence": "high",
"continuation_id": continuation_id,
},
)
import re
if not response3:
self.logger.error("Failed to continue investigation to step 3")
return False
pattern_matches = []
for pattern in dictionary_bug_patterns:
if re.search(pattern, analysis_text):
pattern_matches.append(pattern)
response3_data = self._parse_debug_response(response3)
if not self._validate_step_response(response3_data, 3, 4, True, "investigation_in_progress"):
return False
if len(found_indicators) >= 3 or len(pattern_matches) >= 1:
self.logger.info(" ✅ Flash identified the dictionary iteration bug")
self.logger.info(f" Found indicators: {found_indicators[:3]}")
if pattern_matches:
self.logger.info(f" Pattern matches: {pattern_matches}")
else:
self.logger.error(" ❌ Flash missed the dictionary iteration bug")
self.logger.error(f" Found only: {found_indicators}")
return False
self.logger.info(" ✅ Investigation session progressing successfully")
# Validate hypothesis quality (should have confidence levels and reasoning)
valid_hypotheses = 0
for i, hypothesis in enumerate(hypotheses[:3]): # Check top 3
confidence = hypothesis.get("confidence", "").lower()
reasoning = hypothesis.get("reasoning", "")
# Store continuation_id for next test
self.investigation_continuation_id = continuation_id
return True
if confidence in ["high", "medium", "low"] and len(reasoning) > 20:
valid_hypotheses += 1
self.logger.debug(f" Hypothesis {i+1}: {confidence} confidence, good reasoning")
else:
self.logger.debug(f" Hypothesis {i+1}: weak ({confidence}, {len(reasoning)} chars)")
except Exception as e:
self.logger.error(f"Single investigation session test failed: {e}")
return False
if valid_hypotheses >= 2:
self.logger.info(f" ✅ Found {valid_hypotheses} well-structured hypotheses")
else:
self.logger.error(f" ❌ Only {valid_hypotheses} well-structured hypotheses")
return False
def _test_investigation_with_backtracking(self) -> bool:
"""Test investigation with backtracking to revise findings"""
try:
self.logger.info(" 1.2: Testing investigation with backtracking")
# Check for line-specific references
if "line" in analysis_text or "lines" in analysis_text:
self.logger.info(" 📍 Analysis includes line-specific references")
else:
self.logger.warning(" ⚠️ No line-specific references found")
# Start a new investigation for testing backtracking
self.logger.info(" 1.2.1: Start investigation for backtracking test")
response1, continuation_id = self.call_mcp_tool(
"debug",
{
"step": "Investigating performance degradation in data processing pipeline",
"step_number": 1,
"total_steps": 4,
"next_step_required": True,
"findings": "Initial analysis shows slow database queries",
"files_checked": ["/db/queries.py"],
"relevant_files": ["/db/queries.py"],
},
)
else:
# Non-structured response - check for dictionary iteration bug identification
self.logger.info(" 📝 Got general debug response")
if not response1 or not continuation_id:
self.logger.error("Failed to start backtracking test investigation")
return False
response_text = response.lower()
# Step 2: Wrong direction
self.logger.info(" 1.2.2: Step 2 - Wrong investigation path")
response2, _ = self.call_mcp_tool(
"debug",
{
"step": "Focusing on database optimization strategies",
"step_number": 2,
"total_steps": 4,
"next_step_required": True,
"findings": "Database queries seem optimized, might be looking in wrong place",
"files_checked": ["/db/queries.py", "/db/indexes.py"],
"relevant_files": [],
"hypothesis": "Database performance issues",
"confidence": "low",
"continuation_id": continuation_id,
},
)
# Check for the specific bug in general response
bug_indicators = [
"dictionary",
"iteration",
"modify",
"concurrent",
"active_sessions",
"cleanup",
"del ",
"removing",
"changed size",
]
if not response2:
self.logger.error("Failed to continue to step 2")
return False
found_indicators = [indicator for indicator in bug_indicators if indicator in response_text]
# Step 3: Backtrack from step 2
self.logger.info(" 1.2.3: Step 3 - Backtrack and revise approach")
response3, _ = self.call_mcp_tool(
"debug",
{
"step": "Backtracking - the issue might not be database related. Let me investigate the data processing algorithm instead.",
"step_number": 3,
"total_steps": 4,
"next_step_required": True,
"findings": "Found inefficient nested loops in data processor causing O(n²) complexity",
"files_checked": ["/processor/algorithm.py"],
"relevant_files": ["/processor/algorithm.py"],
"relevant_methods": ["DataProcessor.process_batch"],
"hypothesis": "Inefficient algorithm causing performance issues",
"confidence": "medium",
"backtrack_from_step": 2, # Backtrack from step 2
"continuation_id": continuation_id,
},
)
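# backtrack_from_step=2 asks the tool to abandon the step-2 line of inquiry
# and continue the investigation from here; the exact revision semantics live
# in the debug tool itself — the test only checks that the backtracking step
# is accepted and tracked like any other step.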
if len(found_indicators) >= 3:
self.logger.info(f" ✅ Found {len(found_indicators)} relevant indicators in response")
self.logger.info(f" Found: {found_indicators}")
else:
self.logger.error(f" ❌ Only found {len(found_indicators)} relevant indicators")
self.logger.error(f" Found: {found_indicators}")
return False
if not response3:
self.logger.error("Failed to backtrack")
return False
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse debug response as JSON: {e}")
# For non-JSON responses, check for dictionary iteration bug
response_text = response.lower()
response3_data = self._parse_debug_response(response3)
if not self._validate_step_response(response3_data, 3, 4, True, "investigation_in_progress"):
return False
bug_indicators = [
"dictionary",
"iteration",
"modify",
"concurrent",
"active_sessions",
"cleanup",
"del ",
"removing",
]
self.logger.info(" ✅ Backtracking working correctly")
return True
found_indicators = [indicator for indicator in bug_indicators if indicator in response_text]
except Exception as e:
self.logger.error(f"Backtracking test failed: {e}")
return False
if len(found_indicators) >= 3:
self.logger.info(f"Text response found {len(found_indicators)} relevant indicators")
else:
self.logger.error(f" ❌ Text response only found {len(found_indicators)} relevant indicators")
def _test_complete_investigation_with_analysis(self) -> bool:
"""Test complete investigation ending with expert analysis"""
try:
self.logger.info(" 1.3: Testing complete investigation with expert analysis")
# Use the continuation from first test
continuation_id = getattr(self, "investigation_continuation_id", None)
if not continuation_id:
# Start fresh if no continuation available
self.logger.info(" 1.3.0: Starting fresh investigation")
response0, continuation_id = self.call_mcp_tool(
"debug",
{
"step": "Investigating the dictionary iteration bug in session cleanup",
"step_number": 1,
"total_steps": 2,
"next_step_required": True,
"findings": "Found dictionary modification during iteration",
"files_checked": [self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_methods": ["SessionManager.cleanup_expired_sessions"],
},
)
if not response0 or not continuation_id:
self.logger.error("Failed to start fresh investigation")
return False
# Final step - trigger expert analysis
self.logger.info(" 1.3.1: Final step - complete investigation")
response_final, _ = self.call_mcp_tool(
"debug",
{
"step": "Investigation complete. The root cause is confirmed: cleanup_expired_sessions modifies the dictionary while iterating, causing RuntimeError.",
"step_number": 2,
"total_steps": 2,
"next_step_required": False, # Final step - triggers expert analysis
"findings": "Root cause identified: del self.active_sessions[session_id] on line 46 modifies dictionary during iteration starting at line 44. Fix: collect expired IDs first, then delete.",
"files_checked": [self.buggy_file],
"relevant_files": [self.buggy_file],
"relevant_methods": ["SessionManager.cleanup_expired_sessions"],
"hypothesis": "Dictionary modification during iteration causes RuntimeError in cleanup_expired_sessions",
"confidence": "high",
"continuation_id": continuation_id,
"model": "flash", # Use flash for expert analysis
},
)
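# With next_step_required=False the tool should escalate to the expert model
# and return JSON roughly of this shape (inferred from the checks below):
# {
#   "status": "calling_expert_analysis",
#   "investigation_complete": true,
#   "expert_analysis": {...},
#   "complete_investigation": {"relevant_methods": [...], ...}
# }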
if not response_final:
self.logger.error("Failed to complete investigation")
return False
response_final_data = self._parse_debug_response(response_final)
if not response_final_data:
return False
# Validate final response structure
if response_final_data.get("status") != "calling_expert_analysis":
self.logger.error(
f"Expected status 'calling_expert_analysis', got '{response_final_data.get('status')}'"
)
return False
if not response_final_data.get("investigation_complete"):
self.logger.error("Expected investigation_complete=true for final step")
return False
# Check for expert analysis
if "expert_analysis" not in response_final_data:
self.logger.error("Missing expert_analysis in final response")
return False
expert_analysis = response_final_data.get("expert_analysis", {})
# Check for expected analysis content (checking common patterns)
analysis_text = json.dumps(expert_analysis).lower()
# Look for bug identification
bug_indicators = ["dictionary", "iteration", "modify", "runtime", "error", "del"]
found_indicators = sum(1 for indicator in bug_indicators if indicator in analysis_text)
if found_indicators >= 3:
self.logger.info(" ✅ Expert analysis identified the bug correctly")
else:
self.logger.warning(
f" ⚠️ Expert analysis may not have fully identified the bug (found {found_indicators}/6 indicators)"
)
# Check complete investigation summary
if "complete_investigation" not in response_final_data:
self.logger.error("Missing complete_investigation in final response")
return False
complete_investigation = response_final_data["complete_investigation"]
if not complete_investigation.get("relevant_methods"):
self.logger.error("Missing relevant methods in complete investigation")
return False
if "SessionManager.cleanup_expired_sessions" not in complete_investigation["relevant_methods"]:
self.logger.error("Expected method not found in investigation summary")
return False
self.logger.info(" ✅ Complete investigation with expert analysis successful")
# Validate logs
self.logger.info(" 📋 Validating execution logs...")
# Get server logs using inherited method
# Get server logs
logs = self.get_recent_server_logs(500)
# Look for debug tool execution patterns
debug_patterns = [
"debug tool",
"[DEBUG]",
"systematic investigation",
"Token budget",
"Essential files for debugging",
"investigation",
"Expert analysis",
"calling_expert_analysis",
]
patterns_found = 0
@@ -396,34 +456,101 @@ The code looks correct to me, but something is causing valid sessions to be trea
patterns_found += 1
self.logger.debug(f" ✅ Found log pattern: {pattern}")
if patterns_found >= 3:
if patterns_found >= 2:
self.logger.info(f" ✅ Log validation passed ({patterns_found}/{len(debug_patterns)} patterns)")
else:
self.logger.warning(f" ⚠️ Only found {patterns_found}/{len(debug_patterns)} log patterns")
# Test continuation if available
if continuation_id:
self.logger.info(" 🔄 Testing debug continuation...")
follow_up_response, _ = self.call_mcp_tool(
"debug",
{
"prompt": "Based on your analysis, which bug should we fix first and how?",
"continuation_id": continuation_id,
"model": "flash",
},
)
if follow_up_response:
self.logger.info(" ✅ Debug continuation worked")
else:
self.logger.warning(" ⚠️ Debug continuation failed")
self.logger.info(" ✅ Debug tool validation completed successfully")
return True
except Exception as e:
self.logger.error(f"Debug validation test failed: {e}")
self.logger.error(f"Complete investigation test failed: {e}")
return False
def call_mcp_tool(self, tool_name: str, params: dict) -> tuple[Optional[str], Optional[str]]:
"""Call an MCP tool via standalone server - override for debug-specific response handling"""
# Use parent implementation to get the raw response
response_text, _ = super().call_mcp_tool(tool_name, params)
if not response_text:
return None, None
# Extract continuation_id from debug response specifically
continuation_id = self._extract_debug_continuation_id(response_text)
return response_text, continuation_id
def _extract_debug_continuation_id(self, response_text: str) -> Optional[str]:
"""Extract continuation_id from debug response"""
try:
# Parse the response
response_data = json.loads(response_text)
return response_data.get("continuation_id")
except json.JSONDecodeError as e:
self.logger.debug(f"Failed to parse response for debug continuation_id: {e}")
return None
def _parse_debug_response(self, response_text: str) -> dict:
"""Parse debug tool JSON response"""
try:
# Parse the response - it should be direct JSON
return json.loads(response_text)
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse debug response as JSON: {e}")
self.logger.error(f"Response text: {response_text[:500]}...")
return {}
def _validate_step_response(
self,
response_data: dict,
expected_step: int,
expected_total: int,
expected_next_required: bool,
expected_status: str,
) -> bool:
"""Validate a debug investigation step response structure"""
try:
# Check status
if response_data.get("status") != expected_status:
self.logger.error(f"Expected status '{expected_status}', got '{response_data.get('status')}'")
return False
# Check step number
if response_data.get("step_number") != expected_step:
self.logger.error(f"Expected step_number {expected_step}, got {response_data.get('step_number')}")
return False
# Check total steps
if response_data.get("total_steps") != expected_total:
self.logger.error(f"Expected total_steps {expected_total}, got {response_data.get('total_steps')}")
return False
# Check next_step_required
if response_data.get("next_step_required") != expected_next_required:
self.logger.error(
f"Expected next_step_required {expected_next_required}, got {response_data.get('next_step_required')}"
)
return False
# Check investigation_status exists
if "investigation_status" not in response_data:
self.logger.error("Missing investigation_status in response")
return False
# Check output guidance exists
if "output" not in response_data:
self.logger.error("Missing output guidance in response")
return False
# Check next_steps guidance
if not response_data.get("next_steps"):
self.logger.error("Missing next_steps guidance in response")
return False
return True
except Exception as e:
self.logger.error(f"Error validating step response: {e}")
return False
finally:
self.cleanup_test_files()