Files
my-pal-mcp-server/tools/review_changes.py
Fahad 53303f86be feat: enhance review_changes with dynamic file requests
- Add instruction for Gemini to request files when needed
- Add comprehensive tests for files parameter functionality
- Test file request instruction presence/absence based on context
- Run all tests, ruff, and black formatting

Now review_changes can both accept context files and allow Gemini
to request additional files during review for better validation.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-09 21:43:45 +04:00

408 lines
18 KiB
Python

"""
Tool for reviewing pending git changes across multiple repositories.
"""
import os
import re
from typing import Any, Dict, List, Literal, Optional
from pydantic import Field
from config import MAX_CONTEXT_TOKENS
from prompts.tool_prompts import REVIEW_CHANGES_PROMPT
from utils.file_utils import read_files
from utils.git_utils import find_git_repositories, get_git_status, run_git_command
from utils.token_utils import estimate_tokens
from .base import BaseTool, ToolRequest
class ReviewChangesRequest(ToolRequest):
"""Request model for review_changes tool"""
path: str = Field(
...,
description="Starting directory to search for git repositories (must be absolute path).",
)
original_request: Optional[str] = Field(
None,
description="The original user request or ticket description for the changes. Provides critical context for the review.",
)
compare_to: Optional[str] = Field(
None,
description="Optional: A git ref (branch, tag, commit hash) to compare against. If not provided, reviews local staged and unstaged changes.",
)
include_staged: bool = Field(
True,
description="Include staged changes in the review. Only applies if 'compare_to' is not set.",
)
include_unstaged: bool = Field(
True,
description="Include uncommitted (unstaged) changes in the review. Only applies if 'compare_to' is not set.",
)
focus_on: Optional[str] = Field(
None,
description="Specific aspects to focus on (e.g., 'logic for user authentication', 'database query efficiency').",
)
review_type: Literal["full", "security", "performance", "quick"] = Field(
"full", description="Type of review to perform on the changes."
)
severity_filter: Literal["critical", "high", "medium", "all"] = Field(
"all",
description="Minimum severity level to report on the changes.",
)
max_depth: int = Field(
5,
description="Maximum depth to search for nested git repositories to prevent excessive recursion.",
)
temperature: Optional[float] = Field(
None,
description="Temperature for the response (0.0 to 1.0). Lower values are more focused and deterministic.",
ge=0.0,
le=1.0,
)
thinking_mode: Optional[Literal["minimal", "low", "medium", "high", "max"]] = Field(
None, description="Thinking depth mode for the assistant."
)
files: Optional[List[str]] = Field(
None,
description="Optional files or directories to provide as context (must be absolute paths). These files are not part of the changes but provide helpful context like configs, docs, or related code.",
)
class ReviewChanges(BaseTool):
"""Tool for reviewing git changes across multiple repositories."""
def get_name(self) -> str:
return "review_changes"
def get_description(self) -> str:
return (
"REVIEW PENDING GIT CHANGES BEFORE COMMITTING - ALWAYS use this tool before creating any git commit! "
"Comprehensive pre-commit validation that catches bugs, security issues, incomplete implementations, "
"and ensures changes match the original requirements. Searches all git repositories recursively and "
"provides deep analysis of staged/unstaged changes. Essential for code quality and preventing bugs. "
"Triggers: 'before commit', 'review changes', 'check my changes', 'validate changes', 'pre-commit review', "
"'about to commit', 'ready to commit'. Claude should proactively suggest using this tool whenever "
"the user mentions committing or when changes are complete."
)
def get_input_schema(self) -> Dict[str, Any]:
return self.get_request_model().model_json_schema()
def get_system_prompt(self) -> str:
return REVIEW_CHANGES_PROMPT
def get_request_model(self):
return ReviewChangesRequest
def get_default_temperature(self) -> float:
"""Use analytical temperature for code review."""
from config import TEMPERATURE_ANALYTICAL
return TEMPERATURE_ANALYTICAL
def _sanitize_filename(self, name: str) -> str:
"""Sanitize a string to be a valid filename."""
# Replace path separators and other problematic characters
name = name.replace("/", "_").replace("\\", "_").replace(" ", "_")
# Remove any remaining non-alphanumeric characters except dots, dashes, underscores
name = re.sub(r"[^a-zA-Z0-9._-]", "", name)
# Limit length to avoid filesystem issues
return name[:100]
async def prepare_prompt(self, request: ReviewChangesRequest) -> str:
"""Prepare the prompt with git diff information."""
# Find all git repositories
repositories = find_git_repositories(request.path, request.max_depth)
if not repositories:
return "No git repositories found in the specified path."
# Collect all diffs directly
all_diffs = []
repo_summaries = []
total_tokens = 0
max_tokens = (
MAX_CONTEXT_TOKENS - 50000
) # Reserve tokens for prompt and response
for repo_path in repositories:
repo_name = os.path.basename(repo_path) or "root"
repo_name = self._sanitize_filename(repo_name)
# Get status information
status = get_git_status(repo_path)
changed_files = []
# Process based on mode
if request.compare_to:
# Validate the ref
is_valid_ref, err_msg = run_git_command(
repo_path,
["rev-parse", "--verify", "--quiet", request.compare_to],
)
if not is_valid_ref:
repo_summaries.append(
{
"path": repo_path,
"error": f"Invalid or unknown git ref '{request.compare_to}': {err_msg}",
"changed_files": 0,
}
)
continue
# Get list of changed files
success, files_output = run_git_command(
repo_path,
["diff", "--name-only", f"{request.compare_to}...HEAD"],
)
if success and files_output.strip():
changed_files = [f for f in files_output.strip().split("\n") if f]
# Generate per-file diffs
for file_path in changed_files:
success, diff = run_git_command(
repo_path,
[
"diff",
f"{request.compare_to}...HEAD",
"--",
file_path,
],
)
if success and diff.strip():
# Format diff with file header
diff_header = f"\n--- BEGIN DIFF: {repo_name} / {file_path} (compare to {request.compare_to}) ---\n"
diff_footer = (
f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
)
formatted_diff = diff_header + diff + diff_footer
# Check token limit
diff_tokens = estimate_tokens(formatted_diff)
if total_tokens + diff_tokens <= max_tokens:
all_diffs.append(formatted_diff)
total_tokens += diff_tokens
else:
# Handle staged/unstaged changes
staged_files = []
unstaged_files = []
if request.include_staged:
success, files_output = run_git_command(
repo_path, ["diff", "--name-only", "--cached"]
)
if success and files_output.strip():
staged_files = [
f for f in files_output.strip().split("\n") if f
]
# Generate per-file diffs for staged changes
for file_path in staged_files:
success, diff = run_git_command(
repo_path, ["diff", "--cached", "--", file_path]
)
if success and diff.strip():
diff_header = f"\n--- BEGIN DIFF: {repo_name} / {file_path} (staged) ---\n"
diff_footer = (
f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
)
formatted_diff = diff_header + diff + diff_footer
# Check token limit
from utils import estimate_tokens
diff_tokens = estimate_tokens(formatted_diff)
if total_tokens + diff_tokens <= max_tokens:
all_diffs.append(formatted_diff)
total_tokens += diff_tokens
if request.include_unstaged:
success, files_output = run_git_command(
repo_path, ["diff", "--name-only"]
)
if success and files_output.strip():
unstaged_files = [
f for f in files_output.strip().split("\n") if f
]
# Generate per-file diffs for unstaged changes
for file_path in unstaged_files:
success, diff = run_git_command(
repo_path, ["diff", "--", file_path]
)
if success and diff.strip():
diff_header = f"\n--- BEGIN DIFF: {repo_name} / {file_path} (unstaged) ---\n"
diff_footer = (
f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
)
formatted_diff = diff_header + diff + diff_footer
# Check token limit
from utils import estimate_tokens
diff_tokens = estimate_tokens(formatted_diff)
if total_tokens + diff_tokens <= max_tokens:
all_diffs.append(formatted_diff)
total_tokens += diff_tokens
# Combine unique files
changed_files = list(set(staged_files + unstaged_files))
# Add repository summary
if changed_files:
repo_summaries.append(
{
"path": repo_path,
"branch": status["branch"],
"ahead": status["ahead"],
"behind": status["behind"],
"changed_files": len(changed_files),
"files": changed_files[:20], # First 20 for summary
}
)
if not all_diffs:
return "No pending changes found in any of the git repositories."
# Process context files if provided
context_files_content = []
context_files_summary = []
context_tokens = 0
if request.files:
remaining_tokens = max_tokens - total_tokens
# Read context files with remaining token budget
file_content, file_summary = read_files(request.files)
# Check if context files fit in remaining budget
if file_content:
context_tokens = estimate_tokens(file_content)
if context_tokens <= remaining_tokens:
# Use the full content from read_files
context_files_content = [file_content]
# Parse summary to create individual file summaries
summary_lines = file_summary.split("\n")
for line in summary_lines:
if line.strip() and not line.startswith("Total files:"):
context_files_summary.append(f"✅ Included: {line.strip()}")
else:
context_files_summary.append(
f"⚠️ Context files too large (~{context_tokens:,} tokens, budget: ~{remaining_tokens:,} tokens)"
)
# Include as much as fits
if remaining_tokens > 1000: # Only if we have reasonable space
truncated_content = file_content[
: int(
len(file_content)
* (remaining_tokens / context_tokens)
* 0.9
)
]
context_files_content.append(
f"\n--- BEGIN CONTEXT FILES (TRUNCATED) ---\n{truncated_content}\n--- END CONTEXT FILES ---\n"
)
context_tokens = remaining_tokens
else:
context_tokens = 0
total_tokens += context_tokens
# Build the final prompt
prompt_parts = []
# Add original request context if provided
if request.original_request:
prompt_parts.append(
f"## Original Request/Ticket\n\n{request.original_request}\n"
)
# Add review parameters
prompt_parts.append("## Review Parameters\n")
prompt_parts.append(f"- Review Type: {request.review_type}")
prompt_parts.append(f"- Severity Filter: {request.severity_filter}")
if request.focus_on:
prompt_parts.append(f"- Focus Areas: {request.focus_on}")
if request.compare_to:
prompt_parts.append(f"- Comparing Against: {request.compare_to}")
else:
review_scope = []
if request.include_staged:
review_scope.append("staged")
if request.include_unstaged:
review_scope.append("unstaged")
prompt_parts.append(f"- Reviewing: {' and '.join(review_scope)} changes")
# Add repository summary
prompt_parts.append("\n## Repository Changes Summary\n")
prompt_parts.append(f"Found {len(repo_summaries)} repositories with changes:\n")
for idx, summary in enumerate(repo_summaries, 1):
prompt_parts.append(f"\n### Repository {idx}: {summary['path']}")
if "error" in summary:
prompt_parts.append(f"⚠️ Error: {summary['error']}")
else:
prompt_parts.append(f"- Branch: {summary['branch']}")
if summary["ahead"] or summary["behind"]:
prompt_parts.append(
f"- Ahead: {summary['ahead']}, Behind: {summary['behind']}"
)
prompt_parts.append(f"- Changed Files: {summary['changed_files']}")
if summary["files"]:
prompt_parts.append("\nChanged files:")
for file in summary["files"]:
prompt_parts.append(f" - {file}")
if summary["changed_files"] > len(summary["files"]):
prompt_parts.append(
f" ... and {summary['changed_files'] - len(summary['files'])} more files"
)
# Add context files summary if provided
if context_files_summary:
prompt_parts.append("\n## Context Files Summary\n")
for summary_item in context_files_summary:
prompt_parts.append(f"- {summary_item}")
# Add token usage summary
if total_tokens > 0:
prompt_parts.append(f"\nTotal context tokens used: ~{total_tokens:,}")
# Add the diff contents
prompt_parts.append("\n## Git Diffs\n")
if all_diffs:
prompt_parts.extend(all_diffs)
else:
prompt_parts.append("--- NO DIFFS FOUND ---")
# Add context files content if provided
if context_files_content:
prompt_parts.append("\n## Additional Context Files")
prompt_parts.append(
"The following files are provided for additional context. They have NOT been modified.\n"
)
prompt_parts.extend(context_files_content)
# Add review instructions
prompt_parts.append("\n## Review Instructions\n")
prompt_parts.append(
"Please review these changes according to the system prompt guidelines. "
"Pay special attention to alignment with the original request, completeness of implementation, "
"potential bugs, security issues, and any edge cases not covered."
)
# Add instruction for requesting files if needed
if not request.files:
prompt_parts.append(
"\nIf you need additional context files to properly review these changes "
"(such as configuration files, documentation, or related code), "
"you may request them using the standardized JSON response format."
)
return "\n".join(prompt_parts)