"""
|
|
AnalyzeWorkflow tool - Step-by-step code analysis with systematic investigation
|
|
|
|
This tool provides a structured workflow for comprehensive code and file analysis.
|
|
It guides the CLI agent through systematic investigation steps with forced pauses between each step
|
|
to ensure thorough code examination, pattern identification, and architectural assessment before proceeding.
|
|
The tool supports complex analysis scenarios including architectural review, performance analysis,
|
|
security assessment, and maintainability evaluation.
|
|
|
|
Key features:
|
|
- Step-by-step analysis workflow with progress tracking
|
|
- Context-aware file embedding (references during investigation, full content for analysis)
|
|
- Automatic pattern and insight tracking with categorization
|
|
- Expert analysis integration with external models
|
|
- Support for focused analysis (architecture, performance, security, quality)
|
|
- Confidence-based workflow optimization
|
|
"""

import logging
from typing import TYPE_CHECKING, Any, Literal, Optional

from pydantic import Field, model_validator

if TYPE_CHECKING:
    from tools.models import ToolModelCategory

from config import TEMPERATURE_ANALYTICAL
from systemprompts import ANALYZE_PROMPT
from tools.shared.base_models import WorkflowRequest

from .workflow.base import WorkflowTool

logger = logging.getLogger(__name__)

# Tool-specific field descriptions for analyze workflow
ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS = {
    "step": (
        "The analysis plan. Step 1: State your strategy, including how you will map the codebase structure, "
        "understand business logic, and assess code quality, performance implications, and architectural patterns. "
        "Later steps: Report findings and adapt the approach as new insights emerge."
    ),
    "step_number": (
        "The index of the current step in the analysis sequence, beginning at 1. Each step should build upon or "
        "revise the previous one."
    ),
    "total_steps": (
        "Your current estimate for how many steps will be needed to complete the analysis. "
        "Adjust as new findings emerge."
    ),
    "next_step_required": (
        "Set to true if you plan to continue the investigation with another step. False means you believe the "
        "analysis is complete and ready for expert validation."
    ),
    "findings": (
        "Summary of discoveries from this step, including architectural patterns, tech stack assessment, scalability characteristics, "
        "performance implications, maintainability factors, and strategic improvement opportunities. "
        "IMPORTANT: Document both strengths (good patterns, solid architecture) and concerns (tech debt, overengineering, unnecessary complexity). "
        "In later steps, confirm or update past findings with additional evidence."
    ),
    "files_checked": (
        "List all files examined (absolute paths). Include even ruled-out files to track exploration path."
    ),
    "relevant_files": (
        "Subset of files_checked directly relevant to analysis findings (absolute paths). Include files with "
        "significant patterns, architectural decisions, or strategic improvement opportunities."
    ),
    "relevant_context": (
        "List methods/functions central to analysis findings, in 'ClassName.methodName' or 'functionName' format. "
        "Prioritize those demonstrating key patterns, architectural decisions, or improvement opportunities."
    ),
    "backtrack_from_step": ("If an earlier finding needs revision, specify the step number to backtrack from."),
    "images": (
        "Optional absolute paths to architecture diagrams or visual references that help with analysis context."
    ),
    "confidence": (
        "Your confidence in the analysis: exploring, low, medium, high, very_high, almost_certain, or certain. "
        "'certain' indicates the analysis is complete and ready for validation."
    ),
    "analysis_type": "Type of analysis to perform (architecture, performance, security, quality, general)",
    "output_format": "How to format the output (summary, detailed, actionable)",
}


class AnalyzeWorkflowRequest(WorkflowRequest):
    """Request model for analyze workflow investigation steps"""

    # Required fields for each investigation step
    step: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"])
    step_number: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"])
    total_steps: int = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"])
    next_step_required: bool = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"])

    # Investigation tracking fields
    findings: str = Field(..., description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"])
    files_checked: list[str] = Field(
        default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"]
    )
    relevant_files: list[str] = Field(
        default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"]
    )
    relevant_context: list[str] = Field(
        default_factory=list, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_context"]
    )

    # Issues found during analysis (structured with severity)
    issues_found: list[dict] = Field(
        default_factory=list,
        description="Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
    )

    # Optional backtracking field
    backtrack_from_step: Optional[int] = Field(
        None, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["backtrack_from_step"]
    )

    # Optional images for visual context
    images: Optional[list[str]] = Field(default=None, description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"])

    # Analyze-specific fields (only used in step 1 to initialize)
    # Note: Use relevant_files field instead of files for consistency across workflow tools
    analysis_type: Optional[Literal["architecture", "performance", "security", "quality", "general"]] = Field(
        "general", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"]
    )
    output_format: Optional[Literal["summary", "detailed", "actionable"]] = Field(
        "detailed", description=ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"]
    )

    # Keep thinking_mode from original analyze tool; temperature is inherited from WorkflowRequest

    @model_validator(mode="after")
    def validate_step_one_requirements(self):
        """Ensure step 1 has required relevant_files."""
        if self.step_number == 1:
            if not self.relevant_files:
                raise ValueError("Step 1 requires 'relevant_files' field to specify files or directories to analyze")
        return self
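    # Minimal example of the analyze-specific fields that satisfy the step-1 validator above
    # (paths are placeholders; inherited WorkflowRequest fields are not shown):
    #   AnalyzeWorkflowRequest(step="Map the module structure", step_number=1, total_steps=2,
    #                          next_step_required=True, findings="Initial survey",
    #                          relevant_files=["/abs/path/to/module.py"])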


class AnalyzeTool(WorkflowTool):
    """
    Analyze workflow tool for step-by-step code analysis and expert validation.

    This tool implements a structured analysis workflow that guides users through
    methodical investigation steps, ensuring thorough code examination, pattern identification,
    and architectural assessment before reaching conclusions. It supports complex analysis scenarios
    including architectural review, performance analysis, security assessment, and maintainability evaluation.
    """

    def __init__(self):
        super().__init__()
        self.initial_request = None
        self.analysis_config = {}

    def get_name(self) -> str:
        return "analyze"

    def get_description(self) -> str:
        return (
            "Performs comprehensive code analysis with systematic investigation and expert validation. "
            "Use for architecture, performance, maintainability, and pattern analysis. "
            "Guides through structured code review and strategic planning."
        )

    def get_system_prompt(self) -> str:
        return ANALYZE_PROMPT

    def get_default_temperature(self) -> float:
        return TEMPERATURE_ANALYTICAL

    def get_model_category(self) -> "ToolModelCategory":
        """Analyze workflow requires thorough analysis and reasoning"""
        from tools.models import ToolModelCategory

        return ToolModelCategory.EXTENDED_REASONING

    def get_workflow_request_model(self):
        """Return the analyze workflow-specific request model."""
        return AnalyzeWorkflowRequest

    def get_input_schema(self) -> dict[str, Any]:
        """Generate input schema using WorkflowSchemaBuilder with analyze-specific overrides."""
        from .workflow.schema_builders import WorkflowSchemaBuilder

        # Fields to exclude from analyze workflow (inherited from WorkflowRequest but not used)
        excluded_fields = {"hypothesis", "confidence"}

        # Analyze workflow-specific field overrides
        analyze_field_overrides = {
            "step": {
                "type": "string",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step"],
            },
            "step_number": {
                "type": "integer",
                "minimum": 1,
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["step_number"],
            },
            "total_steps": {
                "type": "integer",
                "minimum": 1,
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["total_steps"],
            },
            "next_step_required": {
                "type": "boolean",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["next_step_required"],
            },
            "findings": {
                "type": "string",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["findings"],
            },
            "files_checked": {
                "type": "array",
                "items": {"type": "string"},
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["files_checked"],
            },
            "relevant_files": {
                "type": "array",
                "items": {"type": "string"},
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["relevant_files"],
            },
            "confidence": {
                "type": "string",
                "enum": ["exploring", "low", "medium", "high", "very_high", "almost_certain", "certain"],
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["confidence"],
            },
            "backtrack_from_step": {
                "type": "integer",
                "minimum": 1,
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["backtrack_from_step"],
            },
            "images": {
                "type": "array",
                "items": {"type": "string"},
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["images"],
            },
            "issues_found": {
                "type": "array",
                "items": {"type": "object"},
                "description": "Issues or concerns identified during analysis, each with severity level (critical, high, medium, low)",
            },
            "analysis_type": {
                "type": "string",
                "enum": ["architecture", "performance", "security", "quality", "general"],
                "default": "general",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["analysis_type"],
            },
            "output_format": {
                "type": "string",
                "enum": ["summary", "detailed", "actionable"],
                "default": "detailed",
                "description": ANALYZE_WORKFLOW_FIELD_DESCRIPTIONS["output_format"],
            },
        }

        # Use WorkflowSchemaBuilder with analyze-specific tool fields
        return WorkflowSchemaBuilder.build_schema(
            tool_specific_fields=analyze_field_overrides,
            model_field_schema=self.get_model_field_schema(),
            auto_mode=self.is_effective_auto_mode(),
            tool_name=self.get_name(),
            excluded_workflow_fields=list(excluded_fields),
        )

    def get_required_actions(
        self, step_number: int, confidence: str, findings: str, total_steps: int, request=None
    ) -> list[str]:
        """Define required actions for each investigation phase."""
        if step_number == 1:
            # Initial analysis investigation tasks
            return [
                "Read and understand the code files specified for analysis",
                "Map the tech stack, frameworks, and overall architecture",
                "Identify the main components, modules, and their relationships",
                "Understand the business logic and intended functionality",
                "Examine architectural patterns and design decisions used",
                "Look for strengths, risks, and strategic improvement areas",
            ]
        elif step_number < total_steps:
            # Need deeper investigation
            return [
                "Examine specific architectural patterns and design decisions in detail",
                "Analyze scalability characteristics and performance implications",
                "Assess maintainability factors: module cohesion, coupling, tech debt",
                "Identify security posture and potential systemic vulnerabilities",
                "Look for overengineering, unnecessary complexity, or missing abstractions",
                "Evaluate how well the architecture serves business and scaling goals",
            ]
        else:
            # Close to completion - need final verification
            return [
                "Verify all significant architectural insights have been documented",
                "Confirm strategic improvement opportunities are comprehensively captured",
                "Ensure both strengths and risks are properly identified with evidence",
                "Validate that findings align with the analysis type and goals specified",
                "Check that recommendations are actionable and proportional to the codebase",
                "Confirm the analysis provides clear guidance for strategic decisions",
            ]

    def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
        """
        Always call expert analysis for comprehensive validation.

        Analysis benefits from a second opinion to ensure completeness.
        """
        # Check if user explicitly requested to skip assistant model
        if request and not self.get_request_use_assistant_model(request):
            return False

        # For analysis, we always want expert validation if we have any meaningful data
        return len(consolidated_findings.relevant_files) > 0 or len(consolidated_findings.findings) >= 1

    def prepare_expert_analysis_context(self, consolidated_findings) -> str:
        """Prepare context for external model call for final analysis validation."""
        context_parts = [
            f"=== ANALYSIS REQUEST ===\n{self.initial_request or 'Code analysis workflow initiated'}\n=== END REQUEST ==="
        ]

        # Add investigation summary
        investigation_summary = self._build_analysis_summary(consolidated_findings)
        context_parts.append(
            f"\n=== AGENT'S ANALYSIS INVESTIGATION ===\n{investigation_summary}\n=== END INVESTIGATION ==="
        )

        # Add analysis configuration context if available
        if self.analysis_config:
            config_text = "\n".join(f"- {key}: {value}" for key, value in self.analysis_config.items() if value)
            context_parts.append(f"\n=== ANALYSIS CONFIGURATION ===\n{config_text}\n=== END CONFIGURATION ===")

        # Add relevant code elements if available
        if consolidated_findings.relevant_context:
            methods_text = "\n".join(f"- {method}" for method in consolidated_findings.relevant_context)
            context_parts.append(f"\n=== RELEVANT CODE ELEMENTS ===\n{methods_text}\n=== END CODE ELEMENTS ===")

        # Add assessment evolution if available
        if consolidated_findings.hypotheses:
            assessments_text = "\n".join(
                f"Step {h['step']}: {h['hypothesis']}" for h in consolidated_findings.hypotheses
            )
            context_parts.append(f"\n=== ASSESSMENT EVOLUTION ===\n{assessments_text}\n=== END ASSESSMENTS ===")

        # Add images if available
        if consolidated_findings.images:
            images_text = "\n".join(f"- {img}" for img in consolidated_findings.images)
            context_parts.append(
                f"\n=== VISUAL ANALYSIS INFORMATION ===\n{images_text}\n=== END VISUAL INFORMATION ==="
            )

        return "\n".join(context_parts)

    def _build_analysis_summary(self, consolidated_findings) -> str:
        """Prepare a comprehensive summary of the analysis investigation."""
        summary_parts = [
            "=== SYSTEMATIC ANALYSIS INVESTIGATION SUMMARY ===",
            f"Total steps: {len(consolidated_findings.findings)}",
            f"Files examined: {len(consolidated_findings.files_checked)}",
            f"Relevant files identified: {len(consolidated_findings.relevant_files)}",
            f"Code elements analyzed: {len(consolidated_findings.relevant_context)}",
            "",
            "=== INVESTIGATION PROGRESSION ===",
        ]

        for finding in consolidated_findings.findings:
            summary_parts.append(finding)

        return "\n".join(summary_parts)

    def should_include_files_in_expert_prompt(self) -> bool:
        """Include files in expert analysis for comprehensive validation."""
        return True

    def should_embed_system_prompt(self) -> bool:
        """Embed system prompt in expert analysis for proper context."""
        return True

    def get_expert_thinking_mode(self) -> str:
        """Use high thinking mode for thorough analysis."""
        return "high"

    def get_expert_analysis_instruction(self) -> str:
        """Get specific instruction for analysis expert validation."""
        return (
            "Please provide comprehensive analysis validation based on the investigation findings. "
            "Focus on identifying any remaining architectural insights, validating the completeness of the analysis, "
            "and providing final strategic recommendations following the structured format specified in the system prompt."
        )

    # Hook method overrides for analyze-specific behavior

    def prepare_step_data(self, request) -> dict:
        """
        Map analyze-specific fields for internal processing.
        """
        step_data = {
            "step": request.step,
            "step_number": request.step_number,
            "findings": request.findings,
            "files_checked": request.files_checked,
            "relevant_files": request.relevant_files,
            "relevant_context": request.relevant_context,
            "issues_found": request.issues_found,  # Analyze workflow uses issues_found for structured problem tracking
            "confidence": "medium",  # Fixed value for workflow compatibility
            "hypothesis": request.findings,  # Map findings to hypothesis for compatibility
            "images": request.images or [],
        }
        return step_data

    def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
        """
        Analyze workflow always uses expert analysis for comprehensive validation.

        Analysis benefits from a second opinion to ensure completeness and catch
        any missed insights or alternative perspectives.
        """
        return False

    def store_initial_issue(self, step_description: str):
        """Store initial request for expert analysis."""
        self.initial_request = step_description

    # Override inheritance hooks for analyze-specific behavior

    def get_completion_status(self) -> str:
        """Analyze tools use analysis-specific status."""
        return "analysis_complete_ready_for_implementation"

    def get_completion_data_key(self) -> str:
        """Analyze uses 'complete_analysis' key."""
        return "complete_analysis"

    def get_final_analysis_from_request(self, request):
        """Analyze tools use 'findings' field."""
        return request.findings

    def get_confidence_level(self, request) -> str:
        """Analyze tools use fixed confidence for consistency."""
        return "medium"

    def get_completion_message(self) -> str:
        """Analyze-specific completion message."""
        return (
            "Analysis complete. You have identified all significant patterns, "
            "architectural insights, and strategic opportunities. MANDATORY: Present the user with the complete "
            "analysis results organized by strategic impact, and IMMEDIATELY proceed with implementing the "
            "highest priority recommendations or provide specific guidance for improvements. Focus on actionable "
            "strategic insights."
        )

    def get_skip_reason(self) -> str:
        """Analyze-specific skip reason."""
        return "Completed comprehensive analysis locally"

    def get_skip_expert_analysis_status(self) -> str:
        """Analyze-specific expert analysis skip status."""
        return "skipped_due_to_complete_analysis"

    def prepare_work_summary(self) -> str:
        """Analyze-specific work summary."""
        return self._build_analysis_summary(self.consolidated_findings)

    def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
        """
        Analyze-specific completion message.
        """
        base_message = (
            "ANALYSIS IS COMPLETE. You MUST now summarize and present ALL analysis findings organized by "
            "strategic impact (Critical → High → Medium → Low), specific architectural insights with code references, "
            "and exact recommendations for improvement. Clearly prioritize the top 3 strategic opportunities that need "
            "immediate attention. Provide concrete, actionable guidance for each finding—make it easy for a developer "
            "to understand exactly what strategic improvements to implement and how to approach them."
        )

        # Add expert analysis guidance only when expert analysis was actually used
        if expert_analysis_used:
            expert_guidance = self.get_expert_analysis_guidance()
            if expert_guidance:
                return f"{base_message}\n\n{expert_guidance}"

        return base_message

    def get_expert_analysis_guidance(self) -> str:
        """
        Provide specific guidance for handling expert analysis in code analysis.
        """
        return (
            "IMPORTANT: Analysis from an assistant model has been provided above. You MUST thoughtfully evaluate and validate "
            "the expert insights rather than treating them as definitive conclusions. Cross-reference the expert "
            "analysis with your own systematic investigation, verify that architectural recommendations are "
            "appropriate for this codebase's scale and context, and ensure suggested improvements align with "
            "the project's goals and constraints. Present a comprehensive synthesis that combines your detailed "
            "analysis with validated expert perspectives, clearly distinguishing between patterns you've "
            "independently identified and additional strategic insights from expert validation."
        )

    def get_step_guidance_message(self, request) -> str:
        """
        Analyze-specific step guidance with detailed investigation instructions.
        """
        step_guidance = self.get_analyze_step_guidance(request.step_number, request)
        return step_guidance["next_steps"]

    def get_analyze_step_guidance(self, step_number: int, request) -> dict[str, Any]:
        """
        Provide step-specific guidance for analyze workflow.
        """
        # Generate the next steps instruction based on required actions
        required_actions = self.get_required_actions(step_number, "medium", request.findings, request.total_steps)

        if step_number == 1:
            next_steps = (
                f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first examine "
                f"the code files thoroughly using appropriate tools. CRITICAL AWARENESS: You need to understand "
                f"the architectural patterns, assess scalability and performance characteristics, identify strategic "
                f"improvement areas, and look for systemic risks, overengineering, and missing abstractions. "
                f"Use file reading tools, code analysis, and systematic examination to gather comprehensive information. "
                f"Only call {self.get_name()} again AFTER completing your investigation. When you call "
                f"{self.get_name()} next time, use step_number: {step_number + 1} and report specific "
                f"files examined, architectural insights found, and strategic assessment discoveries."
            )
        elif step_number < request.total_steps:
            next_steps = (
                f"STOP! Do NOT call {self.get_name()} again yet. Based on your findings, you've identified areas that need "
                f"deeper analysis. MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
                + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\n\nOnly call {self.get_name()} again with step_number: {step_number + 1} AFTER "
                + "completing these analysis tasks."
            )
        else:
            next_steps = (
                f"WAIT! Your analysis needs final verification. DO NOT call {self.get_name()} immediately. REQUIRED ACTIONS:\n"
                + "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
                + f"\n\nREMEMBER: Ensure you have identified all significant architectural insights and strategic "
                f"opportunities across all areas. Document findings with specific file references and "
                f"code examples where applicable, then call {self.get_name()} with step_number: {step_number + 1}."
            )

        return {"next_steps": next_steps}

    def customize_workflow_response(self, response_data: dict, request) -> dict:
        """
        Customize response to match analyze workflow format.
        """
        # Store initial request on first step
        if request.step_number == 1:
            self.initial_request = request.step
            # Store analysis configuration for expert analysis
            if request.relevant_files:
                self.analysis_config = {
                    "relevant_files": request.relevant_files,
                    "analysis_type": request.analysis_type,
                    "output_format": request.output_format,
                }

        # Convert generic status names to analyze-specific ones
        tool_name = self.get_name()
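        # With get_name() returning "analyze", e.g. "analyze_in_progress" maps to "analysis_in_progress".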
        status_mapping = {
            f"{tool_name}_in_progress": "analysis_in_progress",
            f"pause_for_{tool_name}": "pause_for_analysis",
            f"{tool_name}_required": "analysis_required",
            f"{tool_name}_complete": "analysis_complete",
        }

        if response_data["status"] in status_mapping:
            response_data["status"] = status_mapping[response_data["status"]]

        # Rename status field to match analyze workflow
        if f"{tool_name}_status" in response_data:
            response_data["analysis_status"] = response_data.pop(f"{tool_name}_status")
            # Add analyze-specific status fields
            response_data["analysis_status"]["insights_by_severity"] = {}
            for insight in self.consolidated_findings.issues_found:
                severity = insight.get("severity", "unknown")
                if severity not in response_data["analysis_status"]["insights_by_severity"]:
                    response_data["analysis_status"]["insights_by_severity"][severity] = 0
                response_data["analysis_status"]["insights_by_severity"][severity] += 1
            response_data["analysis_status"]["analysis_confidence"] = self.get_request_confidence(request)

        # Map complete_analyze to complete_analysis
        if f"complete_{tool_name}" in response_data:
            response_data["complete_analysis"] = response_data.pop(f"complete_{tool_name}")

        # Map the completion flag to match analyze workflow
        if f"{tool_name}_complete" in response_data:
            response_data["analysis_complete"] = response_data.pop(f"{tool_name}_complete")

        return response_data

    # Required abstract methods from BaseTool
    def get_request_model(self):
        """Return the analyze workflow-specific request model."""
        return AnalyzeWorkflowRequest

    async def prepare_prompt(self, request) -> str:
        """Not used - workflow tools use execute_workflow()."""
        return ""  # Workflow tools use execute_workflow() directly