Files
my-pal-mcp-server/tools/review_changes.py
Fahad 27add4d05d feat: Major refactoring and improvements v2.11.0
## 🚀 Major Improvements

### Docker Environment Simplification
- **BREAKING**: Simplified Docker configuration by auto-detecting sandbox from WORKSPACE_ROOT
- Removed redundant MCP_PROJECT_ROOT requirement for Docker setups
- Updated all Docker config examples and setup scripts
- Added security validation for dangerous WORKSPACE_ROOT paths

### Security Enhancements
- **CRITICAL**: Fixed insecure PROJECT_ROOT fallback to use current directory instead of home
- Enhanced path validation with proper Docker environment detection
- Removed information disclosure in error messages
- Strengthened symlink and path traversal protection

### File Handling Optimization
- **PERFORMANCE**: Optimized read_files() to return content only (removed summary)
- Unified file reading across all tools using standardized file_utils routines
- Fixed review_changes tool to use consistent file loading patterns
- Improved token management and reduced unnecessary processing

### Tool Improvements
- **UX**: Enhanced ReviewCodeTool to require user context for targeted reviews
- Removed deprecated _get_secure_container_path function and _sanitize_filename
- Standardized file access patterns across analyze, review_changes, and other tools
- Added contextual prompting to align reviews with user expectations

### Code Quality & Testing
- Updated all tests for new function signatures and requirements
- Added comprehensive Docker path integration tests
- Achieved 100% test coverage (95 tests passing)
- Full compliance with ruff, black, and isort linting standards

### Configuration & Deployment
- Added pyproject.toml for modern Python packaging
- Streamlined Docker setup removing redundant environment variables
- Updated setup scripts across all platforms (Windows, macOS, Linux)
- Improved error handling and validation throughout

## 🔧 Technical Changes

- **Removed**: `_get_secure_container_path()`, `_sanitize_filename()`, unused SANDBOX_MODE
- **Enhanced**: Path translation, security validation, token management
- **Standardized**: File reading patterns, error handling, Docker detection
- **Updated**: All tool prompts for better context alignment

## 🛡️ Security Notes

This release significantly improves the security posture by:
- Eliminating broad filesystem access defaults
- Adding validation for Docker environment variables
- Removing information disclosure in error paths
- Strengthening path traversal and symlink protections

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-10 09:50:05 +04:00

392 lines
17 KiB
Python

"""
Tool for reviewing pending git changes across multiple repositories.
"""
import os
from typing import Any, Literal, Optional
from mcp.types import TextContent
from pydantic import Field
from config import MAX_CONTEXT_TOKENS
from prompts.tool_prompts import REVIEW_CHANGES_PROMPT
from utils.file_utils import read_files, translate_path_for_environment
from utils.git_utils import find_git_repositories, get_git_status, run_git_command
from utils.token_utils import estimate_tokens
from .base import BaseTool, ToolRequest
from .models import ToolOutput
class ReviewChangesRequest(ToolRequest):
"""Request model for review_changes tool"""
path: str = Field(
...,
description="Starting directory to search for git repositories (must be absolute path).",
)
original_request: Optional[str] = Field(
None,
description="The original user request or ticket description for the changes. Provides critical context for the review.",
)
compare_to: Optional[str] = Field(
None,
description="Optional: A git ref (branch, tag, commit hash) to compare against. If not provided, reviews local staged and unstaged changes.",
)
include_staged: bool = Field(
True,
description="Include staged changes in the review. Only applies if 'compare_to' is not set.",
)
include_unstaged: bool = Field(
True,
description="Include uncommitted (unstaged) changes in the review. Only applies if 'compare_to' is not set.",
)
focus_on: Optional[str] = Field(
None,
description="Specific aspects to focus on (e.g., 'logic for user authentication', 'database query efficiency').",
)
review_type: Literal["full", "security", "performance", "quick"] = Field(
"full", description="Type of review to perform on the changes."
)
severity_filter: Literal["critical", "high", "medium", "all"] = Field(
"all",
description="Minimum severity level to report on the changes.",
)
max_depth: int = Field(
5,
description="Maximum depth to search for nested git repositories to prevent excessive recursion.",
)
temperature: Optional[float] = Field(
None,
description="Temperature for the response (0.0 to 1.0). Lower values are more focused and deterministic.",
ge=0.0,
le=1.0,
)
thinking_mode: Optional[Literal["minimal", "low", "medium", "high", "max"]] = Field(
None, description="Thinking depth mode for the assistant."
)
files: Optional[list[str]] = Field(
None,
description="Optional files or directories to provide as context (must be absolute paths). These files are not part of the changes but provide helpful context like configs, docs, or related code.",
)
class ReviewChanges(BaseTool):
"""Tool for reviewing git changes across multiple repositories."""
def get_name(self) -> str:
return "review_changes"
def get_description(self) -> str:
return (
"REVIEW PENDING GIT CHANGES BEFORE COMMITTING - ALWAYS use this tool before creating any git commit! "
"Comprehensive pre-commit validation that catches bugs, security issues, incomplete implementations, "
"and ensures changes match the original requirements. Searches all git repositories recursively and "
"provides deep analysis of staged/unstaged changes. Essential for code quality and preventing bugs. "
"Triggers: 'before commit', 'review changes', 'check my changes', 'validate changes', 'pre-commit review', "
"'about to commit', 'ready to commit'. Claude should proactively suggest using this tool whenever "
"the user mentions committing or when changes are complete. "
"Choose thinking_mode based on changeset size: 'low' for small focused changes, "
"'medium' for standard commits (default), 'high' for large feature branches or complex refactoring, "
"'max' for critical releases or when reviewing extensive changes across multiple systems."
)
def get_input_schema(self) -> dict[str, Any]:
return self.get_request_model().model_json_schema()
def get_system_prompt(self) -> str:
return REVIEW_CHANGES_PROMPT
def get_request_model(self):
return ReviewChangesRequest
def get_default_temperature(self) -> float:
"""Use analytical temperature for code review."""
from config import TEMPERATURE_ANALYTICAL
return TEMPERATURE_ANALYTICAL
async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
"""Override execute to check original_request size before processing"""
# First validate request
request_model = self.get_request_model()
request = request_model(**arguments)
# Check original_request size if provided
if request.original_request:
size_check = self.check_prompt_size(request.original_request)
if size_check:
return [TextContent(type="text", text=ToolOutput(**size_check).model_dump_json())]
# Continue with normal execution
return await super().execute(arguments)
async def prepare_prompt(self, request: ReviewChangesRequest) -> str:
"""Prepare the prompt with git diff information."""
# Check for prompt.txt in files
prompt_content, updated_files = self.handle_prompt_file(request.files)
# If prompt.txt was found, use it as original_request
if prompt_content:
request.original_request = prompt_content
# Update request files list
if updated_files is not None:
request.files = updated_files
# Translate the path if running in Docker
translated_path = translate_path_for_environment(request.path)
# Check if the path translation resulted in an error path
if translated_path.startswith("/inaccessible/"):
raise ValueError(
f"The path '{request.path}' is not accessible from within the Docker container. "
f"The Docker container can only access files within the mounted workspace. "
f"Please ensure the path is within the mounted directory or adjust your Docker volume mounts."
)
# Find all git repositories
repositories = find_git_repositories(translated_path, request.max_depth)
if not repositories:
return "No git repositories found in the specified path."
# Collect all diffs directly
all_diffs = []
repo_summaries = []
total_tokens = 0
max_tokens = MAX_CONTEXT_TOKENS - 50000 # Reserve tokens for prompt and response
for repo_path in repositories:
repo_name = os.path.basename(repo_path) or "root"
# Get status information
status = get_git_status(repo_path)
changed_files = []
# Process based on mode
if request.compare_to:
# Validate the ref
is_valid_ref, err_msg = run_git_command(
repo_path,
["rev-parse", "--verify", "--quiet", request.compare_to],
)
if not is_valid_ref:
repo_summaries.append(
{
"path": repo_path,
"error": f"Invalid or unknown git ref '{request.compare_to}': {err_msg}",
"changed_files": 0,
}
)
continue
# Get list of changed files
success, files_output = run_git_command(
repo_path,
["diff", "--name-only", f"{request.compare_to}...HEAD"],
)
if success and files_output.strip():
changed_files = [f for f in files_output.strip().split("\n") if f]
# Generate per-file diffs
for file_path in changed_files:
success, diff = run_git_command(
repo_path,
[
"diff",
f"{request.compare_to}...HEAD",
"--",
file_path,
],
)
if success and diff.strip():
# Format diff with file header
diff_header = (
f"\n--- BEGIN DIFF: {repo_name} / {file_path} (compare to {request.compare_to}) ---\n"
)
diff_footer = f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
formatted_diff = diff_header + diff + diff_footer
# Check token limit
diff_tokens = estimate_tokens(formatted_diff)
if total_tokens + diff_tokens <= max_tokens:
all_diffs.append(formatted_diff)
total_tokens += diff_tokens
else:
# Handle staged/unstaged changes
staged_files = []
unstaged_files = []
if request.include_staged:
success, files_output = run_git_command(repo_path, ["diff", "--name-only", "--cached"])
if success and files_output.strip():
staged_files = [f for f in files_output.strip().split("\n") if f]
# Generate per-file diffs for staged changes
for file_path in staged_files:
success, diff = run_git_command(repo_path, ["diff", "--cached", "--", file_path])
if success and diff.strip():
diff_header = f"\n--- BEGIN DIFF: {repo_name} / {file_path} (staged) ---\n"
diff_footer = f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
formatted_diff = diff_header + diff + diff_footer
# Check token limit
diff_tokens = estimate_tokens(formatted_diff)
if total_tokens + diff_tokens <= max_tokens:
all_diffs.append(formatted_diff)
total_tokens += diff_tokens
if request.include_unstaged:
success, files_output = run_git_command(repo_path, ["diff", "--name-only"])
if success and files_output.strip():
unstaged_files = [f for f in files_output.strip().split("\n") if f]
# Generate per-file diffs for unstaged changes
for file_path in unstaged_files:
success, diff = run_git_command(repo_path, ["diff", "--", file_path])
if success and diff.strip():
diff_header = f"\n--- BEGIN DIFF: {repo_name} / {file_path} (unstaged) ---\n"
diff_footer = f"\n--- END DIFF: {repo_name} / {file_path} ---\n"
formatted_diff = diff_header + diff + diff_footer
# Check token limit
diff_tokens = estimate_tokens(formatted_diff)
if total_tokens + diff_tokens <= max_tokens:
all_diffs.append(formatted_diff)
total_tokens += diff_tokens
# Combine unique files
changed_files = list(set(staged_files + unstaged_files))
# Add repository summary
if changed_files:
repo_summaries.append(
{
"path": repo_path,
"branch": status["branch"],
"ahead": status["ahead"],
"behind": status["behind"],
"changed_files": len(changed_files),
"files": changed_files[:20], # First 20 for summary
}
)
if not all_diffs:
return "No pending changes found in any of the git repositories."
# Process context files if provided using standardized file reading
context_files_content = []
context_files_summary = []
context_tokens = 0
if request.files:
remaining_tokens = max_tokens - total_tokens
# Use standardized file reading with token budget
file_content = read_files(
request.files, max_tokens=remaining_tokens, reserve_tokens=1000 # Small reserve for formatting
)
if file_content:
context_tokens = estimate_tokens(file_content)
context_files_content = [file_content]
context_files_summary.append(f"✅ Included: {len(request.files)} context files")
else:
context_files_summary.append("⚠️ No context files could be read or files too large")
total_tokens += context_tokens
# Build the final prompt
prompt_parts = []
# Add original request context if provided
if request.original_request:
prompt_parts.append(f"## Original Request/Ticket\n\n{request.original_request}\n")
# Add review parameters
prompt_parts.append("## Review Parameters\n")
prompt_parts.append(f"- Review Type: {request.review_type}")
prompt_parts.append(f"- Severity Filter: {request.severity_filter}")
if request.focus_on:
prompt_parts.append(f"- Focus Areas: {request.focus_on}")
if request.compare_to:
prompt_parts.append(f"- Comparing Against: {request.compare_to}")
else:
review_scope = []
if request.include_staged:
review_scope.append("staged")
if request.include_unstaged:
review_scope.append("unstaged")
prompt_parts.append(f"- Reviewing: {' and '.join(review_scope)} changes")
# Add repository summary
prompt_parts.append("\n## Repository Changes Summary\n")
prompt_parts.append(f"Found {len(repo_summaries)} repositories with changes:\n")
for idx, summary in enumerate(repo_summaries, 1):
prompt_parts.append(f"\n### Repository {idx}: {summary['path']}")
if "error" in summary:
prompt_parts.append(f"⚠️ Error: {summary['error']}")
else:
prompt_parts.append(f"- Branch: {summary['branch']}")
if summary["ahead"] or summary["behind"]:
prompt_parts.append(f"- Ahead: {summary['ahead']}, Behind: {summary['behind']}")
prompt_parts.append(f"- Changed Files: {summary['changed_files']}")
if summary["files"]:
prompt_parts.append("\nChanged files:")
for file in summary["files"]:
prompt_parts.append(f" - {file}")
if summary["changed_files"] > len(summary["files"]):
prompt_parts.append(f" ... and {summary['changed_files'] - len(summary['files'])} more files")
# Add context files summary if provided
if context_files_summary:
prompt_parts.append("\n## Context Files Summary\n")
for summary_item in context_files_summary:
prompt_parts.append(f"- {summary_item}")
# Add token usage summary
if total_tokens > 0:
prompt_parts.append(f"\nTotal context tokens used: ~{total_tokens:,}")
# Add the diff contents
prompt_parts.append("\n## Git Diffs\n")
if all_diffs:
prompt_parts.extend(all_diffs)
else:
prompt_parts.append("--- NO DIFFS FOUND ---")
# Add context files content if provided
if context_files_content:
prompt_parts.append("\n## Additional Context Files")
prompt_parts.append(
"The following files are provided for additional context. They have NOT been modified.\n"
)
prompt_parts.extend(context_files_content)
# Add review instructions
prompt_parts.append("\n## Review Instructions\n")
prompt_parts.append(
"Please review these changes according to the system prompt guidelines. "
"Pay special attention to alignment with the original request, completeness of implementation, "
"potential bugs, security issues, and any edge cases not covered."
)
# Add instruction for requesting files if needed
if not request.files:
prompt_parts.append(
"\nIf you need additional context files to properly review these changes "
"(such as configuration files, documentation, or related code), "
"you may request them using the standardized JSON response format."
)
return "\n".join(prompt_parts)
def format_response(self, response: str, request: ReviewChangesRequest) -> str:
"""Format the response with commit guidance"""
return f"{response}\n\n---\n\n**Commit Status:** If no critical issues found, changes are ready for commit. Otherwise, address issues first and re-run review. Check with user before proceeding with any commit."