Add DocGen tool with comprehensive documentation generation capabilities (#109)

* WIP: new workflow architecture

* WIP: further improvements and cleanup

* WIP: cleanup and docs, replace old tool with new

* WIP: cleanup and docs, replace old tool with new

* WIP: new planner implementation using workflow

* WIP: precommit tool working as a workflow instead of a basic tool
Support for passing use_assistant_model=False to skip external models entirely and use Claude only

* WIP: precommit workflow version swapped with old

* WIP: codereview

* WIP: replaced codereview

* WIP: replaced codereview

* WIP: replaced refactor

* WIP: workflow for thinkdeep

* WIP: ensure files get embedded correctly

* WIP: thinkdeep replaced with workflow version

* WIP: improved messaging when an external model's response is received

* WIP: analyze tool swapped

* WIP: updated tests
* Extract only the content when building history
* Use "relevant_files" for workflow tools only

* WIP: updated tests
* Extract only the content when building history
* Use "relevant_files" for workflow tools only

* WIP: fixed get_completion_next_steps_message missing param

* Fixed tests
Request files consistently

* Fixed tests
Request files consistently

* Fixed tests

* New testgen workflow tool
Updated docs

* Swap testgen workflow

* Fix CI test failures by excluding API-dependent tests

- Update GitHub Actions workflow to exclude simulation tests that require API keys
- Fix collaboration tests to properly mock workflow tool expert analysis calls
- Update test assertions to handle new workflow tool response format
- Ensure unit tests run without external API dependencies in CI

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* WIP - Update tests to match new tools

* WIP - Update tests to match new tools

* WIP - Update tests to match new tools

* Should help with https://github.com/BeehiveInnovations/zen-mcp-server/issues/97
Clear the Python cache when running the script: https://github.com/BeehiveInnovations/zen-mcp-server/issues/96
Improved retry error logging
Cleanup

* WIP - chat tool using new architecture and improved code sharing

* Removed todo

* Removed todo

* Cleanup old name

* Tweak wording

* Tweak wording
Migrate old tests

* Support for Flash 2.0 and Flash Lite 2.0

* Support for Flash 2.0 and Flash Lite 2.0

* Support for Flash 2.0 and Flash Lite 2.0
Fixed test

* Improved consensus to use the workflow base class

* Improved consensus to use the workflow base class

* Allow images

* Allow images

* Replaced old consensus tool

* Cleanup tests

* Tests for prompt size

* New tool: docgen
Tests for prompt size
Fixes: https://github.com/BeehiveInnovations/zen-mcp-server/issues/107
Use available token size limits: https://github.com/BeehiveInnovations/zen-mcp-server/issues/105

* Improved docgen prompt
Exclude TestGen from pytest collection

* Updated errors

* Lint

* DocGen instructed not to fix bugs, surface them and stick to documentation only

* WIP

* Stop Claude from being lazy and documenting only a small handful of files

* More style rules

---------

Co-authored-by: Claude <noreply@anthropic.com>
Author: Beehive Innovations
Date: 2025-06-21 23:21:19 -07:00
Committed by: GitHub
Parent: 0655590a51
Commit: c960bcb720
58 changed files with 5492 additions and 5558 deletions


@@ -7,6 +7,7 @@ from .chat import ChatTool
from .codereview import CodeReviewTool
from .consensus import ConsensusTool
from .debug import DebugIssueTool
from .docgen import DocgenTool
from .listmodels import ListModelsTool
from .planner import PlannerTool
from .precommit import PrecommitTool
@@ -14,11 +15,13 @@ from .refactor import RefactorTool
from .testgen import TestGenTool
from .thinkdeep import ThinkDeepTool
from .tracer import TracerTool
from .version import VersionTool
__all__ = [
"ThinkDeepTool",
"CodeReviewTool",
"DebugIssueTool",
"DocgenTool",
"AnalyzeTool",
"ChatTool",
"ConsensusTool",
@@ -28,4 +31,5 @@ __all__ = [
"RefactorTool",
"TestGenTool",
"TracerTool",
"VersionTool",
]
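
For reference, a minimal usage sketch of the newly exported tool; it assumes only the imports shown above, and the direct instantiation is illustrative rather than how the MCP server itself registers tools.

from tools import DocgenTool

# Illustration only: instantiate the exported tool and inspect its generated schema.
tool = DocgenTool()
assert tool.get_name() == "docgen"
schema = tool.get_input_schema()
print(sorted(schema["properties"]))  # step, step_number, findings, counters, ...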

File diff suppressed because it is too large.


@@ -1,5 +1,9 @@
"""
Chat tool - General development chat and collaborative thinking
This tool provides a conversational interface for general development assistance,
brainstorming, problem-solving, and collaborative thinking. It supports file context,
images, and conversation continuation for seamless multi-turn interactions.
"""
from typing import TYPE_CHECKING, Any, Optional
@@ -11,10 +15,11 @@ if TYPE_CHECKING:
from config import TEMPERATURE_BALANCED
from systemprompts import CHAT_PROMPT
from tools.shared.base_models import ToolRequest
from .base import BaseTool, ToolRequest
from .simple.base import SimpleTool
# Field descriptions to avoid duplication between Pydantic and JSON schema
# Field descriptions matching the original Chat tool exactly
CHAT_FIELD_DESCRIPTIONS = {
"prompt": (
"You MUST provide a thorough, expressive question or share an idea with as much context as possible. "
@@ -32,15 +37,23 @@ CHAT_FIELD_DESCRIPTIONS = {
class ChatRequest(ToolRequest):
"""Request model for chat tool"""
"""Request model for Chat tool"""
prompt: str = Field(..., description=CHAT_FIELD_DESCRIPTIONS["prompt"])
files: Optional[list[str]] = Field(default_factory=list, description=CHAT_FIELD_DESCRIPTIONS["files"])
images: Optional[list[str]] = Field(default_factory=list, description=CHAT_FIELD_DESCRIPTIONS["images"])
class ChatTool(BaseTool):
"""General development chat and collaborative thinking tool"""
class ChatTool(SimpleTool):
"""
General development chat and collaborative thinking tool using SimpleTool architecture.
This tool provides identical functionality to the original Chat tool but uses the new
SimpleTool architecture for cleaner code organization and better maintainability.
Migration note: This tool is designed to be a drop-in replacement for the original
Chat tool with 100% behavioral compatibility.
"""
def get_name(self) -> str:
return "chat"
@@ -57,7 +70,33 @@ class ChatTool(BaseTool):
"provide enhanced capabilities."
)
def get_system_prompt(self) -> str:
return CHAT_PROMPT
def get_default_temperature(self) -> float:
return TEMPERATURE_BALANCED
def get_model_category(self) -> "ToolModelCategory":
"""Chat prioritizes fast responses and cost efficiency"""
from tools.models import ToolModelCategory
return ToolModelCategory.FAST_RESPONSE
def get_request_model(self):
"""Return the Chat-specific request model"""
return ChatRequest
# === Schema Generation ===
# For maximum compatibility, we override get_input_schema() to match the original Chat tool exactly
def get_input_schema(self) -> dict[str, Any]:
"""
Generate input schema matching the original Chat tool exactly.
This maintains 100% compatibility with the original Chat tool by using
the same schema generation approach while still benefiting from SimpleTool
convenience methods.
"""
schema = {
"type": "object",
"properties": {
@@ -115,79 +154,62 @@ class ChatTool(BaseTool):
return schema
def get_system_prompt(self) -> str:
return CHAT_PROMPT
# === Tool-specific field definitions (alternative approach for reference) ===
# These aren't used since we override get_input_schema(), but they show how
# the tool could be implemented using the automatic SimpleTool schema building
def get_default_temperature(self) -> float:
return TEMPERATURE_BALANCED
def get_tool_fields(self) -> dict[str, dict[str, Any]]:
"""
Tool-specific field definitions for ChatSimple.
def get_model_category(self) -> "ToolModelCategory":
"""Chat prioritizes fast responses and cost efficiency"""
from tools.models import ToolModelCategory
Note: This method isn't used since we override get_input_schema() for
exact compatibility, but it demonstrates how ChatSimple could be
implemented using automatic schema building.
"""
return {
"prompt": {
"type": "string",
"description": CHAT_FIELD_DESCRIPTIONS["prompt"],
},
"files": {
"type": "array",
"items": {"type": "string"},
"description": CHAT_FIELD_DESCRIPTIONS["files"],
},
"images": {
"type": "array",
"items": {"type": "string"},
"description": CHAT_FIELD_DESCRIPTIONS["images"],
},
}
return ToolModelCategory.FAST_RESPONSE
def get_required_fields(self) -> list[str]:
"""Required fields for ChatSimple tool"""
return ["prompt"]
def get_request_model(self):
return ChatRequest
# === Hook Method Implementations ===
async def prepare_prompt(self, request: ChatRequest) -> str:
"""Prepare the chat prompt with optional context files"""
# Check for prompt.txt in files
prompt_content, updated_files = self.handle_prompt_file(request.files)
"""
Prepare the chat prompt with optional context files.
# Use prompt.txt content if available, otherwise use the prompt field
user_content = prompt_content if prompt_content else request.prompt
# Check user input size at MCP transport boundary (before adding internal content)
size_check = self.check_prompt_size(user_content)
if size_check:
# Need to return error, but prepare_prompt returns str
# Use exception to handle this cleanly
from tools.models import ToolOutput
raise ValueError(f"MCP_SIZE_CHECK:{ToolOutput(**size_check).model_dump_json()}")
# Update request files list
if updated_files is not None:
request.files = updated_files
# Add context files if provided (using centralized file handling with filtering)
if request.files:
file_content, processed_files = self._prepare_file_content_for_prompt(
request.files, request.continuation_id, "Context files"
)
self._actually_processed_files = processed_files
if file_content:
user_content = f"{user_content}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ===="
# Check token limits
self._validate_token_limit(user_content, "Content")
# Add web search instruction if enabled
websearch_instruction = self.get_websearch_instruction(
request.use_websearch,
"""When discussing topics, consider if searches for these would help:
- Documentation for any technologies or concepts mentioned
- Current best practices and patterns
- Recent developments or updates
- Community discussions and solutions""",
)
# Combine system prompt with user content
full_prompt = f"""{self.get_system_prompt()}{websearch_instruction}
=== USER REQUEST ===
{user_content}
=== END REQUEST ===
Please provide a thoughtful, comprehensive response:"""
return full_prompt
This implementation matches the original Chat tool exactly while using
SimpleTool convenience methods for cleaner code.
"""
# Use SimpleTool's Chat-style prompt preparation
return self.prepare_chat_style_prompt(request)
def format_response(self, response: str, request: ChatRequest, model_info: Optional[dict] = None) -> str:
"""Format the chat response"""
"""
Format the chat response to match the original Chat tool exactly.
"""
return (
f"{response}\n\n---\n\n**Claude's Turn:** Evaluate this perspective alongside your analysis to "
"form a comprehensive solution and continue with the user's request and task at hand."
)
def get_websearch_guidance(self) -> str:
"""
Return Chat tool-style web search guidance.
"""
return self.get_chat_style_websearch_guidance()
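
The unused get_tool_fields()/get_required_fields() definitions above are kept as a reference for SimpleTool's automatic schema building. Below is a rough sketch of that alternative path: a hypothetical SimpleTool subclass that does not override get_input_schema(). The EchoTool/EchoRequest names, import paths, and the exact set of hooks SimpleTool requires are assumptions for illustration.

from typing import Any

from pydantic import Field

from config import TEMPERATURE_BALANCED
from tools.shared.base_models import ToolRequest
from tools.simple.base import SimpleTool


class EchoRequest(ToolRequest):
    """Hypothetical request model, for illustration only."""

    prompt: str = Field(..., description="Text to echo back")


class EchoTool(SimpleTool):
    """Hypothetical tool relying on SimpleTool's automatic schema building."""

    def get_name(self) -> str:
        return "echo"

    def get_description(self) -> str:
        return "ECHO - returns the prompt unchanged (illustration only)"

    def get_system_prompt(self) -> str:
        return "Repeat the user's prompt verbatim."

    def get_default_temperature(self) -> float:
        return TEMPERATURE_BALANCED

    def get_request_model(self):
        return EchoRequest

    # No get_input_schema() override: SimpleTool builds the schema from these hooks.
    def get_tool_fields(self) -> dict[str, dict[str, Any]]:
        return {"prompt": {"type": "string", "description": "Text to echo back"}}

    def get_required_fields(self) -> list[str]:
        return ["prompt"]

    async def prepare_prompt(self, request: EchoRequest) -> str:
        return f"{self.get_system_prompt()}\n\n=== USER REQUEST ===\n{request.prompt}\n=== END REQUEST ==="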

File diff suppressed because it is too large.

tools/docgen.py (new file, 646 lines)

@@ -0,0 +1,646 @@
"""
Documentation Generation tool - Automated code documentation with complexity analysis
This tool provides a structured workflow for adding comprehensive documentation to codebases.
It guides you through systematic code analysis to generate modern documentation with:
- Function/method parameter documentation
- Big O complexity analysis
- Call flow and dependency documentation
- Inline comments for complex logic
- Smart updating of existing documentation
Key features:
- Step-by-step documentation workflow with progress tracking
- Context-aware file embedding (references during analysis, full content for documentation)
- Automatic conversation threading and history preservation
- Expert analysis integration with external models
- Support for multiple programming languages and documentation styles
- Configurable documentation features via parameters
"""
import logging
from typing import TYPE_CHECKING, Any, Optional
from pydantic import Field
if TYPE_CHECKING:
from tools.models import ToolModelCategory
from config import TEMPERATURE_ANALYTICAL
from systemprompts import DOCGEN_PROMPT
from tools.shared.base_models import WorkflowRequest
from .workflow.base import WorkflowTool
logger = logging.getLogger(__name__)
# Tool-specific field descriptions for documentation generation
DOCGEN_FIELD_DESCRIPTIONS = {
"step": (
"For step 1: DISCOVERY PHASE ONLY - describe your plan to discover ALL files that need documentation in the current directory. "
"DO NOT document anything yet. Count all files, list them clearly, report the total count, then IMMEDIATELY proceed to step 2. "
"For step 2 and beyond: DOCUMENTATION PHASE - describe what you're currently documenting, focusing on ONE FILE at a time "
"to ensure complete coverage of all functions and methods within that file. CRITICAL: DO NOT ALTER ANY CODE LOGIC - "
"only add documentation (docstrings, comments). ALWAYS use MODERN documentation style for the programming language "
'(e.g., /// for Objective-C, /** */ for Java/JavaScript, """ for Python, // for Swift/C++, etc. - NEVER use legacy styles). '
"Consider complexity analysis, call flow information, and parameter descriptions. "
"If you find bugs or logic issues, TRACK THEM but DO NOT FIX THEM - report after documentation is complete. "
"Report progress using num_files_documented out of total_files_to_document counters."
),
"step_number": (
"The index of the current step in the documentation generation sequence, beginning at 1. Each step should build upon or "
"revise the previous one."
),
"total_steps": (
"Total steps needed to complete documentation: 1 (discovery) + number of files to document. "
"This is calculated dynamically based on total_files_to_document counter."
),
"next_step_required": (
"Set to true if you plan to continue the documentation analysis with another step. False means you believe the "
"documentation plan is complete and ready for implementation."
),
"findings": (
"Summarize everything discovered in this step about the code and its documentation needs. Include analysis of missing "
"documentation, complexity assessments, call flow understanding, and opportunities for improvement. Be specific and "
"avoid vague language—document what you now know about the code structure and how it affects your documentation plan. "
"IMPORTANT: Document both well-documented areas (good examples to follow) and areas needing documentation. "
"ALWAYS use MODERN documentation style appropriate for the programming language (/// for Objective-C, /** */ for Java/JavaScript, "
'""" for Python, // for Swift/C++, etc. - NEVER use legacy /* */ style for languages that have modern alternatives). '
"If you discover any glaring, super-critical bugs that could cause serious harm or data corruption, IMMEDIATELY STOP "
"the documentation workflow and ask the user directly if this critical bug should be addressed first before continuing. "
"For any other non-critical bugs, flaws, or potential improvements, note them here so they can be surfaced later for review. "
"In later steps, confirm or update past findings with additional evidence."
),
"relevant_files": (
"Current focus files (as full absolute paths) for this step. In each step, focus on documenting "
"ONE FILE COMPLETELY before moving to the next. This should contain only the file(s) being "
"actively documented in the current step, not all files that might need documentation."
),
"relevant_context": (
"List methods, functions, or classes that need documentation, in the format "
"'ClassName.methodName' or 'functionName'. "
"Prioritize those with complex logic, important interfaces, or missing/inadequate documentation."
),
"num_files_documented": (
"CRITICAL COUNTER: Number of files you have COMPLETELY documented so far. Start at 0. "
"Increment by 1 only when a file is 100% documented (all functions/methods have documentation). "
"This counter prevents premature completion - you CANNOT set next_step_required=false "
"unless num_files_documented equals total_files_to_document."
),
"total_files_to_document": (
"CRITICAL COUNTER: Total number of files discovered that need documentation in current directory. "
"Set this in step 1 after discovering all files. This is the target number - when "
"num_files_documented reaches this number, then and ONLY then can you set next_step_required=false. "
"This prevents stopping after documenting just one file."
),
"document_complexity": (
"Whether to include algorithmic complexity (Big O) analysis in function/method documentation. "
"Default: true. When enabled, analyzes and documents the computational complexity of algorithms."
),
"document_flow": (
"Whether to include call flow and dependency information in documentation. "
"Default: true. When enabled, documents which methods this function calls and which methods call this function."
),
"update_existing": (
"Whether to update existing documentation when it's found to be incorrect or incomplete. "
"Default: true. When enabled, improves existing docs rather than just adding new ones."
),
"comments_on_complex_logic": (
"Whether to add inline comments around complex logic within functions. "
"Default: true. When enabled, adds explanatory comments for non-obvious algorithmic steps."
),
}
class DocgenRequest(WorkflowRequest):
"""Request model for documentation generation steps"""
# Required workflow fields
step: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step"])
step_number: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["step_number"])
total_steps: int = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["total_steps"])
next_step_required: bool = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["next_step_required"])
# Documentation analysis tracking fields
findings: str = Field(..., description=DOCGEN_FIELD_DESCRIPTIONS["findings"])
relevant_files: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_files"])
relevant_context: list[str] = Field(default_factory=list, description=DOCGEN_FIELD_DESCRIPTIONS["relevant_context"])
# Critical completion tracking counters
num_files_documented: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"])
total_files_to_document: int = Field(0, description=DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"])
# Documentation generation configuration parameters
document_complexity: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_complexity"])
document_flow: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["document_flow"])
update_existing: Optional[bool] = Field(True, description=DOCGEN_FIELD_DESCRIPTIONS["update_existing"])
comments_on_complex_logic: Optional[bool] = Field(
True, description=DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"]
)
class DocgenTool(WorkflowTool):
"""
Documentation generation tool for automated code documentation with complexity analysis.
This tool implements a structured documentation workflow that guides users through
methodical code analysis to generate comprehensive documentation including:
- Function/method signatures and parameter descriptions
- Algorithmic complexity (Big O) analysis
- Call flow and dependency documentation
- Inline comments for complex logic
- Modern documentation style appropriate for the language/platform
"""
def __init__(self):
super().__init__()
self.initial_request = None
def get_name(self) -> str:
return "docgen"
def get_description(self) -> str:
return (
"COMPREHENSIVE DOCUMENTATION GENERATION - Step-by-step code documentation with expert analysis. "
"This tool guides you through a systematic investigation process where you:\n\n"
"1. Start with step 1: describe your documentation investigation plan\n"
"2. STOP and investigate code structure, patterns, and documentation needs\n"
"3. Report findings in step 2 with concrete evidence from actual code analysis\n"
"4. Continue investigating between each step\n"
"5. Track findings, relevant files, and documentation opportunities throughout\n"
"6. Update assessments as understanding evolves\n"
"7. Once investigation is complete, receive expert analysis\n\n"
"IMPORTANT: This tool enforces investigation between steps:\n"
"- After each call, you MUST investigate before calling again\n"
"- Each step must include NEW evidence from code examination\n"
"- No recursive calls without actual investigation work\n"
"- The tool will specify which step number to use next\n"
"- Follow the required_actions list for investigation guidance\n\n"
"Perfect for: comprehensive documentation generation, code documentation analysis, "
"complexity assessment, documentation modernization, API documentation."
)
def get_system_prompt(self) -> str:
return DOCGEN_PROMPT
def get_default_temperature(self) -> float:
return TEMPERATURE_ANALYTICAL
def get_model_category(self) -> "ToolModelCategory":
"""Docgen requires analytical and reasoning capabilities"""
from tools.models import ToolModelCategory
return ToolModelCategory.EXTENDED_REASONING
def requires_model(self) -> bool:
"""
Docgen tool doesn't require model resolution at the MCP boundary.
The docgen tool is a self-contained workflow tool that guides Claude through
systematic documentation generation without calling external AI models.
Returns:
bool: False - docgen doesn't need external AI model access
"""
return False
def requires_expert_analysis(self) -> bool:
"""Docgen is self-contained and doesn't need expert analysis."""
return False
def get_workflow_request_model(self):
"""Return the docgen-specific request model."""
return DocgenRequest
def get_tool_fields(self) -> dict[str, dict[str, Any]]:
"""Return the tool-specific fields for docgen."""
return {
"document_complexity": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["document_complexity"],
},
"document_flow": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["document_flow"],
},
"update_existing": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["update_existing"],
},
"comments_on_complex_logic": {
"type": "boolean",
"default": True,
"description": DOCGEN_FIELD_DESCRIPTIONS["comments_on_complex_logic"],
},
"num_files_documented": {
"type": "integer",
"default": 0,
"minimum": 0,
"description": DOCGEN_FIELD_DESCRIPTIONS["num_files_documented"],
},
"total_files_to_document": {
"type": "integer",
"default": 0,
"minimum": 0,
"description": DOCGEN_FIELD_DESCRIPTIONS["total_files_to_document"],
},
}
def get_required_fields(self) -> list[str]:
"""Return additional required fields beyond the standard workflow requirements."""
return [
"document_complexity",
"document_flow",
"update_existing",
"comments_on_complex_logic",
"num_files_documented",
"total_files_to_document",
]
def get_input_schema(self) -> dict[str, Any]:
"""Generate input schema using WorkflowSchemaBuilder with field exclusions."""
from .workflow.schema_builders import WorkflowSchemaBuilder
# Exclude workflow fields that documentation generation doesn't need
excluded_workflow_fields = [
"confidence", # Documentation doesn't use confidence levels
"hypothesis", # Documentation doesn't use hypothesis
"backtrack_from_step", # Documentation uses simpler error recovery
"files_checked", # Documentation uses doc_files and doc_methods instead for better tracking
]
# Exclude common fields that documentation generation doesn't need
excluded_common_fields = [
"model", # Documentation doesn't need external model selection
"temperature", # Documentation doesn't need temperature control
"thinking_mode", # Documentation doesn't need thinking mode
"use_websearch", # Documentation doesn't need web search
"images", # Documentation doesn't use images
]
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=self.get_tool_fields(),
required_fields=self.get_required_fields(), # Include docgen-specific required fields
model_field_schema=None, # Exclude model field - docgen doesn't need external model selection
auto_mode=False, # Force non-auto mode to prevent model field addition
tool_name=self.get_name(),
excluded_workflow_fields=excluded_workflow_fields,
excluded_common_fields=excluded_common_fields,
)
def get_required_actions(self, step_number: int, confidence: str, findings: str, total_steps: int) -> list[str]:
"""Define required actions for comprehensive documentation analysis with step-by-step file focus."""
if step_number == 1:
# Initial discovery ONLY - no documentation yet
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Discover ALL files in the current directory (not nested) that need documentation",
"COUNT the exact number of files that need documentation",
"LIST all the files you found that need documentation by name",
"IDENTIFY the programming language(s) to use MODERN documentation style (/// for Objective-C, /** */ for Java/JavaScript, etc.)",
"DO NOT start documenting any files yet - this is discovery phase only",
"Report the total count and file list clearly to the user",
"IMMEDIATELY call docgen step 2 after discovery to begin documentation phase",
"WHEN CALLING DOCGEN step 2: Set total_files_to_document to the exact count you found",
"WHEN CALLING DOCGEN step 2: Set num_files_documented to 0 (haven't started yet)",
]
elif step_number == 2:
# Start documentation phase with first file
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Choose the FIRST file from your discovered list to start documentation",
"For the chosen file: identify ALL functions, classes, and methods within it",
'USE MODERN documentation style for the programming language (/// for Objective-C, /** */ for Java/JavaScript, """ for Python, etc.)',
"Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
"When file is 100% documented, increment num_files_documented from 0 to 1",
"Note any dependencies this file has (what it imports/calls) and what calls into it",
"Track any logic bugs/issues found but DO NOT FIX THEM - report after documentation complete",
"Report which specific functions you documented in this step for accountability",
"Report progress: num_files_documented (1) out of total_files_to_document",
]
elif step_number <= 4:
# Continue with focused file-by-file approach
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Choose the NEXT undocumented file from your discovered list",
"For the chosen file: identify ALL functions, classes, and methods within it",
"USE MODERN documentation style for the programming language (NEVER use legacy /* */ style for languages with modern alternatives)",
"Document ALL functions/methods in the chosen file - don't skip any - DOCUMENTATION ONLY",
"When file is 100% documented, increment num_files_documented by 1",
"Verify that EVERY function in the current file has proper documentation (no skipping)",
"Track any bugs/issues found but DO NOT FIX THEM - document first, report issues later",
"Report specific function names you documented for verification",
"Report progress: current num_files_documented out of total_files_to_document",
]
else:
# Continue systematic file-by-file coverage
return [
"CRITICAL: DO NOT ALTER ANY CODE LOGIC! Only add documentation (docstrings, comments)",
"Check counters: num_files_documented vs total_files_to_document",
"If num_files_documented < total_files_to_document: choose NEXT undocumented file",
"USE MODERN documentation style appropriate for each programming language (NEVER legacy styles)",
"Document every function, method, and class in current file with no exceptions",
"When file is 100% documented, increment num_files_documented by 1",
"Track bugs/issues found but DO NOT FIX THEM - focus on documentation only",
"Report progress: current num_files_documented out of total_files_to_document",
"If num_files_documented < total_files_to_document: RESTART docgen with next step",
"ONLY set next_step_required=false when num_files_documented equals total_files_to_document",
"For nested dependencies: check if functions call into subdirectories and document those too",
"Report any accumulated bugs/issues found during documentation for user decision",
]
def should_call_expert_analysis(self, consolidated_findings, request=None) -> bool:
"""Docgen is self-contained and doesn't need expert analysis."""
return False
def prepare_expert_analysis_context(self, consolidated_findings) -> str:
"""Docgen doesn't use expert analysis."""
return ""
def get_step_guidance(self, step_number: int, confidence: str, request) -> dict[str, Any]:
"""
Provide step-specific guidance for documentation generation workflow.
This method generates docgen-specific guidance used by get_step_guidance_message().
"""
# Generate the next steps instruction based on required actions
# Calculate dynamic total_steps based on files to document
total_files_to_document = self.get_request_total_files_to_document(request)
calculated_total_steps = 1 + total_files_to_document if total_files_to_document > 0 else request.total_steps
required_actions = self.get_required_actions(step_number, confidence, request.findings, calculated_total_steps)
if step_number == 1:
next_steps = (
f"DISCOVERY PHASE ONLY - DO NOT START DOCUMENTING YET!\n"
f"MANDATORY: DO NOT call the {self.get_name()} tool again immediately. You MUST first perform "
f"FILE DISCOVERY step by step. DO NOT DOCUMENT ANYTHING YET. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nCRITICAL: When you call {self.get_name()} step 2, set total_files_to_document to the exact count "
f"of files needing documentation and set num_files_documented to 0 (haven't started documenting yet). "
f"Your total_steps will be automatically calculated as 1 (discovery) + number of files to document. "
f"Step 2 will BEGIN the documentation phase. Report the count clearly and then IMMEDIATELY "
f"proceed to call {self.get_name()} step 2 to start documenting the first file."
)
elif step_number == 2:
next_steps = (
f"DOCUMENTATION PHASE BEGINS! ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
f"START FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented from 0 to 1 when first file complete. "
f"REPORT counters: current num_files_documented out of total_files_to_document. "
f"If you found bugs/issues, LIST THEM but DO NOT FIX THEM - ask user what to do after documentation. "
f"Do NOT move to a new file until the current one is completely documented. "
f"When ready for step {step_number + 1}, report completed work with updated counters."
)
elif step_number <= 4:
next_steps = (
f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
f"CONTINUE FILE-BY-FILE APPROACH! Focus on ONE file until 100% complete. "
f"MANDATORY ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nREPORT your progress: which specific functions did you document? Update num_files_documented when file complete. "
f"REPORT counters: current num_files_documented out of total_files_to_document. "
f"If you found bugs/issues, LIST THEM but DO NOT FIX THEM - ask user what to do after documentation. "
f"Do NOT move to a new file until the current one is completely documented. "
f"When ready for step {step_number + 1}, report completed work with updated counters."
)
else:
next_steps = (
f"ABSOLUTE RULE: DO NOT ALTER ANY CODE LOGIC! DOCUMENTATION ONLY!\n"
f"CRITICAL: Check if MORE FILES need documentation before finishing! "
f"REQUIRED ACTIONS before calling {self.get_name()} step {step_number + 1}:\n"
+ "\n".join(f"{i+1}. {action}" for i, action in enumerate(required_actions))
+ f"\n\nREPORT which functions you documented and update num_files_documented when file complete. "
f"CHECK: If num_files_documented < total_files_to_document, RESTART {self.get_name()} with next step! "
f"CRITICAL: Only set next_step_required=false when num_files_documented equals total_files_to_document! "
f"REPORT counters: current num_files_documented out of total_files_to_document. "
f"If you accumulated bugs/issues during documentation, REPORT THEM and ask user for guidance. "
f"NO recursive {self.get_name()} calls without actual documentation work!"
)
return {"next_steps": next_steps}
# Hook method overrides for docgen-specific behavior
async def handle_work_completion(self, response_data: dict, request, arguments: dict) -> dict:
"""
Override work completion to enforce counter validation.
The docgen tool MUST complete ALL files before finishing. If counters don't match,
force continuation regardless of next_step_required setting.
"""
# CRITICAL VALIDATION: Check if all files have been documented using proper inheritance hooks
num_files_documented = self.get_request_num_files_documented(request)
total_files_to_document = self.get_request_total_files_to_document(request)
if num_files_documented < total_files_to_document:
# Counters don't match - force continuation!
logger.warning(
f"Docgen stopping early: {num_files_documented} < {total_files_to_document}. "
f"Forcing continuation to document remaining files."
)
# Override to continuation mode
response_data["status"] = "documentation_analysis_required"
response_data[f"pause_for_{self.get_name()}"] = True
response_data["next_steps"] = (
f"CRITICAL ERROR: You attempted to finish documentation with only {num_files_documented} "
f"out of {total_files_to_document} files documented! You MUST continue documenting "
f"the remaining {total_files_to_document - num_files_documented} files. "
f"Call {self.get_name()} again with step {request.step_number + 1} and continue documentation "
f"of the next undocumented file. DO NOT set next_step_required=false until ALL files are documented!"
)
return response_data
# If counters match, proceed with normal completion
return await super().handle_work_completion(response_data, request, arguments)
def prepare_step_data(self, request) -> dict:
"""
Prepare docgen-specific step data for processing.
Calculates total_steps dynamically based on number of files to document:
- Step 1: Discovery phase
- Steps 2+: One step per file to document
"""
# Calculate dynamic total_steps based on files to document
total_files_to_document = self.get_request_total_files_to_document(request)
if total_files_to_document > 0:
# Discovery step (1) + one step per file
calculated_total_steps = 1 + total_files_to_document
else:
# Fallback to request total_steps if no file count available
calculated_total_steps = request.total_steps
step_data = {
"step": request.step,
"step_number": request.step_number,
"total_steps": calculated_total_steps, # Use calculated value
"findings": request.findings,
"relevant_files": request.relevant_files,
"relevant_context": request.relevant_context,
"num_files_documented": request.num_files_documented,
"total_files_to_document": request.total_files_to_document,
"issues_found": [], # Docgen uses this for documentation gaps
"confidence": "medium", # Default confidence for docgen
"hypothesis": "systematic_documentation_needed", # Default hypothesis
"images": [], # Docgen doesn't typically use images
# CRITICAL: Include documentation configuration parameters so the model can see them
"document_complexity": request.document_complexity,
"document_flow": request.document_flow,
"update_existing": request.update_existing,
"comments_on_complex_logic": request.comments_on_complex_logic,
}
return step_data
def should_skip_expert_analysis(self, request, consolidated_findings) -> bool:
"""
Docgen tool skips expert analysis when Claude has "certain" confidence.
"""
return request.confidence == "certain" and not request.next_step_required
# Override inheritance hooks for docgen-specific behavior
def get_completion_status(self) -> str:
"""Docgen tools use docgen-specific status."""
return "documentation_analysis_complete"
def get_completion_data_key(self) -> str:
"""Docgen uses 'complete_documentation_analysis' key."""
return "complete_documentation_analysis"
def get_final_analysis_from_request(self, request):
"""Docgen tools use 'hypothesis' field for documentation strategy."""
return request.hypothesis
def get_confidence_level(self, request) -> str:
"""Docgen tools use 'certain' for high confidence."""
return request.confidence or "high"
def get_completion_message(self) -> str:
"""Docgen-specific completion message."""
return (
"Documentation analysis complete with high confidence. You have identified the comprehensive "
"documentation needs and strategy. MANDATORY: Present the user with the documentation plan "
"and IMMEDIATELY proceed with implementing the documentation without requiring further "
"consultation. Focus on the precise documentation improvements needed."
)
def get_skip_reason(self) -> str:
"""Docgen-specific skip reason."""
return "Claude completed comprehensive documentation analysis"
def get_request_relevant_context(self, request) -> list:
"""Get relevant_context for docgen tool."""
try:
return request.relevant_context or []
except AttributeError:
return []
def get_request_num_files_documented(self, request) -> int:
"""Get num_files_documented from request. Override for custom handling."""
try:
return request.num_files_documented or 0
except AttributeError:
return 0
def get_request_total_files_to_document(self, request) -> int:
"""Get total_files_to_document from request. Override for custom handling."""
try:
return request.total_files_to_document or 0
except AttributeError:
return 0
def get_skip_expert_analysis_status(self) -> str:
"""Docgen-specific expert analysis skip status."""
return "skipped_due_to_complete_analysis"
def prepare_work_summary(self) -> str:
"""Docgen-specific work summary."""
try:
return f"Completed {len(self.work_history)} documentation analysis steps"
except AttributeError:
return "Completed documentation analysis"
def get_completion_next_steps_message(self, expert_analysis_used: bool = False) -> str:
"""
Docgen-specific completion message.
"""
return (
"DOCUMENTATION ANALYSIS IS COMPLETE FOR ALL FILES (num_files_documented equals total_files_to_document). "
"MANDATORY FINAL VERIFICATION: Before presenting your summary, you MUST perform a final verification scan. "
"Read through EVERY file you documented and check EVERY function, method, class, and property to confirm "
"it has proper documentation including complexity analysis and call flow information. If ANY items lack "
"documentation, document them immediately before finishing. "
"THEN present a clear summary showing: 1) Final counters: num_files_documented out of total_files_to_document, "
"2) Complete accountability list of ALL files you documented with verification status, "
"3) Detailed list of EVERY function/method you documented in each file (proving complete coverage), "
"4) Any dependency relationships you discovered between files, 5) Recommended documentation improvements with concrete examples including "
"complexity analysis and call flow information. 6) **CRITICAL**: List any bugs or logic issues you found "
"during documentation but did NOT fix - present these to the user and ask what they'd like to do about them. "
"Make it easy for a developer to see the complete documentation status across the entire codebase with full accountability."
)
def get_step_guidance_message(self, request) -> str:
"""
Docgen-specific step guidance with detailed analysis instructions.
"""
step_guidance = self.get_step_guidance(request.step_number, request.confidence, request)
return step_guidance["next_steps"]
def customize_workflow_response(self, response_data: dict, request) -> dict:
"""
Customize response to match docgen tool format.
"""
# Store initial request on first step
if request.step_number == 1:
self.initial_request = request.step
# Convert generic status names to docgen-specific ones
tool_name = self.get_name()
status_mapping = {
f"{tool_name}_in_progress": "documentation_analysis_in_progress",
f"pause_for_{tool_name}": "pause_for_documentation_analysis",
f"{tool_name}_required": "documentation_analysis_required",
f"{tool_name}_complete": "documentation_analysis_complete",
}
if response_data["status"] in status_mapping:
response_data["status"] = status_mapping[response_data["status"]]
# Rename status field to match docgen tool
if f"{tool_name}_status" in response_data:
response_data["documentation_analysis_status"] = response_data.pop(f"{tool_name}_status")
# Add docgen-specific status fields
response_data["documentation_analysis_status"]["documentation_strategies"] = len(
self.consolidated_findings.hypotheses
)
# Rename complete documentation analysis data
if f"complete_{tool_name}" in response_data:
response_data["complete_documentation_analysis"] = response_data.pop(f"complete_{tool_name}")
# Map the completion flag to match docgen tool
if f"{tool_name}_complete" in response_data:
response_data["documentation_analysis_complete"] = response_data.pop(f"{tool_name}_complete")
# Map the required flag to match docgen tool
if f"{tool_name}_required" in response_data:
response_data["documentation_analysis_required"] = response_data.pop(f"{tool_name}_required")
return response_data
# Required abstract methods from BaseTool
def get_request_model(self):
"""Return the docgen-specific request model."""
return DocgenRequest
async def prepare_prompt(self, request) -> str:
"""Not used - workflow tools use execute_workflow()."""
return "" # Workflow tools use execute_workflow() directly


@@ -12,8 +12,9 @@ from typing import Any, Optional
from mcp.types import TextContent
from tools.base import BaseTool, ToolRequest
from tools.models import ToolModelCategory, ToolOutput
from tools.shared.base_models import ToolRequest
from tools.shared.base_tool import BaseTool
logger = logging.getLogger(__name__)
@@ -37,7 +38,7 @@ class ListModelsTool(BaseTool):
"LIST AVAILABLE MODELS - Display all AI models organized by provider. "
"Shows which providers are configured, available models, their aliases, "
"context windows, and capabilities. Useful for understanding what models "
"can be used and their characteristics."
"can be used and their characteristics. MANDATORY: Must display full output to the user."
)
def get_input_schema(self) -> dict[str, Any]:


@@ -23,9 +23,6 @@ class ContinuationOffer(BaseModel):
..., description="Thread continuation ID for multi-turn conversations across different tools"
)
note: str = Field(..., description="Message explaining continuation opportunity to Claude")
suggested_tool_params: Optional[dict[str, Any]] = Field(
None, description="Suggested parameters for continued tool usage"
)
remaining_turns: int = Field(..., description="Number of conversation turns remaining")


@@ -670,7 +670,7 @@ class RefactorTool(WorkflowTool):
response_data["refactoring_status"]["opportunities_by_type"] = refactor_types
response_data["refactoring_status"]["refactor_confidence"] = request.confidence
# Map complete_refactorworkflow to complete_refactoring
# Map complete_refactor to complete_refactoring
if f"complete_{tool_name}" in response_data:
response_data["complete_refactoring"] = response_data.pop(f"complete_{tool_name}")


@@ -256,6 +256,7 @@ class BaseTool(ABC):
# Find all custom models (is_custom=true)
for alias in registry.list_aliases():
config = registry.resolve(alias)
# Use hasattr for defensive programming - is_custom is optional with default False
if config and hasattr(config, "is_custom") and config.is_custom:
if alias not in all_models:
all_models.append(alias)
@@ -345,6 +346,7 @@ class BaseTool(ABC):
# Find all custom models (is_custom=true)
for alias in registry.list_aliases():
config = registry.resolve(alias)
# Use hasattr for defensive programming - is_custom is optional with default False
if config and hasattr(config, "is_custom") and config.is_custom:
# Format context window
context_tokens = config.context_window
@@ -798,6 +800,23 @@ class BaseTool(ABC):
return prompt_content, updated_files if updated_files else None
def get_prompt_content_for_size_validation(self, user_content: str) -> str:
"""
Get the content that should be validated for MCP prompt size limits.
This hook method allows tools to specify what content should be checked
against the MCP transport size limit. By default, it returns the user content,
but can be overridden to exclude conversation history when needed.
Args:
user_content: The user content that would normally be validated
Returns:
The content that should actually be validated for size limits
"""
# Default implementation: validate the full user content
return user_content
def check_prompt_size(self, text: str) -> Optional[dict[str, Any]]:
"""
Check if USER INPUT text is too large for MCP transport boundary.
@@ -841,6 +860,7 @@ class BaseTool(ABC):
reserve_tokens: int = 1_000,
remaining_budget: Optional[int] = None,
arguments: Optional[dict] = None,
model_context: Optional[Any] = None,
) -> tuple[str, list[str]]:
"""
Centralized file processing implementing dual prioritization strategy.
@@ -855,6 +875,7 @@ class BaseTool(ABC):
reserve_tokens: Tokens to reserve for additional prompt content (default 1K)
remaining_budget: Remaining token budget after conversation history (from server.py)
arguments: Original tool arguments (used to extract _remaining_tokens if available)
model_context: Model context object with all model information including token allocation
Returns:
tuple[str, list[str]]: (formatted_file_content, actually_processed_files)
@@ -877,19 +898,18 @@ class BaseTool(ABC):
elif max_tokens is not None:
effective_max_tokens = max_tokens - reserve_tokens
else:
# The execute() method is responsible for setting self._model_context.
# A missing context is a programming error, not a fallback case.
if not hasattr(self, "_model_context") or not self._model_context:
logger.error(
f"[FILES] {self.name}: _prepare_file_content_for_prompt called without a valid model context. "
"This indicates an incorrect call sequence in the tool's implementation."
)
# Fail fast to reveal integration issues. A silent fallback with arbitrary
# limits can hide bugs and lead to unexpected token usage or silent failures.
raise RuntimeError("ModelContext not initialized before file preparation.")
# Use model_context for token allocation
if not model_context:
# Try to get from stored attributes as fallback
model_context = getattr(self, "_model_context", None)
if not model_context:
logger.error(
f"[FILES] {self.name}: _prepare_file_content_for_prompt called without model_context. "
"This indicates an incorrect call sequence in the tool's implementation."
)
raise RuntimeError("Model context not provided for file preparation.")
# This is now the single source of truth for token allocation.
model_context = self._model_context
try:
token_allocation = model_context.calculate_token_allocation()
# Standardize on `file_tokens` for consistency and correctness.
@@ -1222,6 +1242,220 @@ When recommending searches, be specific about what information you need and why
return model_name, model_context
def validate_and_correct_temperature(self, temperature: float, model_context: Any) -> tuple[float, list[str]]:
"""
Validate and correct temperature for the specified model.
This method ensures that the temperature value is within the valid range
for the specific model being used. Different models have different temperature
constraints (e.g., o1 models require temperature=1.0, GPT models support 0-2).
Args:
temperature: Temperature value to validate
model_context: Model context object containing model name, provider, and capabilities
Returns:
Tuple of (corrected_temperature, warning_messages)
"""
try:
# Use model context capabilities directly - clean OOP approach
capabilities = model_context.capabilities
constraint = capabilities.temperature_constraint
warnings = []
if not constraint.validate(temperature):
corrected = constraint.get_corrected_value(temperature)
warning = (
f"Temperature {temperature} invalid for {model_context.model_name}. "
f"{constraint.get_description()}. Using {corrected} instead."
)
warnings.append(warning)
return corrected, warnings
return temperature, warnings
except Exception as e:
# If validation fails for any reason, use the original temperature
# and log a warning (but don't fail the request)
logger.warning(f"Temperature validation failed for {model_context.model_name}: {e}")
return temperature, [f"Temperature validation failed: {e}"]
def _validate_image_limits(
self, images: Optional[list[str]], model_context: Optional[Any] = None, continuation_id: Optional[str] = None
) -> Optional[dict]:
"""
Validate image size and count against model capabilities.
This performs strict validation to ensure we don't exceed model-specific
image limits. Uses capability-based validation with actual model
configuration rather than hard-coded limits.
Args:
images: List of image paths/data URLs to validate
model_context: Model context object containing model name, provider, and capabilities
continuation_id: Optional continuation ID for conversation context
Returns:
Optional[dict]: Error response if validation fails, None if valid
"""
if not images:
return None
# Import here to avoid circular imports
import base64
from pathlib import Path
# Handle legacy calls (positional model_name string)
if isinstance(model_context, str):
# Legacy call: _validate_image_limits(images, "model-name")
logger.warning(
"Legacy _validate_image_limits call with model_name string. Use model_context object instead."
)
try:
from utils.model_context import ModelContext
model_context = ModelContext(model_context)
except Exception as e:
logger.warning(f"Failed to create model context from legacy model_name: {e}")
# Generic error response for any unavailable model
return {
"status": "error",
"content": f"Model '{model_context}' is not available. {str(e)}",
"content_type": "text",
"metadata": {
"error_type": "validation_error",
"model_name": model_context,
"supports_images": None, # Unknown since model doesn't exist
"image_count": len(images) if images else 0,
},
}
if not model_context:
# Get from tool's stored context as fallback
model_context = getattr(self, "_model_context", None)
if not model_context:
logger.warning("No model context available for image validation")
return None
try:
# Use model context capabilities directly - clean OOP approach
capabilities = model_context.capabilities
model_name = model_context.model_name
except Exception as e:
logger.warning(f"Failed to get capabilities from model_context for image validation: {e}")
# Generic error response when capabilities cannot be accessed
model_name = getattr(model_context, "model_name", "unknown")
return {
"status": "error",
"content": f"Model '{model_name}' is not available. {str(e)}",
"content_type": "text",
"metadata": {
"error_type": "validation_error",
"model_name": model_name,
"supports_images": None, # Unknown since model capabilities unavailable
"image_count": len(images) if images else 0,
},
}
# Check if model supports images
if not capabilities.supports_images:
return {
"status": "error",
"content": (
f"Image support not available: Model '{model_name}' does not support image processing. "
f"Please use a vision-capable model such as 'gemini-2.5-flash', 'o3', "
f"or 'claude-3-opus' for image analysis tasks."
),
"content_type": "text",
"metadata": {
"error_type": "validation_error",
"model_name": model_name,
"supports_images": False,
"image_count": len(images),
},
}
# Get model image limits from capabilities
max_images = 5 # Default max number of images
max_size_mb = capabilities.max_image_size_mb
# Check image count
if len(images) > max_images:
return {
"status": "error",
"content": (
f"Too many images: Model '{model_name}' supports a maximum of {max_images} images, "
f"but {len(images)} were provided. Please reduce the number of images."
),
"content_type": "text",
"metadata": {
"error_type": "validation_error",
"model_name": model_name,
"image_count": len(images),
"max_images": max_images,
},
}
# Calculate total size of all images
total_size_mb = 0.0
for image_path in images:
try:
if image_path.startswith("data:image/"):
# Handle data URL: data:image/png;base64,iVBORw0...
_, data = image_path.split(",", 1)
# Base64 encoding increases size by ~33%, so decode to get actual size
actual_size = len(base64.b64decode(data))
total_size_mb += actual_size / (1024 * 1024)
else:
# Handle file path
path = Path(image_path)
if path.exists():
file_size = path.stat().st_size
total_size_mb += file_size / (1024 * 1024)
else:
logger.warning(f"Image file not found: {image_path}")
# Assume a reasonable size for missing files to avoid breaking validation
total_size_mb += 1.0 # 1MB assumption
except Exception as e:
logger.warning(f"Failed to get size for image {image_path}: {e}")
# Assume a reasonable size for problematic files
total_size_mb += 1.0 # 1MB assumption
# Apply 40MB cap for custom models if needed
effective_limit_mb = max_size_mb
try:
from providers.base import ProviderType
# ModelCapabilities dataclass has provider field defined
if capabilities.provider == ProviderType.CUSTOM:
effective_limit_mb = min(max_size_mb, 40.0)
except Exception:
pass
# Validate against size limit
if total_size_mb > effective_limit_mb:
return {
"status": "error",
"content": (
f"Image size limit exceeded: Model '{model_name}' supports maximum {effective_limit_mb:.1f}MB "
f"for all images combined, but {total_size_mb:.1f}MB was provided. "
f"Please reduce image sizes or count and try again."
),
"content_type": "text",
"metadata": {
"error_type": "validation_error",
"model_name": model_name,
"total_size_mb": round(total_size_mb, 2),
"limit_mb": round(effective_limit_mb, 2),
"image_count": len(images),
"supports_images": True,
},
}
# All validations passed
logger.debug(f"Image validation passed: {len(images)} images, {total_size_mb:.1f}MB total")
return None
def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
"""Parse response - will be inherited for now."""
# Implementation inherited from current base.py
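
One detail worth noting in the image validation above: data URLs are base64-decoded so the limit applies to the real byte count rather than the roughly 33% larger encoded length, and missing or unreadable files fall back to a 1 MB assumption. A minimal standalone sketch of that size accounting, for reference only (the tool itself performs this inside _validate_image_limits):

import base64
from pathlib import Path


def estimate_total_image_size_mb(images: list[str]) -> float:
    """Sketch of the size accounting used by _validate_image_limits (illustration only)."""
    total_mb = 0.0
    for image in images:
        if image.startswith("data:image/"):
            _, data = image.split(",", 1)
            total_mb += len(base64.b64decode(data)) / (1024 * 1024)  # decoded bytes, not base64 length
        else:
            path = Path(image)
            if path.exists():
                total_mb += path.stat().st_size / (1024 * 1024)
            else:
                total_mb += 1.0  # same 1 MB fallback the tool uses for missing files
    return total_mb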


@@ -100,6 +100,23 @@ class SimpleTool(BaseTool):
"""
return []
def format_response(self, response: str, request, model_info: Optional[dict] = None) -> str:
"""
Format the AI response before returning to the client.
This is a hook method that subclasses can override to customize
response formatting. The default implementation returns the response as-is.
Args:
response: The raw response from the AI model
request: The validated request object
model_info: Optional model information dictionary
Returns:
Formatted response string
"""
return response
def get_input_schema(self) -> dict[str, Any]:
"""
Generate the complete input schema using SchemaBuilder.
@@ -110,6 +127,9 @@ class SimpleTool(BaseTool):
- Model field with proper auto-mode handling
- Required fields from get_required_fields()
Tools can override this method for custom schema generation while
still benefiting from SimpleTool's convenience methods.
Returns:
Complete JSON schema for the tool
"""
@@ -129,6 +149,500 @@ class SimpleTool(BaseTool):
"""
return ToolRequest
# Hook methods for safe attribute access without hasattr/getattr
def get_request_model_name(self, request) -> Optional[str]:
"""Get model name from request. Override for custom model name handling."""
try:
return request.model
except AttributeError:
return None
def get_request_images(self, request) -> list:
"""Get images from request. Override for custom image handling."""
try:
return request.images if request.images is not None else []
except AttributeError:
return []
def get_request_continuation_id(self, request) -> Optional[str]:
"""Get continuation_id from request. Override for custom continuation handling."""
try:
return request.continuation_id
except AttributeError:
return None
def get_request_prompt(self, request) -> str:
"""Get prompt from request. Override for custom prompt handling."""
try:
return request.prompt
except AttributeError:
return ""
def get_request_temperature(self, request) -> Optional[float]:
"""Get temperature from request. Override for custom temperature handling."""
try:
return request.temperature
except AttributeError:
return None
def get_validated_temperature(self, request, model_context: Any) -> tuple[float, list[str]]:
"""
Get temperature from request and validate it against model constraints.
This is a convenience method that combines temperature extraction and validation
for simple tools. It ensures temperature is within valid range for the model.
Args:
request: The request object containing temperature
model_context: Model context object containing model info
Returns:
Tuple of (validated_temperature, warning_messages)
"""
temperature = self.get_request_temperature(request)
if temperature is None:
temperature = self.get_default_temperature()
return self.validate_and_correct_temperature(temperature, model_context)
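# Example (illustrative): a request without an explicit temperature falls back to the
# tool default before being clamped to the model's supported range, e.g.
#
#     temperature, warnings = self.get_validated_temperature(request, self._model_context)
#     # -> (0.5, []) if the tool default is 0.5 and the model accepts it,
#     #    or a corrected value plus warning messages otherwise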
def get_request_thinking_mode(self, request) -> Optional[str]:
"""Get thinking_mode from request. Override for custom thinking mode handling."""
try:
return request.thinking_mode
except AttributeError:
return None
def get_request_files(self, request) -> list:
"""Get files from request. Override for custom file handling."""
try:
return request.files if request.files is not None else []
except AttributeError:
return []
def get_request_use_websearch(self, request) -> bool:
"""Get use_websearch from request. Override for custom websearch handling."""
try:
return request.use_websearch if request.use_websearch is not None else True
except AttributeError:
return True
def get_request_as_dict(self, request) -> dict:
"""Convert request to dictionary. Override for custom serialization."""
try:
# Try Pydantic v2 method first
return request.model_dump()
except AttributeError:
try:
# Fall back to Pydantic v1 method
return request.dict()
except AttributeError:
# Last resort - convert to dict manually
return {"prompt": self.get_request_prompt(request)}
def set_request_files(self, request, files: list) -> None:
"""Set files on request. Override for custom file setting."""
try:
request.files = files
except AttributeError:
# If request doesn't support file setting, ignore silently
pass
def get_actually_processed_files(self) -> list:
"""Get actually processed files. Override for custom file tracking."""
try:
return self._actually_processed_files
except AttributeError:
return []
async def execute(self, arguments: dict[str, Any]) -> list:
"""
Execute the simple tool using the comprehensive flow from old base.py.
This method replicates the proven execution pattern while using SimpleTool hooks.
"""
import json
import logging
from mcp.types import TextContent
from tools.models import ToolOutput
logger = logging.getLogger(f"tools.{self.get_name()}")
try:
# Store arguments for access by helper methods
self._current_arguments = arguments
logger.info(f"🔧 {self.get_name()} tool called with arguments: {list(arguments.keys())}")
# Validate request using the tool's Pydantic model
request_model = self.get_request_model()
request = request_model(**arguments)
logger.debug(f"Request validation successful for {self.get_name()}")
# Validate file paths for security
# This prevents path traversal attacks and ensures proper access control
path_error = self._validate_file_paths(request)
if path_error:
error_output = ToolOutput(
status="error",
content=path_error,
content_type="text",
)
return [TextContent(type="text", text=error_output.model_dump_json())]
# Handle model resolution like old base.py
model_name = self.get_request_model_name(request)
if not model_name:
from config import DEFAULT_MODEL
model_name = DEFAULT_MODEL
# Store the current model name for later use
self._current_model_name = model_name
# Handle model context from arguments (for in-process testing)
if "_model_context" in arguments:
self._model_context = arguments["_model_context"]
logger.debug(f"{self.get_name()}: Using model context from arguments")
else:
# Create model context if not provided
from utils.model_context import ModelContext
self._model_context = ModelContext(model_name)
logger.debug(f"{self.get_name()}: Created model context for {model_name}")
# Get images if present
images = self.get_request_images(request)
continuation_id = self.get_request_continuation_id(request)
# Handle conversation history and prompt preparation
if continuation_id:
# Check if conversation history is already embedded
field_value = self.get_request_prompt(request)
if "=== CONVERSATION HISTORY ===" in field_value:
# Use pre-embedded history
prompt = field_value
logger.debug(f"{self.get_name()}: Using pre-embedded conversation history")
else:
# No embedded history - reconstruct it (for in-process calls)
logger.debug(f"{self.get_name()}: No embedded history found, reconstructing conversation")
# Get thread context
from utils.conversation_memory import add_turn, build_conversation_history, get_thread
thread_context = get_thread(continuation_id)
if thread_context:
# Add user's new input to conversation
user_prompt = self.get_request_prompt(request)
user_files = self.get_request_files(request)
if user_prompt:
add_turn(continuation_id, "user", user_prompt, files=user_files)
# Get updated thread context after adding the turn
thread_context = get_thread(continuation_id)
logger.debug(
f"{self.get_name()}: Retrieved updated thread with {len(thread_context.turns)} turns"
)
# Build conversation history with updated thread context
conversation_history, conversation_tokens = build_conversation_history(
thread_context, self._model_context
)
# Get the base prompt from the tool
base_prompt = await self.prepare_prompt(request)
# Combine with conversation history
if conversation_history:
prompt = f"{conversation_history}\n\n=== NEW USER INPUT ===\n{base_prompt}"
else:
prompt = base_prompt
else:
# Thread not found, prepare normally
logger.warning(f"Thread {continuation_id} not found, preparing prompt normally")
prompt = await self.prepare_prompt(request)
else:
# New conversation, prepare prompt normally
prompt = await self.prepare_prompt(request)
# Add follow-up instructions for new conversations
from server import get_follow_up_instructions
follow_up_instructions = get_follow_up_instructions(0)
prompt = f"{prompt}\n\n{follow_up_instructions}"
logger.debug(f"Added follow-up instructions for new {self.get_name()} conversation")
# Validate images if any were provided
if images:
image_validation_error = self._validate_image_limits(
images, model_context=self._model_context, continuation_id=continuation_id
)
if image_validation_error:
return [TextContent(type="text", text=json.dumps(image_validation_error))]
# Get and validate temperature against model constraints
temperature, temp_warnings = self.get_validated_temperature(request, self._model_context)
# Log any temperature corrections
for warning in temp_warnings:
logger.warning(warning)
# Get thinking mode with defaults
thinking_mode = self.get_request_thinking_mode(request)
if thinking_mode is None:
thinking_mode = self.get_default_thinking_mode()
# Get the provider from model context (clean OOP - no re-fetching)
provider = self._model_context.provider
# Get system prompt for this tool
system_prompt = self.get_system_prompt()
# Generate AI response using the provider
logger.info(f"Sending request to {provider.get_provider_type().value} API for {self.get_name()}")
logger.info(
f"Using model: {self._model_context.model_name} via {provider.get_provider_type().value} provider"
)
# Estimate tokens for logging
from utils.token_utils import estimate_tokens
estimated_tokens = estimate_tokens(prompt)
logger.debug(f"Prompt length: {len(prompt)} characters (~{estimated_tokens:,} tokens)")
# Generate content with provider abstraction
model_response = provider.generate_content(
prompt=prompt,
model_name=self._current_model_name,
system_prompt=system_prompt,
temperature=temperature,
thinking_mode=thinking_mode if provider.supports_thinking_mode(self._current_model_name) else None,
images=images if images else None,
)
logger.info(f"Received response from {provider.get_provider_type().value} API for {self.get_name()}")
# Process the model's response
if model_response.content:
raw_text = model_response.content
# Create model info for conversation tracking
model_info = {
"provider": provider,
"model_name": self._current_model_name,
"model_response": model_response,
}
# Parse response using the same logic as old base.py
tool_output = self._parse_response(raw_text, request, model_info)
logger.info(f"{self.get_name()} tool completed successfully")
else:
# Handle cases where the model couldn't generate a response
finish_reason = model_response.metadata.get("finish_reason", "Unknown")
logger.warning(f"Response blocked or incomplete for {self.get_name()}. Finish reason: {finish_reason}")
tool_output = ToolOutput(
status="error",
content=f"Response blocked or incomplete. Finish reason: {finish_reason}",
content_type="text",
)
# Return the tool output as TextContent
return [TextContent(type="text", text=tool_output.model_dump_json())]
except Exception as e:
# Special handling for MCP size check errors
if str(e).startswith("MCP_SIZE_CHECK:"):
# Extract the JSON content after the prefix
json_content = str(e)[len("MCP_SIZE_CHECK:") :]
return [TextContent(type="text", text=json_content)]
logger.error(f"Error in {self.get_name()}: {str(e)}")
error_output = ToolOutput(
status="error",
content=f"Error in {self.get_name()}: {str(e)}",
content_type="text",
)
return [TextContent(type="text", text=error_output.model_dump_json())]
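# Example (hypothetical in-process call, names illustrative): the MCP server drives this
# flow when a simple tool is invoked; a test might exercise it directly as
#
#     result = await ChatTool().execute({"prompt": "Compare these two approaches", "model": "flash"})
#     tool_output = json.loads(result[0].text)  # ToolOutput JSON: status, content, metadata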
def _parse_response(self, raw_text: str, request, model_info: Optional[dict] = None):
"""
Parse the raw response and format it using the hook method.
This simplified version focuses on the SimpleTool pattern: format the response
using the format_response hook, then handle conversation continuation.
"""
from tools.models import ToolOutput
# Format the response using the hook method
formatted_response = self.format_response(raw_text, request, model_info)
# Handle conversation continuation like old base.py
continuation_id = self.get_request_continuation_id(request)
if continuation_id:
# Add turn to conversation memory
from utils.conversation_memory import add_turn
# Extract model metadata for conversation tracking
model_provider = None
model_name = None
model_metadata = None
if model_info:
provider = model_info.get("provider")
if provider:
# Handle both provider objects and string values
if isinstance(provider, str):
model_provider = provider
else:
try:
model_provider = provider.get_provider_type().value
except AttributeError:
# Fallback if provider doesn't have get_provider_type method
model_provider = str(provider)
model_name = model_info.get("model_name")
model_response = model_info.get("model_response")
if model_response:
model_metadata = {"usage": model_response.usage, "metadata": model_response.metadata}
# Only add the assistant's response to the conversation
# The user's turn is handled elsewhere (when thread is created/continued)
add_turn(
continuation_id, # thread_id as positional argument
"assistant", # role as positional argument
raw_text, # content as positional argument
files=self.get_request_files(request),
images=self.get_request_images(request),
tool_name=self.get_name(),
model_provider=model_provider,
model_name=model_name,
model_metadata=model_metadata,
)
# Create continuation offer like old base.py
continuation_data = self._create_continuation_offer(request, model_info)
if continuation_data:
return self._create_continuation_offer_response(formatted_response, continuation_data, request, model_info)
else:
# Build metadata with model and provider info for success response
metadata = {}
if model_info:
model_name = model_info.get("model_name")
if model_name:
metadata["model_used"] = model_name
provider = model_info.get("provider")
if provider:
# Handle both provider objects and string values
if isinstance(provider, str):
metadata["provider_used"] = provider
else:
try:
metadata["provider_used"] = provider.get_provider_type().value
except AttributeError:
# Fallback if provider doesn't have get_provider_type method
metadata["provider_used"] = str(provider)
return ToolOutput(
status="success",
content=formatted_response,
content_type="text",
metadata=metadata if metadata else None,
)
def _create_continuation_offer(self, request, model_info: Optional[dict] = None):
"""Create continuation offer following old base.py pattern"""
continuation_id = self.get_request_continuation_id(request)
try:
from utils.conversation_memory import create_thread, get_thread
if continuation_id:
# Existing conversation
thread_context = get_thread(continuation_id)
if thread_context and thread_context.turns:
turn_count = len(thread_context.turns)
from utils.conversation_memory import MAX_CONVERSATION_TURNS
if turn_count >= MAX_CONVERSATION_TURNS - 1:
return None # No more turns allowed
remaining_turns = MAX_CONVERSATION_TURNS - turn_count - 1
return {
"continuation_id": continuation_id,
"remaining_turns": remaining_turns,
"note": f"Claude can continue this conversation for {remaining_turns} more exchanges.",
}
else:
# New conversation - create thread and offer continuation
# Convert request to dict for initial_context
initial_request_dict = self.get_request_as_dict(request)
new_thread_id = create_thread(tool_name=self.get_name(), initial_request=initial_request_dict)
# Add the initial user turn to the new thread
from utils.conversation_memory import MAX_CONVERSATION_TURNS, add_turn
user_prompt = self.get_request_prompt(request)
user_files = self.get_request_files(request)
user_images = self.get_request_images(request)
# Add user's initial turn
add_turn(
new_thread_id, "user", user_prompt, files=user_files, images=user_images, tool_name=self.get_name()
)
return {
"continuation_id": new_thread_id,
"remaining_turns": MAX_CONVERSATION_TURNS - 1,
"note": f"Claude can continue this conversation for {MAX_CONVERSATION_TURNS - 1} more exchanges.",
}
except Exception:
return None
def _create_continuation_offer_response(
self, content: str, continuation_data: dict, request, model_info: Optional[dict] = None
):
"""Create response with continuation offer following old base.py pattern"""
from tools.models import ContinuationOffer, ToolOutput
try:
continuation_offer = ContinuationOffer(
continuation_id=continuation_data["continuation_id"],
note=continuation_data["note"],
remaining_turns=continuation_data["remaining_turns"],
)
# Build metadata with model and provider info
metadata = {"tool_name": self.get_name(), "conversation_ready": True}
if model_info:
model_name = model_info.get("model_name")
if model_name:
metadata["model_used"] = model_name
provider = model_info.get("provider")
if provider:
# Handle both provider objects and string values
if isinstance(provider, str):
metadata["provider_used"] = provider
else:
try:
metadata["provider_used"] = provider.get_provider_type().value
except AttributeError:
# Fallback if provider doesn't have get_provider_type method
metadata["provider_used"] = str(provider)
return ToolOutput(
status="continuation_available",
content=content,
content_type="text",
continuation_offer=continuation_offer,
metadata=metadata,
)
except Exception:
# Fallback to simple success if continuation offer fails
return ToolOutput(status="success", content=content, content_type="text")
# Convenience methods for common tool patterns
def build_standard_prompt(
@@ -153,9 +667,13 @@ class SimpleTool(BaseTool):
Complete formatted prompt ready for the AI model
"""
# Add context files if provided
if hasattr(request, "files") and request.files:
files = self.get_request_files(request)
if files:
file_content, processed_files = self._prepare_file_content_for_prompt(
request.files, request.continuation_id, "Context files"
files,
self.get_request_continuation_id(request),
"Context files",
model_context=getattr(self, "_model_context", None),
)
self._actually_processed_files = processed_files
if file_content:
@@ -166,8 +684,9 @@ class SimpleTool(BaseTool):
# Add web search instruction if enabled
websearch_instruction = ""
if hasattr(request, "use_websearch") and request.use_websearch:
websearch_instruction = self.get_websearch_instruction(request.use_websearch, self.get_websearch_guidance())
use_websearch = self.get_request_use_websearch(request)
if use_websearch:
websearch_instruction = self.get_websearch_instruction(use_websearch, self.get_websearch_guidance())
# Combine system prompt with user content
full_prompt = f"""{system_prompt}{websearch_instruction}
@@ -180,6 +699,32 @@ Please provide a thoughtful, comprehensive response:"""
return full_prompt
def get_prompt_content_for_size_validation(self, user_content: str) -> str:
"""
Override to use original user prompt for size validation when conversation history is embedded.
When server.py embeds conversation history into the prompt field, it also stores
the original user prompt in _original_user_prompt. We use that for size validation
to avoid incorrectly triggering size limits due to conversation history.
Args:
user_content: The user content (may include conversation history)
Returns:
The original user prompt if available, otherwise the full user content
"""
# Check if we have the current arguments from execute() method
current_args = getattr(self, "_current_arguments", None)
if current_args:
# If server.py embedded conversation history, it stores original prompt separately
original_user_prompt = current_args.get("_original_user_prompt")
if original_user_prompt is not None:
# Use original user prompt for size validation (excludes conversation history)
return original_user_prompt
# Fallback to default behavior (validate full user content)
return user_content
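# Example (illustrative): when server.py embeds history, the arguments might look like
#
#     {"prompt": "=== CONVERSATION HISTORY ===\n...\n=== NEW USER INPUT ===\nShort question",
#      "_original_user_prompt": "Short question"}
#
# so only "Short question" is measured against the MCP prompt size limit, not the transcript.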
def get_websearch_guidance(self) -> Optional[str]:
"""
Return tool-specific web search guidance.
@@ -210,23 +755,121 @@ Please provide a thoughtful, comprehensive response:"""
ValueError: If prompt is too large for MCP transport
"""
# Check for prompt.txt in files
if hasattr(request, "files"):
prompt_content, updated_files = self.handle_prompt_file(request.files)
files = self.get_request_files(request)
if files:
prompt_content, updated_files = self.handle_prompt_file(files)
# Update request files list
# Update request files list if needed
if updated_files is not None:
request.files = updated_files
self.set_request_files(request, updated_files)
else:
prompt_content = None
# Use prompt.txt content if available, otherwise use the prompt field
user_content = prompt_content if prompt_content else getattr(request, "prompt", "")
user_content = prompt_content if prompt_content else self.get_request_prompt(request)
# Check user input size at MCP transport boundary
size_check = self.check_prompt_size(user_content)
# Check user input size at MCP transport boundary (excluding conversation history)
validation_content = self.get_prompt_content_for_size_validation(user_content)
size_check = self.check_prompt_size(validation_content)
if size_check:
from tools.models import ToolOutput
raise ValueError(f"MCP_SIZE_CHECK:{ToolOutput(**size_check).model_dump_json()}")
return user_content
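# Example (illustrative): an oversized user prompt surfaces here as a ValueError whose message
# carries the serialized ToolOutput after the "MCP_SIZE_CHECK:" prefix, which execute() strips
# and returns to the client as-is, e.g.
#
#     ValueError('MCP_SIZE_CHECK:{"status": "resend_prompt", "content": "...", ...}')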
def get_chat_style_websearch_guidance(self) -> str:
"""
Get Chat tool-style web search guidance.
Returns web search guidance that matches the original Chat tool pattern.
This is useful for tools that want to maintain the same search behavior.
Returns:
Web search guidance text
"""
return """When discussing topics, consider if searches for these would help:
- Documentation for any technologies or concepts mentioned
- Current best practices and patterns
- Recent developments or updates
- Community discussions and solutions"""
def supports_custom_request_model(self) -> bool:
"""
Indicate whether this tool supports custom request models.
Simple tools support custom request models by default. The default implementation
detects this automatically by comparing get_request_model() against the base
ToolRequest, so tools with custom request models rarely need to override it.
Returns:
True if the tool uses a custom request model
"""
return self.get_request_model() != ToolRequest
def _validate_file_paths(self, request) -> Optional[str]:
"""
Validate that all file paths in the request are absolute paths.
This is a security measure to prevent path traversal attacks and ensure
proper access control. All file paths must be absolute (starting with '/').
Args:
request: The validated request object
Returns:
Optional[str]: Error message if validation fails, None if all paths are valid
"""
import os
# Check if request has 'files' attribute (used by most tools)
files = self.get_request_files(request)
if files:
for file_path in files:
if not os.path.isabs(file_path):
return (
f"Error: All file paths must be FULL absolute paths to real files / folders - DO NOT SHORTEN. "
f"Received relative path: {file_path}\n"
f"Please provide the full absolute path starting with '/' (must be FULL absolute paths to real files / folders - DO NOT SHORTEN)"
)
return None
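# Example (illustrative): "/home/user/project/src/main.py" passes this check, while a
# relative path such as "src/main.py" is rejected with the error message above.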
def prepare_chat_style_prompt(self, request, system_prompt: str = None) -> str:
"""
Prepare a prompt using Chat tool-style patterns.
This convenience method replicates the Chat tool's prompt preparation logic:
1. Handle prompt.txt file if present
2. Add file context with specific formatting
3. Add web search guidance
4. Format with system prompt
Args:
request: The validated request object
system_prompt: System prompt to use (uses get_system_prompt() if None)
Returns:
Complete formatted prompt
"""
# Use provided system prompt or get from tool
if system_prompt is None:
system_prompt = self.get_system_prompt()
# Get user content (handles prompt.txt files)
user_content = self.handle_prompt_file_with_fallback(request)
# Build standard prompt with Chat-style web search guidance
websearch_guidance = self.get_chat_style_websearch_guidance()
# Override the websearch guidance temporarily
original_guidance = self.get_websearch_guidance
self.get_websearch_guidance = lambda: websearch_guidance
try:
full_prompt = self.build_standard_prompt(system_prompt, user_content, request, "CONTEXT FILES")
finally:
# Restore original guidance method
self.get_websearch_guidance = original_guidance
return full_prompt
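# Example (hypothetical tool, for illustration): a Chat-like simple tool could delegate its
# prompt assembly to this helper, e.g.
#
#     async def prepare_prompt(self, request):
#         return self.prepare_chat_style_prompt(request)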


@@ -147,6 +147,8 @@ class TestGenTool(WorkflowTool):
including edge case identification, framework detection, and comprehensive coverage planning.
"""
__test__ = False # Prevent pytest from collecting this class as a test
def __init__(self):
super().__init__()
self.initial_request = None

tools/version.py (new file, 350 lines)

@@ -0,0 +1,350 @@
"""
Version Tool - Display Zen MCP Server version and system information
This tool provides version information about the Zen MCP Server including
version number, last update date, author, and basic system information.
It also checks for updates from the GitHub repository.
"""
import logging
import platform
import re
import sys
from pathlib import Path
from typing import Any, Optional
try:
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
HAS_URLLIB = True
except ImportError:
HAS_URLLIB = False
from mcp.types import TextContent
from config import __author__, __updated__, __version__
from tools.models import ToolModelCategory, ToolOutput
from tools.shared.base_models import ToolRequest
from tools.shared.base_tool import BaseTool
logger = logging.getLogger(__name__)
def parse_version(version_str: str) -> tuple[int, int, int]:
"""
Parse version string to tuple of integers for comparison.
Args:
version_str: Version string like "5.5.5"
Returns:
Tuple of (major, minor, patch) as integers
"""
try:
parts = version_str.strip().split(".")
if len(parts) >= 3:
return (int(parts[0]), int(parts[1]), int(parts[2]))
elif len(parts) == 2:
return (int(parts[0]), int(parts[1]), 0)
elif len(parts) == 1:
return (int(parts[0]), 0, 0)
else:
return (0, 0, 0)
except (ValueError, IndexError):
return (0, 0, 0)
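# Examples (illustrative):
#     parse_version("5.8.2")   -> (5, 8, 2)
#     parse_version("5.8")     -> (5, 8, 0)
#     parse_version("invalid") -> (0, 0, 0)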
def compare_versions(current: str, remote: str) -> int:
"""
Compare two version strings.
Args:
current: Current version string
remote: Remote version string
Returns:
-1 if current < remote (update available)
0 if current == remote (up to date)
1 if current > remote (ahead of remote)
"""
current_tuple = parse_version(current)
remote_tuple = parse_version(remote)
if current_tuple < remote_tuple:
return -1
elif current_tuple > remote_tuple:
return 1
else:
return 0
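# Examples (illustrative):
#     compare_versions("5.7.0", "5.8.2") -> -1  (update available)
#     compare_versions("5.8.2", "5.8.2") ->  0  (up to date)
#     compare_versions("6.0.0", "5.8.2") ->  1  (ahead of remote)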
def fetch_github_version() -> Optional[tuple[str, str]]:
"""
Fetch the latest version information from GitHub repository.
Returns:
Tuple of (version, last_updated) if successful, None if failed
"""
if not HAS_URLLIB:
logger.warning("urllib not available, cannot check for updates")
return None
github_url = "https://raw.githubusercontent.com/BeehiveInnovations/zen-mcp-server/main/config.py"
try:
# Set a 10-second timeout
with urlopen(github_url, timeout=10) as response:
if response.status != 200:
logger.warning(f"HTTP error while checking GitHub: {response.status}")
return None
content = response.read().decode("utf-8")
# Extract version using regex
version_match = re.search(r'__version__\s*=\s*["\']([^"\']+)["\']', content)
updated_match = re.search(r'__updated__\s*=\s*["\']([^"\']+)["\']', content)
if version_match:
remote_version = version_match.group(1)
remote_updated = updated_match.group(1) if updated_match else "Unknown"
return (remote_version, remote_updated)
else:
logger.warning("Could not parse version from GitHub config.py")
return None
except HTTPError as e:
logger.warning(f"HTTP error while checking GitHub: {e.code}")
return None
except URLError as e:
logger.warning(f"URL error while checking GitHub: {e.reason}")
return None
except Exception as e:
logger.warning(f"Error checking GitHub for updates: {e}")
return None
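# Example (illustrative, values hypothetical): the regexes above match lines in the remote
# config.py such as
#
#     __version__ = "5.8.2"
#     __updated__ = "2025-06-21"
#
# and the function would then return ("5.8.2", "2025-06-21").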
class VersionTool(BaseTool):
"""
Tool for displaying Zen MCP Server version and system information.
This tool provides:
- Current server version
- Last update date
- Author information
- Python version
- Platform information
"""
def get_name(self) -> str:
return "version"
def get_description(self) -> str:
return (
"VERSION & CONFIGURATION - Get server version, configuration details, and list of available tools. "
"Useful for debugging and understanding capabilities."
)
def get_input_schema(self) -> dict[str, Any]:
"""Return the JSON schema for the tool's input"""
return {"type": "object", "properties": {}, "required": []}
def get_system_prompt(self) -> str:
"""No AI model needed for this tool"""
return ""
def get_request_model(self):
"""Return the Pydantic model for request validation."""
return ToolRequest
async def prepare_prompt(self, request: ToolRequest) -> str:
"""Not used for this utility tool"""
return ""
def format_response(self, response: str, request: ToolRequest, model_info: dict = None) -> str:
"""Not used for this utility tool"""
return response
async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
"""
Display Zen MCP Server version and system information.
This overrides the base class execute to provide direct output without AI model calls.
Args:
arguments: Standard tool arguments (none required)
Returns:
Formatted version and system information
"""
output_lines = ["# Zen MCP Server Version\n"]
# Server version information
output_lines.append("## Server Information")
output_lines.append(f"**Current Version**: {__version__}")
output_lines.append(f"**Last Updated**: {__updated__}")
output_lines.append(f"**Author**: {__author__}")
# Get the current working directory (MCP server location)
current_path = Path.cwd()
output_lines.append(f"**Installation Path**: `{current_path}`")
output_lines.append("")
# Check for updates from GitHub
output_lines.append("## Update Status")
try:
github_info = fetch_github_version()
if github_info:
remote_version, remote_updated = github_info
comparison = compare_versions(__version__, remote_version)
output_lines.append(f"**Latest Version (GitHub)**: {remote_version}")
output_lines.append(f"**Latest Updated**: {remote_updated}")
if comparison < 0:
# Update available
output_lines.append("")
output_lines.append("🚀 **UPDATE AVAILABLE!**")
output_lines.append(
f"Your version `{__version__}` is older than the latest version `{remote_version}`"
)
output_lines.append("")
output_lines.append("**To update:**")
output_lines.append("```bash")
output_lines.append(f"cd {current_path}")
output_lines.append("git pull")
output_lines.append("```")
output_lines.append("")
output_lines.append("*Note: Restart your Claude session after updating to use the new version.*")
elif comparison == 0:
# Up to date
output_lines.append("")
output_lines.append("✅ **UP TO DATE**")
output_lines.append("You are running the latest version.")
else:
# Ahead of remote (development version)
output_lines.append("")
output_lines.append("🔬 **DEVELOPMENT VERSION**")
output_lines.append(
f"Your version `{__version__}` is ahead of the published version `{remote_version}`"
)
output_lines.append("You may be running a development or custom build.")
else:
output_lines.append("❌ **Could not check for updates**")
output_lines.append("Unable to connect to GitHub or parse version information.")
output_lines.append("Check your internet connection or try again later.")
except Exception as e:
logger.error(f"Error during version check: {e}")
output_lines.append("❌ **Error checking for updates**")
output_lines.append(f"Error: {str(e)}")
output_lines.append("")
# Python and system information
output_lines.append("## System Information")
output_lines.append(
f"**Python Version**: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
)
output_lines.append(f"**Platform**: {platform.system()} {platform.release()}")
output_lines.append(f"**Architecture**: {platform.machine()}")
output_lines.append("")
# Available tools
try:
# Import here to avoid circular imports
from server import TOOLS
tool_names = sorted(TOOLS.keys())
output_lines.append("## Available Tools")
output_lines.append(f"**Total Tools**: {len(tool_names)}")
output_lines.append("\n**Tool List**:")
for tool_name in tool_names:
tool = TOOLS[tool_name]
# Get the first line of the tool's description for a brief summary
description = tool.get_description().split("\n")[0]
# Truncate if too long
if len(description) > 80:
description = description[:77] + "..."
output_lines.append(f"- `{tool_name}` - {description}")
output_lines.append("")
except Exception as e:
logger.warning(f"Error loading tools list: {e}")
output_lines.append("## Available Tools")
output_lines.append("**Error**: Could not load tools list")
output_lines.append("")
# Configuration information
output_lines.append("## Configuration")
# Check for configured providers
try:
from providers.base import ProviderType
from providers.registry import ModelProviderRegistry
provider_status = []
# Check each provider type
provider_types = [
ProviderType.GOOGLE,
ProviderType.OPENAI,
ProviderType.XAI,
ProviderType.OPENROUTER,
ProviderType.CUSTOM,
]
provider_names = ["Google Gemini", "OpenAI", "X.AI", "OpenRouter", "Custom/Local"]
for provider_type, provider_name in zip(provider_types, provider_names):
provider = ModelProviderRegistry.get_provider(provider_type)
status = "✅ Configured" if provider is not None else "❌ Not configured"
provider_status.append(f"- **{provider_name}**: {status}")
output_lines.append("**Providers**:")
output_lines.extend(provider_status)
# Get total available models
try:
available_models = ModelProviderRegistry.get_available_models(respect_restrictions=True)
output_lines.append(f"\n**Available Models**: {len(available_models)}")
except Exception:
output_lines.append("\n**Available Models**: Unknown")
except Exception as e:
logger.warning(f"Error checking provider configuration: {e}")
output_lines.append("**Providers**: Error checking configuration")
output_lines.append("")
# Usage information
output_lines.append("## Usage")
output_lines.append("- Use `listmodels` tool to see all available AI models")
output_lines.append("- Use `chat` for interactive conversations and brainstorming")
output_lines.append("- Use workflow tools (`debug`, `codereview`, `docgen`, etc.) for systematic analysis")
output_lines.append("- Set DEFAULT_MODEL=auto to let Claude choose the best model for each task")
# Format output
content = "\n".join(output_lines)
tool_output = ToolOutput(
status="success",
content=content,
content_type="text",
metadata={
"tool_name": self.name,
"server_version": __version__,
"last_updated": __updated__,
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"platform": f"{platform.system()} {platform.release()}",
},
)
return [TextContent(type="text", text=tool_output.model_dump_json())]
def get_model_category(self) -> ToolModelCategory:
"""Return the model category for this tool."""
return ToolModelCategory.FAST_RESPONSE # Simple version info, no AI needed


@@ -28,6 +28,7 @@ from typing import Any, Optional
from mcp.types import TextContent
from config import MCP_PROMPT_SIZE_LIMIT
from utils.conversation_memory import add_turn, create_thread
from ..shared.base_models import ConsolidatedFindings
@@ -111,6 +112,7 @@ class BaseWorkflowMixin(ABC):
description: str,
remaining_budget: Optional[int] = None,
arguments: Optional[dict[str, Any]] = None,
model_context: Optional[Any] = None,
) -> tuple[str, list[str]]:
"""Prepare file content for prompts. Usually provided by BaseTool."""
pass
@@ -230,6 +232,23 @@ class BaseWorkflowMixin(ABC):
except AttributeError:
return self.get_default_temperature()
def get_validated_temperature(self, request, model_context: Any) -> tuple[float, list[str]]:
"""
Get temperature from request and validate it against model constraints.
This is a convenience method that combines temperature extraction and validation
for workflow tools. It ensures temperature is within valid range for the model.
Args:
request: The request object containing temperature
model_context: Model context object containing model info
Returns:
Tuple of (validated_temperature, warning_messages)
"""
temperature = self.get_request_temperature(request)
return self.validate_and_correct_temperature(temperature, model_context)
def get_request_thinking_mode(self, request) -> str:
"""Get thinking mode from request. Override for custom thinking mode handling."""
try:
@@ -496,19 +515,22 @@ class BaseWorkflowMixin(ABC):
return
try:
# Ensure model context is available - fall back to resolution if needed
# Model context should be available from early validation, but might be deferred for tests
current_model_context = self.get_current_model_context()
if not current_model_context:
# Try to resolve model context now (deferred from early validation)
try:
model_name, model_context = self._resolve_model_context(arguments, request)
self._model_context = model_context
self._current_model_name = model_name
except Exception as e:
logger.error(f"[WORKFLOW_FILES] {self.get_name()}: Failed to resolve model context: {e}")
# Create fallback model context
# Create fallback model context (preserves existing test behavior)
from utils.model_context import ModelContext
model_name = self.get_request_model_name(request)
self._model_context = ModelContext(model_name)
self._current_model_name = model_name
# Use the same file preparation logic as BaseTool with token budgeting
continuation_id = self.get_request_continuation_id(request)
@@ -520,6 +542,7 @@ class BaseWorkflowMixin(ABC):
"Workflow files for analysis",
remaining_budget=remaining_tokens,
arguments=arguments,
model_context=self._model_context,
)
# Store for use in expert analysis
@@ -595,6 +618,20 @@ class BaseWorkflowMixin(ABC):
# Validate request using tool-specific model
request = self.get_workflow_request_model()(**arguments)
# Validate step field size (basic validation for workflow instructions)
# If step is too large, user should use shorter instructions and put details in files
step_content = request.step
if step_content and len(step_content) > MCP_PROMPT_SIZE_LIMIT:
from tools.models import ToolOutput
error_output = ToolOutput(
status="resend_prompt",
content="Step instructions are too long. Please use shorter instructions and provide detailed context via file paths instead.",
content_type="text",
metadata={"prompt_size": len(step_content), "limit": MCP_PROMPT_SIZE_LIMIT},
)
raise ValueError(f"MCP_SIZE_CHECK:{error_output.model_dump_json()}")
# Validate file paths for security (same as base tool)
# Use try/except instead of hasattr as per coding standards
try:
@@ -612,6 +649,20 @@ class BaseWorkflowMixin(ABC):
# validate_file_paths method not available - skip validation
pass
# Try to validate model availability early for production scenarios
# For tests, defer model validation to later to allow mocks to work
try:
model_name, model_context = self._resolve_model_context(arguments, request)
# Store for later use
self._current_model_name = model_name
self._model_context = model_context
except ValueError as e:
# Model resolution failed - in production this would be an error,
# but for tests we defer to allow mocks to handle model resolution
logger.debug(f"Early model validation failed, deferring to later: {e}")
self._current_model_name = None
self._model_context = None
# Adjust total steps if needed
if request.step_number > request.total_steps:
request.total_steps = request.step_number
@@ -1364,29 +1415,26 @@ class BaseWorkflowMixin(ABC):
async def _call_expert_analysis(self, arguments: dict, request) -> dict:
"""Call external model for expert analysis"""
try:
# Use the same model resolution logic as BaseTool
model_context = arguments.get("_model_context")
resolved_model_name = arguments.get("_resolved_model_name")
if model_context and resolved_model_name:
self._model_context = model_context
model_name = resolved_model_name
else:
# Fallback for direct calls - requires BaseTool methods
# Model context should be resolved from early validation, but handle fallback for tests
if not self._model_context:
# Try to resolve model context for expert analysis (deferred from early validation)
try:
model_name, model_context = self._resolve_model_context(arguments, request)
self._model_context = model_context
self._current_model_name = model_name
except Exception as e:
logger.error(f"Failed to resolve model context: {e}")
# Use request model as fallback
logger.error(f"Failed to resolve model context for expert analysis: {e}")
# Use request model as fallback (preserves existing test behavior)
model_name = self.get_request_model_name(request)
from utils.model_context import ModelContext
model_context = ModelContext(model_name)
self._model_context = model_context
self._current_model_name = model_name
else:
model_name = self._current_model_name
self._current_model_name = model_name
provider = self.get_model_provider(model_name)
provider = self._model_context.provider
# Prepare expert analysis context
expert_context = self.prepare_expert_analysis_context(self.consolidated_findings)
@@ -1407,12 +1455,19 @@ class BaseWorkflowMixin(ABC):
else:
prompt = expert_context
# Validate temperature against model constraints
validated_temperature, temp_warnings = self.get_validated_temperature(request, self._model_context)
# Log any temperature corrections
for warning in temp_warnings:
logger.warning(warning)
# Generate AI response - use request parameters if available
model_response = provider.generate_content(
prompt=prompt,
model_name=model_name,
system_prompt=system_prompt,
temperature=self.get_request_temperature(request),
temperature=validated_temperature,
thinking_mode=self.get_request_thinking_mode(request),
use_websearch=self.get_request_use_websearch(request),
images=list(set(self.consolidated_findings.images)) if self.consolidated_findings.images else None,