Major new addition: refactor tool

Supports decomposing large components and files, finding code smells, and identifying modernization and code-organization opportunities. Fix those mega-classes today!
Line numbers are added to embedded code so the model (Claude) can reference specific lines precisely
This commit is contained in:
Fahad
2025-06-15 06:00:01 +04:00
parent 70f1356e3e
commit b5004b91fc
28 changed files with 2633 additions and 310 deletions

View File

@@ -2,7 +2,9 @@
Utility functions for Zen MCP Server
"""
from .file_utils import CODE_EXTENSIONS, expand_paths, read_file_content, read_files
from .file_types import CODE_EXTENSIONS, FILE_CATEGORIES, PROGRAMMING_EXTENSIONS, TEXT_EXTENSIONS
from .file_utils import expand_paths, read_file_content, read_files
from .security_config import EXCLUDED_DIRS, SECURITY_ROOT
from .token_utils import check_token_limit, estimate_tokens
__all__ = [
@@ -10,6 +12,11 @@ __all__ = [
"read_file_content",
"expand_paths",
"CODE_EXTENSIONS",
"PROGRAMMING_EXTENSIONS",
"TEXT_EXTENSIONS",
"FILE_CATEGORIES",
"SECURITY_ROOT",
"EXCLUDED_DIRS",
"estimate_tokens",
"check_token_limit",
]

180
utils/file_types.py Normal file
View File

@@ -0,0 +1,180 @@
"""
File type definitions and constants for file processing
This module centralizes all file type and extension definitions used
throughout the MCP server for consistent file handling.
"""
# Programming language file extensions - core code files
PROGRAMMING_LANGUAGES = {
".py", # Python
".js", # JavaScript
".ts", # TypeScript
".jsx", # React JavaScript
".tsx", # React TypeScript
".java", # Java
".cpp", # C++
".c", # C
".h", # C/C++ Header
".hpp", # C++ Header
".cs", # C#
".go", # Go
".rs", # Rust
".rb", # Ruby
".php", # PHP
".swift", # Swift
".kt", # Kotlin
".scala", # Scala
".r", # R
".m", # Objective-C
".mm", # Objective-C++
}
# Script and shell file extensions
SCRIPTS = {
".sql", # SQL
".sh", # Shell
".bash", # Bash
".zsh", # Zsh
".fish", # Fish shell
".ps1", # PowerShell
".bat", # Batch
".cmd", # Command
}
# Configuration and data file extensions
CONFIGS = {
".yml", # YAML
".yaml", # YAML
".json", # JSON
".xml", # XML
".toml", # TOML
".ini", # INI
".cfg", # Config
".conf", # Config
".properties", # Properties
".env", # Environment
}
# Documentation and markup file extensions
DOCS = {
".txt", # Text
".md", # Markdown
".rst", # reStructuredText
".tex", # LaTeX
}
# Web development file extensions
WEB = {
".html", # HTML
".css", # CSS
".scss", # Sass
".sass", # Sass
".less", # Less
}
# Additional text file extensions for logs and data
TEXT_DATA = {
".log", # Log files
".csv", # CSV
".tsv", # TSV
".gitignore", # Git ignore
".dockerfile", # Docker
".makefile", # Make
".cmake", # CMake
".gradle", # Gradle
".sbt", # SBT
".pom", # Maven POM
".lock", # Lock files
}
# Image file extensions
IMAGES = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".svg", ".webp", ".ico", ".tiff", ".tif"}
# Binary executable and library extensions
BINARIES = {
".exe", # Windows executable
".dll", # Windows library
".so", # Linux shared object
".dylib", # macOS dynamic library
".bin", # Binary
".class", # Java class
}
# Archive and package file extensions
ARCHIVES = {
".jar",
".war",
".ear", # Java archives
".zip",
".tar",
".gz", # General archives
".7z",
".rar", # Compression
".deb",
".rpm", # Linux packages
".dmg",
".pkg", # macOS packages
}
# Derived sets for different use cases
CODE_EXTENSIONS = PROGRAMMING_LANGUAGES | SCRIPTS | CONFIGS | DOCS | WEB
PROGRAMMING_EXTENSIONS = PROGRAMMING_LANGUAGES # For line numbering
TEXT_EXTENSIONS = CODE_EXTENSIONS | TEXT_DATA
IMAGE_EXTENSIONS = IMAGES
BINARY_EXTENSIONS = BINARIES | ARCHIVES
# All extensions by category for easy access
FILE_CATEGORIES = {
"programming": PROGRAMMING_LANGUAGES,
"scripts": SCRIPTS,
"configs": CONFIGS,
"docs": DOCS,
"web": WEB,
"text_data": TEXT_DATA,
"images": IMAGES,
"binaries": BINARIES,
"archives": ARCHIVES,
}
def get_file_category(file_path: str) -> str:
    """
    Determine the category of a file based on its extension.

    Args:
        file_path: Path to the file

    Returns:
        Category name (a key of FILE_CATEGORIES) or "unknown" if the
        extension is not recognized
    """
    from pathlib import Path

    ext = Path(file_path).suffix.lower()
    # First category whose extension set contains the suffix wins; dict
    # iteration order makes this deterministic.
    matches = (name for name, extensions in FILE_CATEGORIES.items() if ext in extensions)
    return next(matches, "unknown")
def is_code_file(file_path: str) -> bool:
    """Check if a file is a code file (programming language)."""
    from pathlib import Path

    extension = Path(file_path).suffix.lower()
    return extension in PROGRAMMING_LANGUAGES
def is_text_file(file_path: str) -> bool:
    """Check if a file is a text file."""
    from pathlib import Path

    suffix = Path(file_path).suffix
    return suffix.lower() in TEXT_EXTENSIONS
def is_binary_file(file_path: str) -> bool:
    """Check if a file is a binary file (executable, library, or archive)."""
    from pathlib import Path

    ext = Path(file_path).suffix.lower()
    return ext in BINARY_EXTENSIONS

View File

@@ -23,148 +23,12 @@ import os
from pathlib import Path
from typing import Optional
from .file_types import BINARY_EXTENSIONS, CODE_EXTENSIONS, IMAGE_EXTENSIONS, TEXT_EXTENSIONS
from .security_config import CONTAINER_WORKSPACE, EXCLUDED_DIRS, MCP_SIGNATURE_FILES, SECURITY_ROOT, WORKSPACE_ROOT
from .token_utils import DEFAULT_CONTEXT_WINDOW, estimate_tokens
logger = logging.getLogger(__name__)
# Get workspace root for Docker path translation
# IMPORTANT: WORKSPACE_ROOT should contain the HOST path (e.g., /Users/john/project)
# that gets mounted to /workspace in the Docker container. This enables proper
# path translation between host absolute paths and container workspace paths.
WORKSPACE_ROOT = os.environ.get("WORKSPACE_ROOT")
CONTAINER_WORKSPACE = Path("/workspace")
# Dangerous paths that should never be used as WORKSPACE_ROOT
# These would give overly broad access and pose security risks
DANGEROUS_WORKSPACE_PATHS = {
"/",
"/etc",
"/usr",
"/bin",
"/var",
"/root",
"/home",
"/workspace", # Container path - WORKSPACE_ROOT should be host path
"C:\\",
"C:\\Windows",
"C:\\Program Files",
"C:\\Users",
}
# Validate WORKSPACE_ROOT for security if it's set
if WORKSPACE_ROOT:
# Resolve to canonical path for comparison
resolved_workspace = Path(WORKSPACE_ROOT).resolve()
# Special check for /workspace - common configuration mistake
if str(resolved_workspace) == "/workspace":
raise RuntimeError(
f"Configuration Error: WORKSPACE_ROOT should be set to the HOST path, not the container path. "
f"Found: WORKSPACE_ROOT={WORKSPACE_ROOT} "
f"Expected: WORKSPACE_ROOT should be set to your host directory path (e.g., $HOME) "
f"that contains all files Claude might reference. "
f"This path gets mounted to /workspace inside the Docker container."
)
# Check against other dangerous paths
if str(resolved_workspace) in DANGEROUS_WORKSPACE_PATHS:
raise RuntimeError(
f"Security Error: WORKSPACE_ROOT '{WORKSPACE_ROOT}' is set to a dangerous system directory. "
f"This would give access to critical system files. "
f"Please set WORKSPACE_ROOT to a specific project directory."
)
# Additional check: prevent filesystem root
if resolved_workspace.parent == resolved_workspace:
raise RuntimeError(
f"Security Error: WORKSPACE_ROOT '{WORKSPACE_ROOT}' cannot be the filesystem root. "
f"This would give access to the entire filesystem. "
f"Please set WORKSPACE_ROOT to a specific project directory."
)
# Security boundary
# In Docker: use /workspace (container directory)
# In tests/direct mode: use WORKSPACE_ROOT (host directory)
if CONTAINER_WORKSPACE.exists():
# Running in Docker container
SECURITY_ROOT = CONTAINER_WORKSPACE
elif WORKSPACE_ROOT:
# Running in tests or direct mode with WORKSPACE_ROOT set
SECURITY_ROOT = Path(WORKSPACE_ROOT).resolve()
else:
# Fallback for backward compatibility (should not happen in normal usage)
SECURITY_ROOT = Path.home()
# Directories to exclude from recursive file search
# These typically contain generated code, dependencies, or build artifacts
EXCLUDED_DIRS = {
"__pycache__",
"node_modules",
".venv",
"venv",
"env",
".env",
".git",
".svn",
".hg",
"build",
"dist",
"target",
".idea",
".vscode",
"__pypackages__",
".mypy_cache",
".pytest_cache",
".tox",
"htmlcov",
".coverage",
# Additional build and temp directories
"out",
".next",
".nuxt",
".cache",
".temp",
".tmp",
"bower_components",
"vendor",
".sass-cache",
".gradle",
".m2",
"coverage",
# OS-specific directories
".DS_Store",
"Thumbs.db",
# Python specific
"*.egg-info",
".eggs",
"wheels",
".Python",
# IDE and editor directories
".sublime",
".atom",
".brackets",
"*.swp",
"*.swo",
"*~",
# Documentation build
"_build",
"site",
# Mobile development
".expo",
".flutter",
}
# MCP signature files - presence of these indicates the MCP's own directory
# Used to prevent the MCP from scanning its own codebase
MCP_SIGNATURE_FILES = {
"zen_server.py",
"server.py",
"tools/precommit.py",
"utils/file_utils.py",
"prompts/tool_prompts.py",
}
def is_mcp_directory(path: Path) -> bool:
"""
@@ -242,7 +106,7 @@ def is_home_directory_root(path: Path) -> bool:
# Check if this is exactly the home directory
if resolved_path == resolved_home:
logger.warning(
f"Attempted to scan user home directory root: {path}. " f"Please specify a subdirectory instead."
f"Attempted to scan user home directory root: {path}. Please specify a subdirectory instead."
)
return True
@@ -277,56 +141,105 @@ def is_home_directory_root(path: Path) -> bool:
return False
# Common code file extensions that are automatically included when processing directories
# This set can be extended to support additional file types
CODE_EXTENSIONS = {
".py",
".js",
".ts",
".jsx",
".tsx",
".java",
".cpp",
".c",
".h",
".hpp",
".cs",
".go",
".rs",
".rb",
".php",
".swift",
".kt",
".scala",
".r",
".m",
".mm",
".sql",
".sh",
".bash",
".zsh",
".fish",
".ps1",
".bat",
".cmd",
".yml",
".yaml",
".json",
".xml",
".toml",
".ini",
".cfg",
".conf",
".txt",
".md",
".rst",
".tex",
".html",
".css",
".scss",
".sass",
".less",
}
def detect_file_type(file_path: str) -> str:
    """
    Detect file type for appropriate processing strategy.

    NOTE: This function is currently not used for line number auto-detection
    due to backward compatibility requirements. It is intended for future
    features requiring specific file type handling (e.g., image processing,
    binary file analysis, or enhanced file filtering).

    Args:
        file_path: Path to the file to analyze

    Returns:
        str: "text", "binary", "image", or "unknown" when the file cannot
        be accessed
    """
    path = Path(file_path)
    suffix = path.suffix.lower()

    # Fast path: classify purely by extension, text first to mirror the
    # priority of the extension tables.
    for label, known in (
        ("text", TEXT_EXTENSIONS),
        ("image", IMAGE_EXTENSIONS),
        ("binary", BINARY_EXTENSIONS),
    ):
        if suffix in known:
            return label

    # Slow path for unknown/missing extensions: sniff the first KiB and use
    # UTF-8 decodability as a text-vs-binary heuristic.
    try:
        with open(path, "rb") as handle:
            sample = handle.read(1024)
        sample.decode("utf-8")
    except UnicodeDecodeError:
        return "binary"
    except (FileNotFoundError, PermissionError) as exc:
        logger.warning(f"Could not access file {file_path} for type detection: {exc}")
        return "unknown"
    return "text"
def should_add_line_numbers(file_path: str, include_line_numbers: Optional[bool] = None) -> bool:
    """
    Decide whether line numbers should be added to a file's content.

    Args:
        file_path: Path to the file (currently unused; reserved for future
            auto-detection by file type)
        include_line_numbers: Explicit caller preference, or None to use the
            default

    Returns:
        bool: True if line numbers should be added
    """
    # An explicit preference always wins; otherwise default to False so
    # existing tools keep receiving un-numbered content (backwards
    # compatibility) — tools that want line numbers must opt in.
    return include_line_numbers if include_line_numbers is not None else False
def _normalize_line_endings(content: str) -> str:
    """
    Normalize line endings for consistent line numbering.

    Args:
        content: File content with potentially mixed line endings

    Returns:
        str: Content with all line endings converted to LF
    """
    # Collapse CRLF first so the bare-CR pass below cannot double-convert it.
    without_crlf = content.replace("\r\n", "\n")
    return without_crlf.replace("\r", "\n")
def _add_line_numbers(content: str) -> str:
    """
    Add line numbers to text content for precise referencing.

    Args:
        content: Text content to number

    Returns:
        str: Content with line numbers in format "  45│ actual code line"

    Width grows dynamically with the total line count, so files of any size
    are supported; a minimum width of 4 keeps short files readable.
    """
    # Normalize CRLF/CR to LF first so every line ending counts as exactly
    # one line (same transform as _normalize_line_endings, inlined here so
    # this helper is self-contained).
    normalized = content.replace("\r\n", "\n").replace("\r", "\n")
    lines = normalized.split("\n")

    # Dynamic width allocation based on total line count.
    width = max(len(str(len(lines))), 4)

    # BUG FIX: the docstring documents a "│ " separator between the number
    # and the code, but the previous format string omitted it, gluing the
    # line number directly onto the content (e.g. "  45actual code line").
    return "\n".join(f"{i + 1:{width}d}│ {line}" for i, line in enumerate(lines))
def translate_path_for_environment(path_str: str) -> str:
@@ -515,15 +428,13 @@ def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> lis
# Check 2: Prevent scanning user's home directory root
if is_home_directory_root(path_obj):
logger.warning(
f"Skipping home directory root: {path}. " f"Please specify a project subdirectory instead."
)
logger.warning(f"Skipping home directory root: {path}. Please specify a project subdirectory instead.")
continue
# Check 3: Skip if this is the MCP's own directory
if is_mcp_directory(path_obj):
logger.info(
f"Skipping MCP server directory: {path}. " f"The MCP server code is excluded from project scans."
f"Skipping MCP server directory: {path}. The MCP server code is excluded from project scans."
)
continue
@@ -575,7 +486,9 @@ def expand_paths(paths: list[str], extensions: Optional[set[str]] = None) -> lis
return expanded_files
def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, int]:
def read_file_content(
file_path: str, max_size: int = 1_000_000, *, include_line_numbers: Optional[bool] = None
) -> tuple[str, int]:
"""
Read a single file and format it for inclusion in AI prompts.
@@ -586,6 +499,7 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
Args:
file_path: Path to file (must be absolute)
max_size: Maximum file size to read (default 1MB to prevent memory issues)
include_line_numbers: Whether to add line numbers. If None, auto-detects based on file type
Returns:
Tuple of (formatted_content, estimated_tokens)
@@ -634,6 +548,10 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
content = f"\n--- FILE TOO LARGE: {file_path} ---\nFile size: {file_size:,} bytes (max: {max_size:,})\n--- END FILE ---\n"
return content, estimate_tokens(content)
# Determine if we should add line numbers
add_line_numbers = should_add_line_numbers(file_path, include_line_numbers)
logger.debug(f"[FILES] Line numbers for {file_path}: {'enabled' if add_line_numbers else 'disabled'}")
# Read the file with UTF-8 encoding, replacing invalid characters
# This ensures we can handle files with mixed encodings
logger.debug(f"[FILES] Reading file content for {file_path}")
@@ -642,6 +560,14 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> tuple[str, i
logger.debug(f"[FILES] Successfully read {len(file_content)} characters from {file_path}")
# Add line numbers if requested or auto-detected
if add_line_numbers:
file_content = _add_line_numbers(file_content)
logger.debug(f"[FILES] Added line numbers to {file_path}")
else:
# Still normalize line endings for consistency
file_content = _normalize_line_endings(file_content)
# Format with clear delimiters that help the AI understand file boundaries
# Using consistent markers makes it easier for the model to parse
# NOTE: These markers ("--- BEGIN FILE: ... ---") are distinct from git diff markers
@@ -665,6 +591,8 @@ def read_files(
code: Optional[str] = None,
max_tokens: Optional[int] = None,
reserve_tokens: int = 50_000,
*,
include_line_numbers: bool = False,
) -> str:
"""
Read multiple files and optional direct code with smart token management.
@@ -679,6 +607,7 @@ def read_files(
code: Optional direct code to include (prioritized over files)
max_tokens: Maximum tokens to use (defaults to DEFAULT_CONTEXT_WINDOW)
reserve_tokens: Tokens to reserve for prompt and response (default 50K)
include_line_numbers: Whether to add line numbers to file content
Returns:
str: All file contents formatted for AI consumption
@@ -728,7 +657,7 @@ def read_files(
files_skipped.extend(all_files[i:])
break
file_content, file_tokens = read_file_content(file_path)
file_content, file_tokens = read_file_content(file_path, include_line_numbers=include_line_numbers)
logger.debug(f"[FILES] File {file_path}: {file_tokens:,} tokens")
# Check if adding this file would exceed limit

174
utils/security_config.py Normal file
View File

@@ -0,0 +1,174 @@
"""
Security configuration and path validation constants
This module contains security-related constants and configurations
for file access control and workspace management.
"""
import os
from pathlib import Path
# Dangerous paths that should never be used as WORKSPACE_ROOT
# These would give overly broad access and pose security risks
DANGEROUS_WORKSPACE_PATHS = {
"/",
"/etc",
"/usr",
"/bin",
"/var",
"/root",
"/home",
"/workspace", # Container path - WORKSPACE_ROOT should be host path
"C:\\",
"C:\\Windows",
"C:\\Program Files",
"C:\\Users",
}
# Directories to exclude from recursive file search
# These typically contain generated code, dependencies, or build artifacts
EXCLUDED_DIRS = {
# Python
"__pycache__",
".venv",
"venv",
"env",
".env",
"*.egg-info",
".eggs",
"wheels",
".Python",
".mypy_cache",
".pytest_cache",
".tox",
"htmlcov",
".coverage",
"coverage",
# Node.js / JavaScript
"node_modules",
".next",
".nuxt",
"bower_components",
".sass-cache",
# Version Control
".git",
".svn",
".hg",
# Build Output
"build",
"dist",
"target",
"out",
# IDEs
".idea",
".vscode",
".sublime",
".atom",
".brackets",
# Temporary / Cache
".cache",
".temp",
".tmp",
"*.swp",
"*.swo",
"*~",
# OS-specific
".DS_Store",
"Thumbs.db",
# Java / JVM
".gradle",
".m2",
# Documentation build
"_build",
"site",
# Mobile development
".expo",
".flutter",
# Package managers
"vendor",
}
# MCP signature files - presence of these indicates the MCP's own directory
# Used to prevent the MCP from scanning its own codebase
MCP_SIGNATURE_FILES = {
"zen_server.py",
"server.py",
"tools/precommit.py",
"utils/file_utils.py",
"prompts/tool_prompts.py",
}
# Workspace configuration
WORKSPACE_ROOT = os.environ.get("WORKSPACE_ROOT")
CONTAINER_WORKSPACE = Path("/workspace")
def validate_workspace_security(workspace_root: str) -> None:
    """
    Validate that WORKSPACE_ROOT is set to a safe directory.

    Args:
        workspace_root: The workspace root path to validate (empty/None is
            silently accepted — nothing to validate)

    Raises:
        RuntimeError: If the workspace root is unsafe
    """
    if not workspace_root:
        return

    # Canonicalize so symlinked / relative spellings of dangerous paths are
    # still caught.
    canonical = str(Path(workspace_root).resolve())

    # /workspace is the *container* mount point; pointing WORKSPACE_ROOT at
    # it is a common misconfiguration, so it gets a dedicated message.
    if canonical == "/workspace":
        raise RuntimeError(
            f"Configuration Error: WORKSPACE_ROOT should be set to the HOST path, not the container path. "
            f"Found: WORKSPACE_ROOT={workspace_root} "
            f"Expected: WORKSPACE_ROOT should be set to your host directory path (e.g., $HOME) "
            f"that contains all files Claude might reference. "
            f"This path gets mounted to /workspace inside the Docker container."
        )

    # Known-dangerous system directories.
    if canonical in DANGEROUS_WORKSPACE_PATHS:
        raise RuntimeError(
            f"Security Error: WORKSPACE_ROOT '{workspace_root}' is set to a dangerous system directory. "
            f"This would give access to critical system files. "
            f"Please set WORKSPACE_ROOT to a specific project directory."
        )

    # A path whose parent is itself is the filesystem root.
    resolved = Path(canonical)
    if resolved.parent == resolved:
        raise RuntimeError(
            f"Security Error: WORKSPACE_ROOT '{workspace_root}' cannot be the filesystem root. "
            f"This would give access to the entire filesystem. "
            f"Please set WORKSPACE_ROOT to a specific project directory."
        )
def get_security_root() -> Path:
    """
    Determine the security boundary for file access.

    Returns:
        Path object representing the security root directory
    """
    # Inside Docker the container mount exists, so it is the boundary.
    if CONTAINER_WORKSPACE.exists():
        return CONTAINER_WORKSPACE
    # Tests / direct mode: honour WORKSPACE_ROOT when configured.
    if WORKSPACE_ROOT:
        return Path(WORKSPACE_ROOT).resolve()
    # Fallback for backward compatibility (should not happen in normal usage).
    return Path.home()
# Validate security on import if WORKSPACE_ROOT is set.
# NOTE: this runs at module import time and raises RuntimeError for a
# misconfigured WORKSPACE_ROOT — deliberately failing fast before any file
# access can happen.
if WORKSPACE_ROOT:
    validate_workspace_security(WORKSPACE_ROOT)

# Export the computed security root used as the file-access boundary.
SECURITY_ROOT = get_security_root()