"""
|
|
File reading utilities with directory support and token management
|
|
"""
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List, Optional, Tuple, Set
|
|
import sys
|
|
|
|
from .token_utils import estimate_tokens, MAX_CONTEXT_TOKENS
|
|
|
|
# Get project root from environment or use current directory
|
|
# This defines the sandbox directory where file access is allowed
|
|
PROJECT_ROOT = Path(os.environ.get("MCP_PROJECT_ROOT", os.getcwd())).resolve()
|
|
|
|
# Security: Prevent running with overly permissive root
|
|
if str(PROJECT_ROOT) == "/":
|
|
raise RuntimeError(
|
|
"Security Error: MCP_PROJECT_ROOT cannot be set to '/'. "
|
|
"This would give access to the entire filesystem."
|
|
)
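
# Illustrative configuration (hypothetical paths): launching the server with
#
#     MCP_PROJECT_ROOT=/home/alice/myproject <server command>
#
# confines every file operation below /home/alice/myproject; leaving the
# variable unset sandboxes access to the directory the process started in.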

# Common code file extensions, grouped by category
CODE_EXTENSIONS = {
    # Programming languages
    ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cpp", ".c", ".h", ".hpp",
    ".cs", ".go", ".rs", ".rb", ".php", ".swift", ".kt", ".scala", ".r",
    ".m", ".mm",
    # Scripts and shells
    ".sql", ".sh", ".bash", ".zsh", ".fish", ".ps1", ".bat", ".cmd",
    # Configuration and data
    ".yml", ".yaml", ".json", ".xml", ".toml", ".ini", ".cfg", ".conf",
    # Documentation and markup
    ".txt", ".md", ".rst", ".tex", ".html", ".css", ".scss", ".sass", ".less",
}


def resolve_and_validate_path(path_str: str) -> Path:
    """
    Validate that a path is absolute, resolve it, and confirm that it stays
    inside the project sandbox.

    Args:
        path_str: Path string (must be absolute)

    Returns:
        Resolved Path object

    Raises:
        ValueError: If the path is not absolute
        PermissionError: If the path is outside the allowed directory
    """
    # Create a Path object from the user-provided path
    user_path = Path(path_str)

    # Require absolute paths
    if not user_path.is_absolute():
        raise ValueError(
            f"Relative paths are not supported. Please provide an absolute path.\n"
            f"Received: {path_str}"
        )

    # Resolve symlinks and normalize the absolute path
    resolved_path = user_path.resolve()

    # Security check: ensure the resolved path is within PROJECT_ROOT
    try:
        resolved_path.relative_to(PROJECT_ROOT)
    except ValueError:
        raise PermissionError(
            f"Path outside project root: {path_str}\n"
            f"Project root: {PROJECT_ROOT}\n"
            f"Resolved path: {resolved_path}"
        )

    return resolved_path
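
# Illustrative behaviour (hypothetical paths, assuming PROJECT_ROOT resolves
# to /home/alice/myproject):
#
#     resolve_and_validate_path("/home/alice/myproject/src/app.py")  # returns Path
#     resolve_and_validate_path("src/app.py")    # raises ValueError (relative)
#     resolve_and_validate_path("/etc/passwd")   # raises PermissionError (outside root)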


def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> List[str]:
    """
    Expand paths to individual files, handling both files and directories.

    Args:
        paths: List of file or directory paths
        extensions: Optional set of file extensions to include

    Returns:
        List of individual file paths
    """
    if extensions is None:
        extensions = CODE_EXTENSIONS

    expanded_files = []
    seen = set()

    for path in paths:
        try:
            path_obj = resolve_and_validate_path(path)
        except (ValueError, PermissionError):
            # Skip paths that are relative or outside the sandbox
            continue

        if not path_obj.exists():
            continue

        if path_obj.is_file():
            # Add the file directly
            if str(path_obj) not in seen:
                expanded_files.append(str(path_obj))
                seen.add(str(path_obj))

        elif path_obj.is_dir():
            # Walk the directory recursively
            for root, dirs, files in os.walk(path_obj):
                # Skip hidden directories and __pycache__
                dirs[:] = [
                    d for d in dirs if not d.startswith(".") and d != "__pycache__"
                ]

                for file in files:
                    # Skip hidden files
                    if file.startswith("."):
                        continue

                    file_path = Path(root) / file

                    # Check the extension filter
                    if not extensions or file_path.suffix.lower() in extensions:
                        full_path = str(file_path)
                        if full_path not in seen:
                            expanded_files.append(full_path)
                            seen.add(full_path)

    # Sort for consistent ordering
    expanded_files.sort()
    return expanded_files
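
# Illustrative usage (hypothetical paths): directories are expanded
# recursively, with hidden entries and __pycache__ pruned.
#
#     expand_paths(["/home/alice/myproject/src"])        # all recognized code files
#     expand_paths(["/home/alice/myproject"], {".py"})   # Python files only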


def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, int]:
    """
    Read a single file and format it for Gemini.

    Args:
        file_path: Path to the file (must be absolute)
        max_size: Maximum file size to read, in bytes

    Returns:
        (formatted_content, estimated_tokens)
    """
    try:
        path = resolve_and_validate_path(file_path)
    except (ValueError, PermissionError) as e:
        content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
        return content, estimate_tokens(content)

    try:
        # Check that the path exists and is a regular file
        if not path.exists():
            content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        if not path.is_file():
            content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        # Check the file size
        file_size = path.stat().st_size
        if file_size > max_size:
            content = f"\n--- FILE TOO LARGE: {file_path} ---\nFile size: {file_size:,} bytes (max: {max_size:,})\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        # Read the file, replacing undecodable bytes rather than failing
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            file_content = f.read()

        # Format with clear delimiters for Gemini
        formatted = f"\n--- BEGIN FILE: {file_path} ---\n{file_content}\n--- END FILE: {file_path} ---\n"
        return formatted, estimate_tokens(formatted)

    except Exception as e:
        content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
        return content, estimate_tokens(content)
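
# Illustrative usage (hypothetical path): failures are returned as formatted
# text rather than raised, so error notes flow into the model context just
# like file contents.
#
#     content, tokens = read_file_content("/home/alice/myproject/src/app.py")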


def read_files(
    file_paths: List[str],
    code: Optional[str] = None,
    max_tokens: Optional[int] = None,
    reserve_tokens: int = 50_000,
) -> Tuple[str, str]:
    """
    Read multiple files and optional direct code with smart token management.

    Args:
        file_paths: List of file or directory paths
        code: Optional direct code to include
        max_tokens: Maximum tokens to use (defaults to MAX_CONTEXT_TOKENS)
        reserve_tokens: Tokens to reserve for the prompt and response

    Returns:
        (full_content, brief_summary)
    """
    if max_tokens is None:
        max_tokens = MAX_CONTEXT_TOKENS

    content_parts = []
    summary_parts = []
    total_tokens = 0
    available_tokens = max_tokens - reserve_tokens

    files_read = []
    files_skipped = []
    dirs_processed = []

    # First, handle direct code if provided
    if code:
        formatted_code = (
            f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
        )
        code_tokens = estimate_tokens(formatted_code)

        if code_tokens <= available_tokens:
            content_parts.append(formatted_code)
            # total_tokens already accounts for the direct code, so the
            # budget is not reduced a second time
            total_tokens += code_tokens
            code_preview = code[:50] + "..." if len(code) > 50 else code
            summary_parts.append(f"Direct code: {code_preview}")
        else:
            summary_parts.append("Direct code skipped (too large)")

    # Expand all paths to get individual files
    if file_paths:
        # Track which of the provided paths are directories
        for path in file_paths:
            if Path(path).is_dir():
                dirs_processed.append(path)

        # Expand to get all files
        all_files = expand_paths(file_paths)

        if not all_files:
            # Paths were provided but no files were found
            content_parts.append(
                f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n"
            )
        else:
            # Read files up to the token limit
            for file_path in all_files:
                if total_tokens >= available_tokens:
                    files_skipped.append(file_path)
                    continue

                file_content, file_tokens = read_file_content(file_path)

                # Check whether adding this file would exceed the limit
                if total_tokens + file_tokens <= available_tokens:
                    content_parts.append(file_content)
                    total_tokens += file_tokens
                    files_read.append(file_path)
                else:
                    files_skipped.append(file_path)

    # Build the summary
    if dirs_processed:
        summary_parts.append(f"Processed {len(dirs_processed)} dir(s)")
    if files_read:
        summary_parts.append(f"Read {len(files_read)} file(s)")
    if files_skipped:
        summary_parts.append(f"Skipped {len(files_skipped)} file(s) (token limit)")
    if total_tokens > 0:
        summary_parts.append(f"~{total_tokens:,} tokens used")

    # Note which files were skipped, if any
    if files_skipped:
        skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
        skip_note += f"Total skipped: {len(files_skipped)}\n"
        # Show the first 10 skipped files
        for file_path in files_skipped[:10]:
            skip_note += f" - {file_path}\n"
        if len(files_skipped) > 10:
            skip_note += f" ... and {len(files_skipped) - 10} more\n"
        skip_note += "--- END SKIPPED FILES ---\n"
        content_parts.append(skip_note)

    full_content = "\n\n".join(content_parts) if content_parts else ""
    summary = " | ".join(summary_parts) if summary_parts else "No input provided"

    return full_content, summary
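
# Illustrative usage (hypothetical paths and counts):
#
#     content, summary = read_files(
#         ["/home/alice/myproject/src", "/home/alice/myproject/README.md"],
#         code="print('hello')",
#     )
#     # summary might read:
#     # "Direct code: print('hello') | Processed 1 dir(s) | Read 12 file(s) | ~8,500 tokens used"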