my-pal-mcp-server/utils/file_utils.py

"""
File reading utilities with directory support and token management
"""

import os
from pathlib import Path
from typing import List, Optional, Set, Tuple

from .token_utils import MAX_CONTEXT_TOKENS, estimate_tokens

# Get project root from environment or use current directory
# This defines the sandbox directory where file access is allowed
PROJECT_ROOT = Path(os.environ.get("MCP_PROJECT_ROOT", os.getcwd())).resolve()

# Security: Prevent running with overly permissive root
if str(PROJECT_ROOT) == "/":
    raise RuntimeError(
        "Security Error: MCP_PROJECT_ROOT cannot be set to '/'. "
        "This would give access to the entire filesystem."
    )
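
# Illustrative host configuration (values are hypothetical): the MCP client
# typically exports the sandbox root before launching the server, e.g.
#
#     MCP_PROJECT_ROOT=/home/user/my-project
#
# Any file access outside that directory is rejected by
# resolve_and_validate_path() below.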

# Common code file extensions
CODE_EXTENSIONS = {
    ".py",
    ".js",
    ".ts",
    ".jsx",
    ".tsx",
    ".java",
    ".cpp",
    ".c",
    ".h",
    ".hpp",
    ".cs",
    ".go",
    ".rs",
    ".rb",
    ".php",
    ".swift",
    ".kt",
    ".scala",
    ".r",
    ".m",
    ".mm",
    ".sql",
    ".sh",
    ".bash",
    ".zsh",
    ".fish",
    ".ps1",
    ".bat",
    ".cmd",
    ".yml",
    ".yaml",
    ".json",
    ".xml",
    ".toml",
    ".ini",
    ".cfg",
    ".conf",
    ".txt",
    ".md",
    ".rst",
    ".tex",
    ".html",
    ".css",
    ".scss",
    ".sass",
    ".less",
}


def resolve_and_validate_path(path_str: str) -> Path:
    """
    Validate that a path is absolute, resolve it, and ensure it stays
    within the allowed project root.

    Args:
        path_str: Path string (must be absolute)

    Returns:
        Resolved Path object

    Raises:
        ValueError: If path is not absolute
        PermissionError: If path is outside allowed directory
    """
    # Create a Path object from the user-provided path
    user_path = Path(path_str)

    # Require absolute paths
    if not user_path.is_absolute():
        raise ValueError(
            "Relative paths are not supported. Please provide an absolute path.\n"
            f"Received: {path_str}"
        )

    # Resolve the absolute path
    resolved_path = user_path.resolve()

    # Security check: ensure the resolved path is within PROJECT_ROOT
    try:
        resolved_path.relative_to(PROJECT_ROOT)
    except ValueError:
        raise PermissionError(
            f"Path outside project root: {path_str}\n"
            f"Project root: {PROJECT_ROOT}\n"
            f"Resolved path: {resolved_path}"
        )

    return resolved_path
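

# Illustrative usage (paths are hypothetical): with MCP_PROJECT_ROOT set to
# /home/user/project, a path inside the sandbox resolves normally while a
# path outside it raises PermissionError.
#
#     resolve_and_validate_path("/home/user/project/src/main.py")  # -> resolved Path
#     resolve_and_validate_path("/etc/passwd")                     # raises PermissionError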


def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> List[str]:
    """
    Expand paths to individual files, handling both files and directories.

    Args:
        paths: List of file or directory paths
        extensions: Optional set of file extensions to include

    Returns:
        List of individual file paths
    """
    if extensions is None:
        extensions = CODE_EXTENSIONS

    expanded_files = []
    seen = set()

    for path in paths:
        try:
            path_obj = resolve_and_validate_path(path)
        except (ValueError, PermissionError):
            # Skip invalid paths
            continue

        if not path_obj.exists():
            continue

        if path_obj.is_file():
            # Add file directly
            if str(path_obj) not in seen:
                expanded_files.append(str(path_obj))
                seen.add(str(path_obj))

        elif path_obj.is_dir():
            # Walk directory recursively
            for root, dirs, files in os.walk(path_obj):
                # Skip hidden directories and __pycache__
                dirs[:] = [
                    d for d in dirs if not d.startswith(".") and d != "__pycache__"
                ]

                for file in files:
                    # Skip hidden files
                    if file.startswith("."):
                        continue

                    file_path = Path(root) / file

                    # Check extension
                    if not extensions or file_path.suffix.lower() in extensions:
                        full_path = str(file_path)
                        if full_path not in seen:
                            expanded_files.append(full_path)
                            seen.add(full_path)

    # Sort for consistent ordering
    expanded_files.sort()
    return expanded_files
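

# Illustrative usage (hypothetical paths): expanding a directory yields every
# matching file under it, de-duplicated and sorted; passing an explicit
# extension set narrows the walk to those suffixes only.
#
#     expand_paths(["/home/user/project/src"], extensions={".py"})
#     # -> ["/home/user/project/src/app.py", "/home/user/project/src/utils.py", ...]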


def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, int]:
    """
    Read a single file and format it for Gemini.

    Args:
        file_path: Path to file (must be absolute)
        max_size: Maximum file size to read

    Returns:
        (formatted_content, estimated_tokens)
    """
    try:
        path = resolve_and_validate_path(file_path)
    except (ValueError, PermissionError) as e:
        content = f"\n--- ERROR ACCESSING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
        return content, estimate_tokens(content)

    try:
        # Check if path exists and is a file
        if not path.exists():
            content = f"\n--- FILE NOT FOUND: {file_path} ---\nError: File does not exist\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        if not path.is_file():
            content = f"\n--- NOT A FILE: {file_path} ---\nError: Path is not a file\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        # Check file size
        file_size = path.stat().st_size
        if file_size > max_size:
            content = f"\n--- FILE TOO LARGE: {file_path} ---\nFile size: {file_size:,} bytes (max: {max_size:,})\n--- END FILE ---\n"
            return content, estimate_tokens(content)

        # Read the file
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            file_content = f.read()

        # Format with clear delimiters for Gemini
        formatted = f"\n--- BEGIN FILE: {file_path} ---\n{file_content}\n--- END FILE: {file_path} ---\n"
        return formatted, estimate_tokens(formatted)

    except Exception as e:
        content = f"\n--- ERROR READING FILE: {file_path} ---\nError: {str(e)}\n--- END FILE ---\n"
        return content, estimate_tokens(content)
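

# Illustrative usage (hypothetical path): the returned string is wrapped in
# BEGIN/END delimiters so the model can tell files apart, and the second
# value is the token estimate from token_utils.estimate_tokens.
#
#     content, tokens = read_file_content("/home/user/project/README.md")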


def read_files(
    file_paths: List[str],
    code: Optional[str] = None,
    max_tokens: Optional[int] = None,
    reserve_tokens: int = 50_000,
) -> Tuple[str, str]:
    """
    Read multiple files and optional direct code with smart token management.

    Args:
        file_paths: List of file or directory paths
        code: Optional direct code to include
        max_tokens: Maximum tokens to use (defaults to MAX_CONTEXT_TOKENS)
        reserve_tokens: Tokens to reserve for prompt and response

    Returns:
        (full_content, brief_summary)
    """
    if max_tokens is None:
        max_tokens = MAX_CONTEXT_TOKENS

    content_parts = []
    summary_parts = []
    total_tokens = 0
    available_tokens = max_tokens - reserve_tokens

    files_read = []
    files_skipped = []
    dirs_processed = []

    # First, handle direct code if provided
    if code:
        formatted_code = (
            f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
        )
        code_tokens = estimate_tokens(formatted_code)

        if code_tokens <= available_tokens:
            content_parts.append(formatted_code)
            total_tokens += code_tokens
            available_tokens -= code_tokens
            code_preview = code[:50] + "..." if len(code) > 50 else code
            summary_parts.append(f"Direct code: {code_preview}")
        else:
            summary_parts.append("Direct code skipped (too large)")

    # Expand all paths to get individual files
    if file_paths:
        # Track which paths are directories
        for path in file_paths:
            if Path(path).is_dir():
                dirs_processed.append(path)

        # Expand to get all files
        all_files = expand_paths(file_paths)

        if not all_files and file_paths:
            # No files found but paths were provided
            content_parts.append(
                f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n"
            )
        else:
            # Read files up to token limit
            for file_path in all_files:
                if total_tokens >= available_tokens:
                    files_skipped.append(file_path)
                    continue

                file_content, file_tokens = read_file_content(file_path)

                # Check if adding this file would exceed limit
                if total_tokens + file_tokens <= available_tokens:
                    content_parts.append(file_content)
                    total_tokens += file_tokens
                    files_read.append(file_path)
                else:
                    files_skipped.append(file_path)

    # Build summary
    if dirs_processed:
        summary_parts.append(f"Processed {len(dirs_processed)} dir(s)")
    if files_read:
        summary_parts.append(f"Read {len(files_read)} file(s)")
    if files_skipped:
        summary_parts.append(f"Skipped {len(files_skipped)} file(s) (token limit)")
    if total_tokens > 0:
        summary_parts.append(f"~{total_tokens:,} tokens used")

    # Add skipped files note if any were skipped
    if files_skipped:
        skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
        skip_note += f"Total skipped: {len(files_skipped)}\n"

        # Show first 10 skipped files
        for file_path in files_skipped[:10]:
            skip_note += f"  - {file_path}\n"
        if len(files_skipped) > 10:
            skip_note += f"  ... and {len(files_skipped) - 10} more\n"

        skip_note += "--- END SKIPPED FILES ---\n"
        content_parts.append(skip_note)

    full_content = "\n\n".join(content_parts) if content_parts else ""
    summary = " | ".join(summary_parts) if summary_parts else "No input provided"

    return full_content, summary
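

# Minimal usage sketch (illustrative): reading a hypothetical source directory
# plus an inline snippet, keeping ~50k tokens in reserve for the prompt and
# response. The directory path and the summary shown are assumptions for the
# example, not output from a real run.
#
#     content, summary = read_files(
#         ["/home/user/project/src"],
#         code="print('hello')",
#         reserve_tokens=50_000,
#     )
#     print(summary)
#     # e.g. "Direct code: print('hello') | Processed 1 dir(s) | Read 12 file(s) | ~34,000 tokens used"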