Files
my-pal-mcp-server/utils/git_utils.py
Fahad 7ee610938b feat: add review_pending_changes tool and enforce absolute path security
- Add new review_pending_changes tool for comprehensive pre-commit reviews
- Implement filesystem sandboxing with MCP_PROJECT_ROOT
- Enforce absolute paths for all file/directory operations
- Add comprehensive git utilities for repository management
- Update all tools to use centralized path validation
- Add extensive test coverage for new features and security model
- Update documentation with new tool and path requirements
- Remove obsolete demo and guide files

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-09 12:42:40 +04:00

165 lines
4.6 KiB
Python

"""
Git utilities for finding repositories and generating diffs.
"""
import os
import subprocess
from typing import Dict, List, Optional, Tuple
from pathlib import Path
# Directories to ignore when searching for git repositories
IGNORED_DIRS = {
"node_modules",
"__pycache__",
"venv",
"env",
"build",
"dist",
"target",
".tox",
".pytest_cache",
}
def find_git_repositories(start_path: str, max_depth: int = 5) -> List[str]:
"""
Recursively find all git repositories starting from the given path.
Args:
start_path: Directory to start searching from
max_depth: Maximum depth to search (prevents excessive recursion)
Returns:
List of absolute paths to git repositories
"""
repositories = []
start_path = Path(start_path).resolve()
def _find_repos(current_path: Path, current_depth: int):
if current_depth > max_depth:
return
try:
# Check if current directory is a git repo
git_dir = current_path / ".git"
if git_dir.exists() and git_dir.is_dir():
repositories.append(str(current_path))
# Don't search inside .git directory
return
# Search subdirectories
for item in current_path.iterdir():
if item.is_dir() and not item.name.startswith("."):
# Skip common non-code directories
if item.name in IGNORED_DIRS:
continue
_find_repos(item, current_depth + 1)
except PermissionError:
# Skip directories we can't access
pass
_find_repos(start_path, 0)
return sorted(repositories)
def run_git_command(repo_path: str, command: List[str]) -> Tuple[bool, str]:
"""
Run a git command in the specified repository.
Args:
repo_path: Path to the git repository
command: Git command as a list of arguments
Returns:
Tuple of (success, output/error)
"""
try:
result = subprocess.run(
["git"] + command, cwd=repo_path, capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
return True, result.stdout
else:
return False, result.stderr
except subprocess.TimeoutExpired:
return False, "Command timed out"
except Exception as e:
return False, str(e)
def get_git_status(repo_path: str) -> Dict[str, any]:
"""
Get the current git status of a repository.
Args:
repo_path: Path to the git repository
Returns:
Dictionary with status information
"""
status = {
"branch": "",
"ahead": 0,
"behind": 0,
"staged_files": [],
"unstaged_files": [],
"untracked_files": [],
}
# Get current branch
success, branch = run_git_command(repo_path, ["branch", "--show-current"])
if success:
status["branch"] = branch.strip()
# Get ahead/behind info
if status["branch"]:
success, ahead_behind = run_git_command(
repo_path,
[
"rev-list",
"--count",
"--left-right",
f'{status["branch"]}@{{upstream}}...HEAD',
],
)
if success:
if ahead_behind.strip():
parts = ahead_behind.strip().split()
if len(parts) == 2:
status["behind"] = int(parts[0])
status["ahead"] = int(parts[1])
# else: Could not get ahead/behind status (branch may not have upstream)
# Get file status
success, status_output = run_git_command(repo_path, ["status", "--porcelain"])
if success:
for line in status_output.strip().split("\n"):
if not line:
continue
status_code = line[:2]
path_info = line[3:]
# Handle staged changes
if status_code[0] == "R":
# Format is "old_path -> new_path" for renamed files
if " -> " in path_info:
_, new_path = path_info.split(" -> ", 1)
status["staged_files"].append(new_path)
else:
status["staged_files"].append(path_info)
elif status_code[0] in ["M", "A", "D", "C"]:
status["staged_files"].append(path_info)
# Handle unstaged changes
if status_code[1] in ["M", "D"]:
status["unstaged_files"].append(path_info)
elif status_code == "??":
status["untracked_files"].append(path_info)
return status