refactor: cleanup and comprehensive documentation

Major changes:
- Add comprehensive documentation to all modules with detailed docstrings
- Remove unused THINKING_MODEL config; use a single GEMINI_MODEL with a thinking_mode param (see the sketch after this list)
- Remove list_models functionality (simplified to a single-model configuration)
- Rename DEFAULT_MODEL to GEMINI_MODEL for clarity
- Remove unused python-dotenv dependency
- Fix missing pydantic in setup.py dependencies
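
As a rough illustration of the first point above, this is what the simplified configuration might look like; only the GEMINI_MODEL and MAX_CONTEXT_TOKENS names are taken from this commit, while the model string, limit value, and file layout are assumptions.

```python
# config.py -- illustrative sketch only; values and layout are assumed
import os

# One model setting replaces the old DEFAULT_MODEL / THINKING_MODEL pair.
# Thinking behaviour is now requested per call via a thinking_mode parameter.
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-1.5-pro")  # default model name assumed

# Context budget consumed by the token utilities further down in this diff.
MAX_CONTEXT_TOKENS = 1_000_000  # value assumed
```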

Documentation improvements:
- Document security measures in file_utils.py (path validation, sandboxing)
- Add detailed comments to critical logic sections
- Document tool creation process in BaseTool
- Explain configuration values and their impact
- Add comprehensive function-level documentation

Code quality:
- Apply black formatting to all files
- Fix all ruff linting issues
- Update tests to match refactored code
- All 63 tests passing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: Fahad
Date: 2025-06-09 19:04:24 +04:00
Commit: 783ba73181 (parent: fd6e2f9b64)
12 changed files with 639 additions and 260 deletions


@@ -1,5 +1,21 @@
"""
File reading utilities with directory support and token management
This module provides secure file access functionality for the MCP server.
It implements critical security measures to prevent unauthorized file access
and manages token limits to ensure efficient API usage.
Key Features:
- Path validation and sandboxing to prevent directory traversal attacks
- Support for both individual files and recursive directory reading
- Token counting and management to stay within API limits
- Automatic file type detection and filtering
- Comprehensive error handling with informative messages
Security Model:
- All file access is restricted to PROJECT_ROOT and its subdirectories
- Absolute paths are required to prevent ambiguity
- Symbolic links are resolved to ensure they stay within bounds
"""
import os
@@ -10,9 +26,12 @@ from .token_utils import estimate_tokens, MAX_CONTEXT_TOKENS
# Get project root from environment or use current directory
# This defines the sandbox directory where file access is allowed
# Security: All file operations are restricted to this directory and its children
PROJECT_ROOT = Path(os.environ.get("MCP_PROJECT_ROOT", os.getcwd())).resolve()
# Security: Prevent running with overly permissive root
# Critical Security Check: Prevent running with overly permissive root
# Setting PROJECT_ROOT to "/" would allow access to the entire filesystem,
# which is a severe security vulnerability
if str(PROJECT_ROOT) == "/":
raise RuntimeError(
"Security Error: MCP_PROJECT_ROOT cannot be set to '/'. "
@@ -20,7 +39,8 @@ if str(PROJECT_ROOT) == "/":
)
# Common code file extensions
# Common code file extensions that are automatically included when processing directories
# This set can be extended to support additional file types
CODE_EXTENSIONS = {
".py",
".js",
@@ -75,11 +95,16 @@ def resolve_and_validate_path(path_str: str) -> Path:
"""
Validates that a path is absolute and resolves it.
This is the primary security function that ensures all file access
is properly sandboxed. It enforces two critical security policies:
1. All paths must be absolute (no ambiguity)
2. All paths must resolve to within PROJECT_ROOT (sandboxing)
Args:
path_str: Path string (must be absolute)
Returns:
Resolved Path object
Resolved Path object that is guaranteed to be within PROJECT_ROOT
Raises:
ValueError: If path is not absolute
@@ -88,17 +113,19 @@ def resolve_and_validate_path(path_str: str) -> Path:
# Create a Path object from the user-provided path
user_path = Path(path_str)
# Require absolute paths
# Security Policy 1: Require absolute paths to prevent ambiguity
# Relative paths could be interpreted differently depending on working directory
if not user_path.is_absolute():
raise ValueError(
f"Relative paths are not supported. Please provide an absolute path.\n"
f"Received: {path_str}"
)
# Resolve the absolute path
# Resolve the absolute path (follows symlinks, removes .. and .)
resolved_path = user_path.resolve()
# Security check: ensure the resolved path is within PROJECT_ROOT
# Security Policy 2: Ensure the resolved path is within PROJECT_ROOT
# This prevents directory traversal attacks (e.g., /project/../../../etc/passwd)
try:
resolved_path.relative_to(PROJECT_ROOT)
except ValueError:
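
Distilled from the hunk above, a minimal self-contained sketch of the two security policies; the real function lives in file_utils.py and derives PROJECT_ROOT from MCP_PROJECT_ROOT, and the exact exception raised for sandbox violations is assumed here.

```python
from pathlib import Path

PROJECT_ROOT = Path("/srv/project")  # stand-in; the module resolves this from MCP_PROJECT_ROOT

def validate_path(path_str: str) -> Path:
    user_path = Path(path_str)
    # Policy 1: absolute paths only, so the result never depends on the working directory.
    if not user_path.is_absolute():
        raise ValueError(f"Relative paths are not supported: {path_str}")
    # Policy 2: resolve symlinks and ".." first, then prove containment in the sandbox.
    resolved = user_path.resolve()
    try:
        resolved.relative_to(PROJECT_ROOT)
    except ValueError:
        raise PermissionError(f"Path escapes PROJECT_ROOT: {path_str}")
    return resolved

# A traversal attempt such as "/srv/project/../../etc/passwd" resolves to
# "/etc/passwd" and is rejected by the relative_to() containment check.
```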
@@ -115,12 +142,16 @@ def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> Lis
"""
Expand paths to individual files, handling both files and directories.
This function recursively walks directories to find all matching files.
It automatically filters out hidden files and common non-code directories
like __pycache__ to avoid including generated or system files.
Args:
paths: List of file or directory paths
extensions: Optional set of file extensions to include
paths: List of file or directory paths (must be absolute)
extensions: Optional set of file extensions to include (defaults to CODE_EXTENSIONS)
Returns:
List of individual file paths
List of individual file paths, sorted for consistent ordering
"""
if extensions is None:
extensions = CODE_EXTENSIONS
@@ -130,9 +161,10 @@ def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> Lis
for path in paths:
try:
# Validate each path for security before processing
path_obj = resolve_and_validate_path(path)
except (ValueError, PermissionError):
# Skip invalid paths
# Skip invalid paths silently to allow partial success
continue
if not path_obj.exists():
@@ -145,51 +177,61 @@ def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> Lis
seen.add(str(path_obj))
elif path_obj.is_dir():
# Walk directory recursively
# Walk directory recursively to find all files
for root, dirs, files in os.walk(path_obj):
# Skip hidden directories and __pycache__
# Filter directories in-place to skip hidden and cache directories
# This prevents descending into .git, .venv, __pycache__, etc.
dirs[:] = [
d for d in dirs if not d.startswith(".") and d != "__pycache__"
]
for file in files:
# Skip hidden files
# Skip hidden files (e.g., .DS_Store, .gitignore)
if file.startswith("."):
continue
file_path = Path(root) / file
# Check extension
# Filter by extension if specified
if not extensions or file_path.suffix.lower() in extensions:
full_path = str(file_path)
# Use set to prevent duplicates
if full_path not in seen:
expanded_files.append(full_path)
seen.add(full_path)
# Sort for consistent ordering
# Sort for consistent ordering across different runs
# This makes output predictable and easier to debug
expanded_files.sort()
return expanded_files
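
The dirs[:] slice assignment used above is the standard way to prune an os.walk traversal in place; a standalone sketch of the same idiom (the skip list and extension set here are trimmed for illustration):

```python
import os
from pathlib import Path

SKIP_DIRS = {"__pycache__"}      # the real walk also skips any name starting with "."
WANTED_EXTENSIONS = {".py", ".js"}

def walk_code_files(root: str) -> list[str]:
    found = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Slice assignment mutates the list os.walk iterates over, so pruned
        # directories are never descended into on later iterations.
        dirnames[:] = [d for d in dirnames if not d.startswith(".") and d not in SKIP_DIRS]
        for name in filenames:
            if name.startswith("."):
                continue
            if Path(name).suffix.lower() in WANTED_EXTENSIONS:
                found.append(os.path.join(dirpath, name))
    return sorted(found)  # deterministic ordering, as in expand_paths
```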
def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, int]:
"""
Read a single file and format it for Gemini.
Read a single file and format it for inclusion in AI prompts.
This function handles various error conditions gracefully and always
returns formatted content, even for errors. This ensures the AI model
gets context about what files were attempted but couldn't be read.
Args:
file_path: Path to file (must be absolute)
max_size: Maximum file size to read
max_size: Maximum file size to read (default 1MB to prevent memory issues)
Returns:
(formatted_content, estimated_tokens)
Tuple of (formatted_content, estimated_tokens)
Content is wrapped with clear delimiters for AI parsing
"""
try:
# Validate path security before any file operations
path = resolve_and_validate_path(file_path)
except (ValueError, PermissionError) as e:
# Return error in a format that provides context to the AI
content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
return content, estimate_tokens(content)
try:
# Check if path exists and is a file
# Validate file existence and type
if not path.exists():
content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
return content, estimate_tokens(content)
@@ -198,17 +240,19 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, i
content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Check file size
# Check file size to prevent memory exhaustion
file_size = path.stat().st_size
if file_size > max_size:
content = f"\n--- FILE TOO LARGE: {file_path} ---\nFile size: {file_size:,} bytes (max: {max_size:,})\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Read the file
# Read the file with UTF-8 encoding, replacing invalid characters
# This ensures we can handle files with mixed encodings
with open(path, "r", encoding="utf-8", errors="replace") as f:
file_content = f.read()
# Format with clear delimiters for Gemini
# Format with clear delimiters that help the AI understand file boundaries
# Using consistent markers makes it easier for the model to parse
formatted = f"\n--- BEGIN FILE: {file_path} ---\n{file_content}\n--- END FILE: {file_path} ---\n"
return formatted, estimate_tokens(formatted)
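
A condensed sketch of the error-as-content pattern the comments above describe, where every outcome, including a failure, is returned as delimited text so the model always sees which files were attempted; validate_path refers to the earlier sketch and is an assumed name.

```python
def render_file(file_path: str, max_size: int = 1_000_000) -> str:
    try:
        path = validate_path(file_path)   # sandbox check from the earlier sketch
        if not path.is_file():
            return f"\n--- FILE NOT FOUND: {file_path} ---\n--- END FILE ---\n"
        if path.stat().st_size > max_size:
            return f"\n--- FILE TOO LARGE: {file_path} ---\n--- END FILE ---\n"
        text = path.read_text(encoding="utf-8", errors="replace")
        return f"\n--- BEGIN FILE: {file_path} ---\n{text}\n--- END FILE: {file_path} ---\n"
    except (ValueError, PermissionError) as exc:
        return f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {exc}\n--- END FILE ---\n"
```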
@@ -226,14 +270,21 @@ def read_files(
"""
Read multiple files and optional direct code with smart token management.
This function implements intelligent token budgeting to maximize the amount
of relevant content that can be included in an AI prompt while staying
within token limits. It prioritizes direct code and reads files until
the token budget is exhausted.
Args:
file_paths: List of file or directory paths
code: Optional direct code to include
file_paths: List of file or directory paths (absolute paths required)
code: Optional direct code to include (prioritized over files)
max_tokens: Maximum tokens to use (defaults to MAX_CONTEXT_TOKENS)
reserve_tokens: Tokens to reserve for prompt and response
reserve_tokens: Tokens to reserve for prompt and response (default 50K)
Returns:
(full_content, brief_summary)
Tuple of (full_content, brief_summary)
- full_content: All file contents formatted for AI consumption
- brief_summary: Human-readable summary of what was processed
"""
if max_tokens is None:
max_tokens = MAX_CONTEXT_TOKENS
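
The token budgeting the docstring describes, reduced to its core loop; names here are illustrative, and the real read_files additionally builds the human-readable summary and the skipped-files note.

```python
def pack_content(pieces: list[tuple[str, str]], max_tokens: int, reserve_tokens: int = 50_000):
    """pieces are (label, text) pairs already ordered by priority, direct code first."""
    available = max_tokens - reserve_tokens   # keep headroom for the prompt and the response
    used, included, skipped = 0, [], []
    for label, text in pieces:
        cost = len(text) // 4                 # same heuristic as estimate_tokens
        if used + cost <= available:
            included.append(text)
            used += cost
        else:
            skipped.append(label)             # recorded so the caller can report what was omitted
    return "".join(included), skipped
```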
@@ -247,7 +298,8 @@ def read_files(
files_skipped = []
dirs_processed = []
# First, handle direct code if provided
# Priority 1: Handle direct code if provided
# Direct code is prioritized because it's explicitly provided by the user
if code:
formatted_code = (
f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
@@ -258,19 +310,23 @@ def read_files(
content_parts.append(formatted_code)
total_tokens += code_tokens
available_tokens -= code_tokens
# Create a preview for the summary
code_preview = code[:50] + "..." if len(code) > 50 else code
summary_parts.append(f"Direct code: {code_preview}")
else:
summary_parts.append("Direct code skipped (too large)")
# Expand all paths to get individual files
# Priority 2: Process file paths
if file_paths:
# Track which paths are directories
# Track which paths are directories for summary
for path in file_paths:
if Path(path).is_dir():
dirs_processed.append(path)
try:
if Path(path).is_dir():
dirs_processed.append(path)
except Exception:
pass # Ignore invalid paths
# Expand to get all files
# Expand directories to get all individual files
all_files = expand_paths(file_paths)
if not all_files and file_paths:
@@ -279,7 +335,7 @@ def read_files(
f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n"
)
else:
# Read files up to token limit
# Read files sequentially until token limit is reached
for file_path in all_files:
if total_tokens >= available_tokens:
files_skipped.append(file_path)
@@ -293,9 +349,10 @@ def read_files(
total_tokens += file_tokens
files_read.append(file_path)
else:
# File too large for remaining budget
files_skipped.append(file_path)
# Build summary
# Build human-readable summary of what was processed
if dirs_processed:
summary_parts.append(f"Processed {len(dirs_processed)} dir(s)")
if files_read:
@@ -305,11 +362,12 @@ def read_files(
if total_tokens > 0:
summary_parts.append(f"~{total_tokens:,} tokens used")
# Add skipped files note if any were skipped
# Add informative note about skipped files to help users understand
# what was omitted and why
if files_skipped:
skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
skip_note += f"Total skipped: {len(files_skipped)}\n"
# Show first 10 skipped files
# Show first 10 skipped files as examples
for i, file_path in enumerate(files_skipped[:10]):
skip_note += f" - {file_path}\n"
if len(files_skipped) > 10:


@@ -1,5 +1,20 @@
"""
Git utilities for finding repositories and generating diffs.
This module provides Git integration functionality for the MCP server,
enabling tools to work with version control information. It handles
repository discovery, status checking, and diff generation.
Key Features:
- Recursive repository discovery with depth limits
- Safe command execution with timeouts
- Comprehensive status information extraction
- Support for staged and unstaged changes
Security Considerations:
- All git commands are run with timeouts to prevent hanging
- Repository discovery ignores common build/dependency directories
- Error handling for permission-denied scenarios
"""
import subprocess
@@ -8,16 +23,18 @@ from pathlib import Path
# Directories to ignore when searching for git repositories
# These are typically build artifacts, dependencies, or cache directories
# that don't contain source code and would slow down repository discovery
IGNORED_DIRS = {
"node_modules",
"__pycache__",
"venv",
"env",
"build",
"dist",
"target",
".tox",
".pytest_cache",
"node_modules", # Node.js dependencies
"__pycache__", # Python bytecode cache
"venv", # Python virtual environment
"env", # Alternative virtual environment name
"build", # Common build output directory
"dist", # Distribution/release builds
"target", # Maven/Rust build output
".tox", # Tox testing environments
".pytest_cache", # Pytest cache directory
}
@@ -25,38 +42,45 @@ def find_git_repositories(start_path: str, max_depth: int = 5) -> List[str]:
"""
Recursively find all git repositories starting from the given path.
This function walks the directory tree looking for .git directories,
which indicate the root of a git repository. It respects depth limits
to prevent excessive recursion in deep directory structures.
Args:
start_path: Directory to start searching from
max_depth: Maximum depth to search (prevents excessive recursion)
start_path: Directory to start searching from (must be absolute)
max_depth: Maximum depth to search (default 5 prevents excessive recursion)
Returns:
List of absolute paths to git repositories
List of absolute paths to git repositories, sorted alphabetically
"""
repositories = []
start_path = Path(start_path).resolve()
def _find_repos(current_path: Path, current_depth: int):
# Stop recursion if we've reached maximum depth
if current_depth > max_depth:
return
try:
# Check if current directory is a git repo
# Check if current directory contains a .git directory
git_dir = current_path / ".git"
if git_dir.exists() and git_dir.is_dir():
repositories.append(str(current_path))
# Don't search inside .git directory
# Don't search inside git repositories for nested repos
# This prevents finding submodules which should be handled separately
return
# Search subdirectories
# Search subdirectories for more repositories
for item in current_path.iterdir():
if item.is_dir() and not item.name.startswith("."):
# Skip common non-code directories
# Skip common non-code directories to improve performance
if item.name in IGNORED_DIRS:
continue
_find_repos(item, current_depth + 1)
except PermissionError:
# Skip directories we can't access
# Skip directories we don't have permission to read
# This is common for system directories or other users' files
pass
_find_repos(start_path, 0)
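
A brief usage sketch of the discovery helper above combined with get_git_status from later in this file; the import path is an assumption because the module's filename is not shown in this view.

```python
from git_utils import find_git_repositories, get_git_status  # module name assumed

repos = find_git_repositories("/srv/project", max_depth=3)
for repo in repos:
    status = get_git_status(repo)
    print(
        f"{repo}: {status['branch'] or 'detached'} "
        f"(+{status['ahead']}/-{status['behind']}), "
        f"{len(status['staged_files'])} staged, "
        f"{len(status['untracked_files'])} untracked"
    )
```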
@@ -67,16 +91,28 @@ def run_git_command(repo_path: str, command: List[str]) -> Tuple[bool, str]:
"""
Run a git command in the specified repository.
This function provides a safe way to execute git commands with:
- Timeout protection (30 seconds) to prevent hanging
- Proper error handling and output capture
- Working directory context management
Args:
repo_path: Path to the git repository
command: Git command as a list of arguments
repo_path: Path to the git repository (working directory)
command: Git command as a list of arguments (excluding 'git' itself)
Returns:
Tuple of (success, output/error)
- success: True if command returned 0, False otherwise
- output/error: stdout if successful, stderr or error message if failed
"""
try:
# Execute git command with safety measures
result = subprocess.run(
["git"] + command, cwd=repo_path, capture_output=True, text=True, timeout=30
["git"] + command,
cwd=repo_path, # Run in repository directory
capture_output=True, # Capture stdout and stderr
text=True, # Return strings instead of bytes
timeout=30, # Prevent hanging on slow operations
)
if result.returncode == 0:
@@ -85,21 +121,36 @@ def run_git_command(repo_path: str, command: List[str]) -> Tuple[bool, str]:
return False, result.stderr
except subprocess.TimeoutExpired:
return False, "Command timed out"
return False, "Command timed out after 30 seconds"
except Exception as e:
return False, str(e)
return False, f"Git command failed: {str(e)}"
def get_git_status(repo_path: str) -> Dict[str, any]:
"""
Get the current git status of a repository.
Get comprehensive git status information for a repository.
This function gathers various pieces of repository state including:
- Current branch name
- Commits ahead/behind upstream
- Lists of staged, unstaged, and untracked files
The function is resilient to repositories without remotes or
in detached HEAD state.
Args:
repo_path: Path to the git repository
Returns:
Dictionary with status information
Dictionary with status information:
- branch: Current branch name (empty if detached)
- ahead: Number of commits ahead of upstream
- behind: Number of commits behind upstream
- staged_files: List of files with staged changes
- unstaged_files: List of files with unstaged changes
- untracked_files: List of untracked files
"""
# Initialize status structure with default values
status = {
"branch": "",
"ahead": 0,
@@ -109,12 +160,12 @@ def get_git_status(repo_path: str) -> Dict[str, any]:
"untracked_files": [],
}
# Get current branch
# Get current branch name (empty if in detached HEAD state)
success, branch = run_git_command(repo_path, ["branch", "--show-current"])
if success:
status["branch"] = branch.strip()
# Get ahead/behind info
# Get ahead/behind information relative to upstream branch
if status["branch"]:
success, ahead_behind = run_git_command(
repo_path,
@@ -131,33 +182,38 @@ def get_git_status(repo_path: str) -> Dict[str, any]:
if len(parts) == 2:
status["behind"] = int(parts[0])
status["ahead"] = int(parts[1])
# else: Could not get ahead/behind status (branch may not have upstream)
# Note: This will fail gracefully if branch has no upstream set
# Get file status
# Get file status using porcelain format for machine parsing
# Format: XY filename where X=staged status, Y=unstaged status
success, status_output = run_git_command(repo_path, ["status", "--porcelain"])
if success:
for line in status_output.strip().split("\n"):
if not line:
continue
status_code = line[:2]
path_info = line[3:]
status_code = line[:2] # Two-character status code
path_info = line[3:] # Filename (after space)
# Handle staged changes
# Parse staged changes (first character of status code)
if status_code[0] == "R":
# Format is "old_path -> new_path" for renamed files
# Special handling for renamed files
# Format is "old_path -> new_path"
if " -> " in path_info:
_, new_path = path_info.split(" -> ", 1)
status["staged_files"].append(new_path)
else:
status["staged_files"].append(path_info)
elif status_code[0] in ["M", "A", "D", "C"]:
# M=modified, A=added, D=deleted, C=copied
status["staged_files"].append(path_info)
# Handle unstaged changes
# Parse unstaged changes (second character of status code)
if status_code[1] in ["M", "D"]:
# M=modified, D=deleted in working tree
status["unstaged_files"].append(path_info)
elif status_code == "??":
# Untracked files have special marker "??"
status["untracked_files"].append(path_info)
return status
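
A self-contained sketch of the porcelain parsing described in the comments above; it can be run against a captured `git status --porcelain` string, and the sample output below is invented for illustration.

```python
def parse_porcelain(output: str) -> dict:
    buckets = {"staged": [], "unstaged": [], "untracked": []}
    for line in output.splitlines():
        if not line:
            continue
        code, path_info = line[:2], line[3:]          # "XY filename": X = index, Y = worktree
        if code == "??":
            buckets["untracked"].append(path_info)
            continue
        if code[0] == "R" and " -> " in path_info:
            path_info = path_info.split(" -> ", 1)[1]  # keep the new name for renames
        if code[0] in "RMADC":
            buckets["staged"].append(path_info)
        if code[1] in "MD":
            buckets["unstaged"].append(path_info)
    return buckets

sample = "M  src/app.py\n M README.md\n?? notes.txt\nR  old.py -> new.py\n"
print(parse_porcelain(sample))
# {'staged': ['src/app.py', 'new.py'], 'unstaged': ['README.md'], 'untracked': ['notes.txt']}
```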


@@ -1,5 +1,12 @@
"""
Token counting utilities
Token counting utilities for managing API context limits
This module provides functions for estimating token counts to ensure
requests stay within the Gemini API's context window limits.
Note: The estimation uses a simple character-to-token ratio which is
approximate. For production systems requiring precise token counts,
consider using the actual tokenizer for the specific model.
"""
from typing import Tuple
@@ -8,14 +15,40 @@ from config import MAX_CONTEXT_TOKENS
def estimate_tokens(text: str) -> int:
"""Estimate token count (rough: 1 token ≈ 4 characters)"""
"""
Estimate token count using a character-based approximation.
This uses a rough heuristic where 1 token ≈ 4 characters, which is
a reasonable approximation for English text. The actual token count
may vary based on:
- Language (non-English text may have different ratios)
- Code vs prose (code often has more tokens per character)
- Special characters and formatting
Args:
text: The text to estimate tokens for
Returns:
int: Estimated number of tokens
"""
return len(text) // 4
def check_token_limit(text: str) -> Tuple[bool, int]:
"""
Check if text exceeds token limit.
Returns: (is_within_limit, estimated_tokens)
Check if text exceeds the maximum token limit for Gemini models.
This function is used to validate that prepared prompts will fit
within the model's context window, preventing API errors and ensuring
reliable operation.
Args:
text: The text to check
Returns:
Tuple[bool, int]: (is_within_limit, estimated_tokens)
- is_within_limit: True if the text fits within MAX_CONTEXT_TOKENS
- estimated_tokens: The estimated token count
"""
estimated = estimate_tokens(text)
return estimated <= MAX_CONTEXT_TOKENS, estimated
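
A worked example of the heuristic in practice, assuming a 1,000,000-token limit (the actual MAX_CONTEXT_TOKENS value comes from config and is not shown here): a roughly 1.2 MB prompt estimates to about 300,000 tokens and passes the check.

```python
MAX_CONTEXT_TOKENS = 1_000_000          # value assumed; the real constant lives in config

prompt = "Review the following code:\n" + "x = 1\n" * 200_000   # ~1.2 MB of text
estimated = len(prompt) // 4            # same heuristic as estimate_tokens
print(estimated, estimated <= MAX_CONTEXT_TOKENS)   # ~300007 True
```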