Files
my-pal-mcp-server/tools/precommit.py
2025-06-11 17:18:40 +04:00

441 lines
21 KiB
Python

"""
Tool for pre-commit validation of git changes across multiple repositories.
Design Note - File Content in Multiple Sections:
Files may legitimately appear in both "Git Diffs" and "Additional Context Files" sections:
- Git Diffs: Shows changed lines + limited context (marked with "BEGIN DIFF" / "END DIFF")
- Additional Context: Shows complete file content (marked with "BEGIN FILE" / "END FILE")
This provides comprehensive context for AI analysis - not a duplication bug.
"""
import os
from typing import Any, Literal, Optional
from mcp.types import TextContent
from pydantic import Field
from config import MAX_CONTEXT_TOKENS
from prompts.tool_prompts import PRECOMMIT_PROMPT
from utils.file_utils import translate_file_paths, translate_path_for_environment
from utils.git_utils import find_git_repositories, get_git_status, run_git_command
from utils.token_utils import estimate_tokens
from .base import BaseTool, ToolRequest
from .models import ToolOutput
class PrecommitRequest(ToolRequest):
    """Validated input for the precommit tool.

    Every attribute below is a pydantic field; the ``description`` text is what
    ends up in the generated JSON schema.
    """

    # --- Where to look -----------------------------------------------------
    # Required: root directory from which git repositories are searched.
    path: str = Field(
        default=...,
        description="Starting directory to search for git repositories (must be absolute path).",
    )

    # --- What the change is supposed to achieve ----------------------------
    original_request: Optional[str] = Field(
        default=None,
        description=(
            "The original user request description for the changes. "
            "Provides critical context for the review."
        ),
    )

    # --- Which changes to review --------------------------------------------
    # When set, diffs are taken against this ref instead of the working tree.
    compare_to: Optional[str] = Field(
        default=None,
        description=(
            "Optional: A git ref (branch, tag, commit hash) to compare against. "
            "If not provided, reviews local staged and unstaged changes."
        ),
    )
    include_staged: bool = Field(
        default=True,
        description="Include staged changes in the review. Only applies if 'compare_to' is not set.",
    )
    include_unstaged: bool = Field(
        default=True,
        description=(
            "Include uncommitted (unstaged) changes in the review. "
            "Only applies if 'compare_to' is not set."
        ),
    )

    # --- How to review -------------------------------------------------------
    focus_on: Optional[str] = Field(
        default=None,
        description=(
            "Specific aspects to focus on "
            "(e.g., 'logic for user authentication', 'database query efficiency')."
        ),
    )
    review_type: Literal["full", "security", "performance", "quick"] = Field(
        default="full",
        description="Type of review to perform on the changes.",
    )
    severity_filter: Literal["critical", "high", "medium", "all"] = Field(
        default="all",
        description="Minimum severity level to report on the changes.",
    )

    # --- Search / model tuning -----------------------------------------------
    max_depth: int = Field(
        default=5,
        description=(
            "Maximum depth to search for nested git repositories "
            "to prevent excessive recursion."
        ),
    )
    # Constrained to the closed interval [0.0, 1.0] by ge/le below.
    temperature: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description=(
            "Temperature for the response (0.0 to 1.0). "
            "Lower values are more focused and deterministic."
        ),
    )
    thinking_mode: Optional[Literal["minimal", "low", "medium", "high", "max"]] = Field(
        default=None,
        description="Thinking depth mode for the assistant.",
    )

    # --- Extra, unmodified files supplied purely as context -------------------
    files: Optional[list[str]] = Field(
        default=None,
        description=(
            "Optional files or directories to provide as context (must be absolute paths). "
            "These files are not part of the changes but provide helpful context like configs, "
            "docs, or related code."
        ),
    )
class Precommit(BaseTool):
    """Tool for pre-commit validation of git changes across multiple repositories.

    ``prepare_prompt`` does the real work: it finds git repositories under the
    requested path, collects one marker-wrapped diff per changed file (either
    against ``compare_to`` or for staged/unstaged changes), optionally appends
    context files, and joins everything into a single review prompt.
    """

    def get_name(self) -> str:
        """Return the tool's name."""
        return "precommit"

    def get_description(self) -> str:
        """Return the human/LLM-facing description of when to use this tool."""
        return (
            "PRECOMMIT VALIDATION FOR GIT CHANGES - ALWAYS use this tool before creating any git commit! "
            "Comprehensive pre-commit validation that catches bugs, security issues, incomplete implementations, "
            "and ensures changes match the original requirements. Searches all git repositories recursively and "
            "provides deep analysis of staged/unstaged changes. Essential for code quality and preventing bugs. "
            "Use this before committing, when reviewing changes, checking your changes, validating changes, "
            "or when you're about to commit or ready to commit. Claude should proactively suggest using this tool "
            "whenever the user mentions committing or when changes are complete. "
            "Choose thinking_mode based on changeset size: 'low' for small focused changes, "
            "'medium' for standard commits (default), 'high' for large feature branches or complex refactoring, "
            "'max' for critical releases or when reviewing extensive changes across multiple systems."
        )

    def get_input_schema(self) -> dict[str, Any]:
        """Return the JSON schema describing this tool's input.

        Starts from the request model's generated schema and adds the
        ``use_websearch`` and ``continuation_id`` properties when the schema
        has a ``properties`` mapping that does not already define them.
        """
        schema = self.get_request_model().model_json_schema()
        if "properties" in schema:
            properties = schema["properties"]
            # setdefault only inserts when the key is missing, so properties
            # already declared by the model are left untouched.
            properties.setdefault(
                "use_websearch",
                {
                    "type": "boolean",
                    "description": "Enable web search for documentation, best practices, and current information. Particularly useful for: brainstorming sessions, architectural design discussions, exploring industry best practices, working with specific frameworks/technologies, researching solutions to complex problems, or when current documentation and community insights would enhance the analysis.",
                    "default": True,
                },
            )
            properties.setdefault(
                "continuation_id",
                {
                    "type": "string",
                    "description": "Thread continuation ID for multi-turn conversations. Can be used to continue conversations across different tools. Only provide this if continuing a previous conversation thread.",
                },
            )
        return schema

    def get_system_prompt(self) -> str:
        """Return the system prompt used for precommit reviews."""
        return PRECOMMIT_PROMPT

    def get_request_model(self):
        """Return the pydantic model class used to validate tool arguments."""
        return PrecommitRequest

    def get_default_temperature(self) -> float:
        """Use analytical temperature for code review."""
        from config import TEMPERATURE_ANALYTICAL

        return TEMPERATURE_ANALYTICAL

    async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
        """Validate the arguments and size-check ``original_request`` before running.

        Args:
            arguments: Raw tool arguments; they are validated by constructing
                the request model from them.

        Returns:
            If ``check_prompt_size`` returns a truthy value for
            ``original_request``, a single ``TextContent`` carrying that result
            serialized through ``ToolOutput``. Otherwise whatever the base
            class ``execute`` returns.
        """
        request = self.get_request_model()(**arguments)

        if request.original_request:
            size_check = self.check_prompt_size(request.original_request)
            if size_check:
                return [TextContent(type="text", text=ToolOutput(**size_check).model_dump_json())]

        return await super().execute(arguments)

    @staticmethod
    def _list_changed_files(repo_path: str, diff_args: list[str]) -> list[str]:
        """Return the paths printed by ``git diff --name-only <diff_args>``.

        Returns an empty list when the command fails or prints nothing.
        """
        # NOTE(review): git may print quoted/escaped names for paths containing
        # unusual characters (see core.quotePath); such a name would not match
        # when handed back to `git diff -- <path>`. Confirm whether
        # run_git_command already accounts for this.
        success, files_output = run_git_command(repo_path, ["diff", "--name-only", *diff_args])
        if not (success and files_output.strip()):
            return []
        return [name for name in files_output.strip().split("\n") if name]

    @staticmethod
    def _collect_file_diffs(
        repo_path: str,
        repo_name: str,
        file_paths: list[str],
        diff_args: list[str],
        label: str,
    ) -> list[tuple[str, str]]:
        """Return ``(description, marker-wrapped diff)`` for each file with a non-empty diff.

        Each diff is wrapped in "BEGIN DIFF" / "END DIFF" markers so it can be
        told apart from the "BEGIN FILE" / "END FILE" markers used for complete
        file content. Files whose diff command fails or prints nothing are
        skipped.
        """
        collected: list[tuple[str, str]] = []
        for file_path in file_paths:
            success, diff = run_git_command(repo_path, ["diff", *diff_args, "--", file_path])
            if not (success and diff.strip()):
                continue
            description = f"{repo_name} / {file_path} ({label})"
            diff_header = f"\n--- BEGIN DIFF: {description} ---\n"
            diff_footer = f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
            collected.append((description, diff_header + diff + diff_footer))
        return collected

    async def prepare_prompt(self, request: PrecommitRequest) -> str:
        """Prepare the prompt with git diff information.

        Args:
            request: The validated request. ``original_request`` and ``files``
                may be overwritten here with what ``handle_prompt_file``
                returns.

        Returns:
            The full prompt (system prompt, web-search instruction, review
            parameters, repository summary, diffs and optional context files),
            or a short plain-text message when there is nothing to review.

        Raises:
            ValueError: If the translated path starts with ``/inaccessible/``.
        """
        # A prompt.txt among the files, if handle_prompt_file finds one,
        # replaces original_request.
        prompt_content, updated_files = self.handle_prompt_file(request.files)
        if prompt_content:
            request.original_request = prompt_content
        if updated_files is not None:
            request.files = updated_files

        # Translate the path and files if running in Docker
        translated_path = translate_path_for_environment(request.path)
        translated_files = translate_file_paths(request.files)

        if translated_path.startswith("/inaccessible/"):
            raise ValueError(
                f"The path '{request.path}' is not accessible from within the Docker container. "
                "The Docker container can only access files within the mounted workspace. "
                "Please ensure the path is within the mounted directory or adjust your Docker volume mounts."
            )

        repositories = find_git_repositories(translated_path, request.max_depth)
        if not repositories:
            return "No git repositories found in the specified path."

        all_diffs: list[str] = []
        # Descriptions of diffs dropped for lack of token budget, so the
        # truncation can be reported instead of happening silently.
        omitted_diffs: list[str] = []
        repo_summaries: list[dict[str, Any]] = []
        total_tokens = 0
        max_tokens = MAX_CONTEXT_TOKENS - 50000  # Reserve tokens for prompt and response

        for repo_path in repositories:
            repo_name = os.path.basename(repo_path) or "root"
            status = get_git_status(repo_path)
            candidates: list[tuple[str, str]] = []

            if request.compare_to:
                # Validate the ref before diffing against it
                is_valid_ref, err_msg = run_git_command(
                    repo_path,
                    ["rev-parse", "--verify", "--quiet", request.compare_to],
                )
                if not is_valid_ref:
                    repo_summaries.append(
                        {
                            "path": repo_path,
                            "error": f"Invalid or unknown git ref '{request.compare_to}': {err_msg}",
                            "changed_files": 0,
                        }
                    )
                    continue

                diff_range = f"{request.compare_to}...HEAD"
                changed_files = self._list_changed_files(repo_path, [diff_range])
                candidates = self._collect_file_diffs(
                    repo_path,
                    repo_name,
                    changed_files,
                    [diff_range],
                    f"compare to {request.compare_to}",
                )
            else:
                staged_files: list[str] = []
                unstaged_files: list[str] = []

                if request.include_staged:
                    staged_files = self._list_changed_files(repo_path, ["--cached"])
                    candidates += self._collect_file_diffs(
                        repo_path, repo_name, staged_files, ["--cached"], "staged"
                    )

                if request.include_unstaged:
                    unstaged_files = self._list_changed_files(repo_path, [])
                    candidates += self._collect_file_diffs(
                        repo_path, repo_name, unstaged_files, [], "unstaged"
                    )

                # Sorted so the summary (and its 20-file cut-off) is the same
                # on every run; plain set iteration order is not stable.
                changed_files = sorted(set(staged_files + unstaged_files))

            # Admit diffs in collection order while they fit in the budget.
            for description, formatted_diff in candidates:
                diff_tokens = estimate_tokens(formatted_diff)
                if total_tokens + diff_tokens <= max_tokens:
                    all_diffs.append(formatted_diff)
                    total_tokens += diff_tokens
                else:
                    omitted_diffs.append(description)

            if changed_files:
                repo_summaries.append(
                    {
                        "path": repo_path,
                        "branch": status["branch"],
                        "ahead": status["ahead"],
                        "behind": status["behind"],
                        "changed_files": len(changed_files),
                        "files": changed_files[:20],  # First 20 for summary
                    }
                )

        if not all_diffs:
            if omitted_diffs:
                # Changes exist but none fit; claiming "no pending changes"
                # here would be wrong.
                return (
                    f"Pending changes were found, but none of the {len(omitted_diffs)} diff(s) fit within "
                    f"the token budget of ~{max_tokens:,} tokens, so nothing could be reviewed. "
                    "Narrow the review (for example a smaller path, or staged changes only) and try again."
                )
            message = "No pending changes found in any of the git repositories."
            ref_errors = [
                f"- {summary['path']}: {summary['error']}" for summary in repo_summaries if "error" in summary
            ]
            if ref_errors:
                # Surface invalid-ref failures rather than hiding them behind
                # the generic message.
                message += "\n\nSome repositories could not be compared:\n" + "\n".join(ref_errors)
            return message

        # Process context files if provided using standardized file reading
        context_files_content: list[str] = []
        context_files_summary: list[str] = []
        context_tokens = 0
        if translated_files:
            remaining_tokens = max_tokens - total_tokens
            file_content = self._prepare_file_content_for_prompt(
                translated_files,
                request.continuation_id,
                "Context files",
                max_tokens=remaining_tokens + 1000,  # Add back the reserve that was calculated
                reserve_tokens=1000,  # Small reserve for formatting
            )
            if file_content:
                context_tokens = estimate_tokens(file_content)
                context_files_content = [file_content]
                context_files_summary.append(f"✅ Included: {len(translated_files)} context files")
            else:
                context_files_summary.append("⚠️ No context files could be read or files too large")
        total_tokens += context_tokens

        # Build the final prompt
        prompt_parts: list[str] = []

        if request.original_request:
            prompt_parts.append(f"## Original Request\n\n{request.original_request}\n")

        prompt_parts.append("## Review Parameters\n")
        prompt_parts.append(f"- Review Type: {request.review_type}")
        prompt_parts.append(f"- Severity Filter: {request.severity_filter}")
        if request.focus_on:
            prompt_parts.append(f"- Focus Areas: {request.focus_on}")
        if request.compare_to:
            prompt_parts.append(f"- Comparing Against: {request.compare_to}")
        else:
            review_scope = []
            if request.include_staged:
                review_scope.append("staged")
            if request.include_unstaged:
                review_scope.append("unstaged")
            prompt_parts.append(f"- Reviewing: {' and '.join(review_scope)} changes")

        prompt_parts.append("\n## Repository Changes Summary\n")
        prompt_parts.append(f"Found {len(repo_summaries)} repositories with changes:\n")
        for idx, summary in enumerate(repo_summaries, 1):
            prompt_parts.append(f"\n### Repository {idx}: {summary['path']}")
            if "error" in summary:
                # Error entries carry no branch/files keys.
                prompt_parts.append(f"⚠️ Error: {summary['error']}")
                continue
            prompt_parts.append(f"- Branch: {summary['branch']}")
            if summary["ahead"] or summary["behind"]:
                prompt_parts.append(f"- Ahead: {summary['ahead']}, Behind: {summary['behind']}")
            prompt_parts.append(f"- Changed Files: {summary['changed_files']}")
            if summary["files"]:
                prompt_parts.append("\nChanged files:")
                prompt_parts.extend(f" - {file}" for file in summary["files"])
                if summary["changed_files"] > len(summary["files"]):
                    prompt_parts.append(f" ... and {summary['changed_files'] - len(summary['files'])} more files")

        if context_files_summary:
            prompt_parts.append("\n## Context Files Summary\n")
            prompt_parts.extend(f"- {summary_item}" for summary_item in context_files_summary)

        if total_tokens > 0:
            prompt_parts.append(f"\nTotal context tokens used: ~{total_tokens:,}")

        if omitted_diffs:
            # Tell the reviewer that what follows is only part of the change set.
            prompt_parts.append(
                f"\n⚠️ {len(omitted_diffs)} diff(s) were omitted because they did not fit in the token budget; "
                "this review covers only part of the changes:"
            )
            prompt_parts.extend(f" - {description}" for description in omitted_diffs[:20])
            if len(omitted_diffs) > 20:
                prompt_parts.append(f" ... and {len(omitted_diffs) - 20} more")

        # Each diff is wrapped with "--- BEGIN DIFF: ... ---" and "--- END DIFF: ... ---".
        # all_diffs is non-empty here: the empty case returned early above.
        prompt_parts.append("\n## Git Diffs\n")
        prompt_parts.extend(all_diffs)

        # IMPORTANT: Files may legitimately appear in BOTH sections:
        # - Git Diffs: Show only changed lines + limited context (what changed)
        # - Additional Context: Show complete file content (full understanding)
        # This is intentional design for comprehensive AI analysis, not a duplication bug.
        # Each file in this section is wrapped with "--- BEGIN FILE: ... ---" and "--- END FILE: ... ---"
        if context_files_content:
            prompt_parts.append("\n## Additional Context Files")
            prompt_parts.append(
                "The following files are provided for additional context. They have NOT been modified.\n"
            )
            prompt_parts.extend(context_files_content)

        websearch_instruction = self.get_websearch_instruction(
            request.use_websearch,
            """When validating changes, consider if searches for these would help:
- Best practices for new features or patterns introduced
- Security implications of the changes
- Known issues with libraries or APIs being used
- Migration guides if updating dependencies
- Performance considerations for the implemented approach""",
        )

        prompt_parts.append("\n## Review Instructions\n")
        prompt_parts.append(
            "Please review these changes according to the system prompt guidelines. "
            "Pay special attention to alignment with the original request, completeness of implementation, "
            "potential bugs, security issues, and any edge cases not covered."
        )

        # Only invite requests for more files when none were supplied.
        if not translated_files:
            prompt_parts.append(
                "\nIf you need additional context files to properly review these changes "
                "(such as configuration files, documentation, or related code), "
                "you may request them using the standardized JSON response format."
            )

        # Combine with system prompt and websearch instruction
        return f"{self.get_system_prompt()}{websearch_instruction}\n\n" + "\n".join(prompt_parts)

    def format_response(self, response: str, request: PrecommitRequest) -> str:
        """Format the response with commit guidance.

        Appends a fixed "Commit Status" footer to ``response``; ``request`` is
        not used.
        """
        return f"{response}\n\n---\n\n**Commit Status:** If no critical issues found, changes are ready for commit. Otherwise, address issues first and re-run review. Check with user before proceeding with any commit."