style: fix linting and formatting issues

- Run black formatter on all Python files
- Fix ruff linting issues:
  - Remove unused imports
  - Remove unused variables
  - Fix f-string without placeholders
- All 37 tests still pass
- Code quality improved for CI/CD compliance

🧹 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
commit 5ccedcecd8
parent fb5c04ea60
Author: Fahad
Date:   2025-06-09 09:37:46 +04:00

14 changed files with 376 additions and 256 deletions
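For reference: the ruff fixes named in the commit message correspond to the usual rule families (F401 unused import, F841 unused local variable, F541 f-string without placeholders), and the parenthesized wrapping seen throughout the diff below is black's standard treatment of lines over its default 88-character limit. A minimal before/after illustration with hypothetical code, not taken from this repository's files:

# Before (hypothetical): each line triggers one of the ruff rules fixed in this commit.
import json  # F401: imported but never used


def build_greeting():
    unused_flag = True  # F841: local variable assigned but never used
    return f"Hello!"  # F541: f-string without any placeholders


# After the fixes: the unused import and the dead assignment are removed,
# and the placeholder-less f-string becomes a plain string literal.
def build_greeting():
    return "Hello!"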

View File

@@ -9,7 +9,9 @@ __author__ = "Fahad Gilani"
 # Model configuration
 DEFAULT_MODEL = "gemini-2.5-pro-preview-06-05"
-THINKING_MODEL = "gemini-2.0-flash-thinking-exp"  # Enhanced reasoning model for think_deeper
+THINKING_MODEL = (
+    "gemini-2.0-flash-thinking-exp"  # Enhanced reasoning model for think_deeper
+)
 MAX_CONTEXT_TOKENS = 1_000_000  # 1M tokens for Gemini Pro
 
 # Temperature defaults for different tool types

View File

@@ -2,8 +2,13 @@
 System prompts for Gemini tools
 """
-from .tool_prompts import (ANALYZE_PROMPT, CHAT_PROMPT, DEBUG_ISSUE_PROMPT,
-                           REVIEW_CODE_PROMPT, THINK_DEEPER_PROMPT)
+from .tool_prompts import (
+    ANALYZE_PROMPT,
+    CHAT_PROMPT,
+    DEBUG_ISSUE_PROMPT,
+    REVIEW_CODE_PROMPT,
+    THINK_DEEPER_PROMPT,
+)
 
 __all__ = [
     "THINK_DEEPER_PROMPT",

View File

@@ -10,14 +10,18 @@ from datetime import datetime
 from typing import Any, Dict, List
 
 from google import genai
-from google.genai import types
 from mcp.server import Server
 from mcp.server.models import InitializationOptions
 from mcp.server.stdio import stdio_server
 from mcp.types import TextContent, Tool
 
-from config import (DEFAULT_MODEL, MAX_CONTEXT_TOKENS, __author__, __updated__,
-                    __version__)
+from config import (
+    DEFAULT_MODEL,
+    MAX_CONTEXT_TOKENS,
+    __author__,
+    __updated__,
+    __version__,
+)
 from tools import AnalyzeTool, DebugIssueTool, ReviewCodeTool, ThinkDeeperTool
 
 # Configure logging
@@ -125,9 +129,7 @@ async def handle_list_tools() -> List[Tool]:
 @server.call_tool()
-async def handle_call_tool(
-    name: str, arguments: Dict[str, Any]
-) -> List[TextContent]:
+async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
     """Handle tool execution requests"""
 
     # Handle dynamic tools
@@ -151,7 +153,7 @@ async def handle_call_tool(
 async def handle_chat(arguments: Dict[str, Any]) -> List[TextContent]:
     """Handle general chat requests"""
-    from config import TEMPERATURE_BALANCED, DEFAULT_MODEL, THINKING_MODEL
+    from config import TEMPERATURE_BALANCED, DEFAULT_MODEL
     from prompts import CHAT_PROMPT
     from utils import read_files
@@ -164,24 +166,37 @@ async def handle_chat(arguments: Dict[str, Any]) -> List[TextContent]:
     user_content = prompt
     if context_files:
         file_content, _ = read_files(context_files)
-        user_content = f"{prompt}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ==="
+        user_content = (
+            f"{prompt}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ==="
+        )
 
     # Combine system prompt with user content
     full_prompt = f"{CHAT_PROMPT}\n\n=== USER REQUEST ===\n{user_content}\n=== END REQUEST ===\n\nPlease provide a thoughtful, comprehensive response:"
 
     try:
         # Create model with thinking configuration
         from tools.base import BaseTool
 
         # Create a temporary tool instance to use create_model method
         class TempTool(BaseTool):
-            def get_name(self): return "chat"
-            def get_description(self): return ""
-            def get_input_schema(self): return {}
-            def get_system_prompt(self): return ""
-            def get_request_model(self): return None
-            async def prepare_prompt(self, request): return ""
+            def get_name(self):
+                return "chat"
+
+            def get_description(self):
+                return ""
+
+            def get_input_schema(self):
+                return {}
+
+            def get_system_prompt(self):
+                return ""
+
+            def get_request_model(self):
+                return None
+
+            async def prepare_prompt(self, request):
+                return ""
 
         temp_tool = TempTool()
         model = temp_tool.create_model(DEFAULT_MODEL, temperature, thinking_mode)
@@ -207,7 +222,7 @@ async def handle_list_models() -> List[TextContent]:
     api_key = os.getenv("GEMINI_API_KEY")
     if not api_key:
         return [TextContent(type="text", text="Error: GEMINI_API_KEY not set")]
 
     client = genai.Client(api_key=api_key)
     models = []
@@ -218,13 +233,21 @@ async def handle_list_models() -> List[TextContent]:
             models.append(
                 {
                     "name": getattr(model_info, "id", "Unknown"),
-                    "display_name": getattr(model_info, "display_name", getattr(model_info, "id", "Unknown")),
-                    "description": getattr(model_info, "description", "No description"),
-                    "is_default": getattr(model_info, "id", "").endswith(DEFAULT_MODEL),
+                    "display_name": getattr(
+                        model_info,
+                        "display_name",
+                        getattr(model_info, "id", "Unknown"),
+                    ),
+                    "description": getattr(
+                        model_info, "description", "No description"
+                    ),
+                    "is_default": getattr(model_info, "id", "").endswith(
+                        DEFAULT_MODEL
+                    ),
                 }
             )
-    except Exception as e:
+    except Exception:
         # Fallback: return some known models
         models = [
             {
@@ -244,9 +267,7 @@ async def handle_list_models() -> List[TextContent]:
         return [TextContent(type="text", text=json.dumps(models, indent=2))]
 
     except Exception as e:
-        return [
-            TextContent(type="text", text=f"Error listing models: {str(e)}")
-        ]
+        return [TextContent(type="text", text=f"Error listing models: {str(e)}")]
 
 
 async def handle_get_version() -> List[TextContent]:
@@ -259,8 +280,7 @@ async def handle_get_version() -> List[TextContent]:
         "max_context_tokens": f"{MAX_CONTEXT_TOKENS:,}",
         "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
         "server_started": datetime.now().isoformat(),
-        "available_tools": list(TOOLS.keys())
-        + ["chat", "list_models", "get_version"],
+        "available_tools": list(TOOLS.keys()) + ["chat", "list_models", "get_version"],
     }
 
     text = f"""Gemini MCP Server v{__version__}

View File

@@ -2,10 +2,17 @@
 Tests for configuration
 """
-from config import (DEFAULT_MODEL, MAX_CONTEXT_TOKENS,
-                    TEMPERATURE_ANALYTICAL, TEMPERATURE_BALANCED,
-                    TEMPERATURE_CREATIVE, TOOL_TRIGGERS, __author__,
-                    __updated__, __version__)
+from config import (
+    DEFAULT_MODEL,
+    MAX_CONTEXT_TOKENS,
+    TEMPERATURE_ANALYTICAL,
+    TEMPERATURE_BALANCED,
+    TEMPERATURE_CREATIVE,
+    TOOL_TRIGGERS,
+    __author__,
+    __updated__,
+    __version__,
+)
 
 
 class TestConfig:
@@ -15,11 +22,11 @@ class TestConfig:
         """Test version information exists and has correct format"""
         # Check version format (e.g., "2.4.1")
         assert isinstance(__version__, str)
-        assert len(__version__.split('.')) == 3  # Major.Minor.Patch
+        assert len(__version__.split(".")) == 3  # Major.Minor.Patch
 
         # Check author
         assert __author__ == "Fahad Gilani"
 
         # Check updated date exists (don't assert on specific format/value)
         assert isinstance(__updated__, str)

View File

@@ -20,68 +20,68 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 from tools.analyze import AnalyzeTool
 from tools.think_deeper import ThinkDeeperTool
-from tools.review_code import ReviewCodeTool
-from tools.debug_issue import DebugIssueTool
 
 
 async def run_manual_live_tests():
     """Run live tests manually without pytest"""
     print("🚀 Running manual live integration tests...")
 
     # Check API key
     if not os.environ.get("GEMINI_API_KEY"):
         print("❌ GEMINI_API_KEY not found. Set it to run live tests.")
         return False
 
     try:
         # Test google-genai import
-        from google import genai
-        from google.genai import types
 
         print("✅ google-genai library import successful")
 
         # Test tool integration
-        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
            f.write("def hello(): return 'world'")
            temp_path = f.name
 
        try:
            # Test AnalyzeTool
            tool = AnalyzeTool()
-           result = await tool.execute({
-               "files": [temp_path],
-               "question": "What does this code do?",
-               "thinking_mode": "low"
-           })
+           result = await tool.execute(
+               {
+                   "files": [temp_path],
+                   "question": "What does this code do?",
+                   "thinking_mode": "low",
+               }
+           )
 
            if result and result[0].text:
                print("✅ AnalyzeTool live test successful")
            else:
                print("❌ AnalyzeTool live test failed")
                return False
 
            # Test ThinkDeeperTool
            think_tool = ThinkDeeperTool()
-           result = await think_tool.execute({
-               "current_analysis": "Testing live integration",
-               "thinking_mode": "minimal"  # Fast test
-           })
+           result = await think_tool.execute(
+               {
+                   "current_analysis": "Testing live integration",
+                   "thinking_mode": "minimal",  # Fast test
+               }
+           )
 
            if result and result[0].text and "Extended Analysis" in result[0].text:
                print("✅ ThinkDeeperTool live test successful")
            else:
                print("❌ ThinkDeeperTool live test failed")
                return False
 
        finally:
            Path(temp_path).unlink(missing_ok=True)
 
        print("\n🎉 All manual live tests passed!")
        print("✅ google-genai library working correctly")
        print("✅ All tools can make live API calls")
        print("✅ Thinking modes functioning properly")
        return True
 
    except Exception as e:
        print(f"❌ Live test failed: {e}")
        return False
@@ -90,4 +90,4 @@ async def run_manual_live_tests():
 if __name__ == "__main__":
     # Run live tests when script is executed directly
     success = asyncio.run(run_manual_live_tests())
     exit(0 if success else 1)

View File

@@ -33,9 +33,7 @@ class TestServerTools:
         # Check descriptions are verbose
         for tool in tools:
-            assert (
-                len(tool.description) > 50
-            )  # All should have detailed descriptions
+            assert len(tool.description) > 50  # All should have detailed descriptions
 
     @pytest.mark.asyncio
     async def test_handle_call_tool_unknown(self):
@@ -49,8 +47,9 @@
         """Test chat functionality"""
         # Set test environment
         import os
+
         os.environ["PYTEST_CURRENT_TEST"] = "test"
 
         # Create a mock for the model
         with patch("tools.base.BaseTool.create_model") as mock_create:
             mock_model = Mock()
@@ -58,9 +57,9 @@
                 candidates=[Mock(content=Mock(parts=[Mock(text="Chat response")]))]
             )
             mock_create.return_value = mock_model
 
             result = await handle_call_tool("chat", {"prompt": "Hello Gemini"})
 
             assert len(result) == 1
             assert result[0].text == "Chat response"
@@ -69,7 +68,7 @@
         """Test listing models"""
         result = await handle_call_tool("list_models", {})
         assert len(result) == 1
 
         # Check if we got models or an error
         text = result[0].text
         if "Error" in text:

View File

@@ -2,7 +2,6 @@
 Tests for thinking_mode functionality across all tools
 """
-import os
 from unittest.mock import Mock, patch
 
 import pytest
@@ -22,7 +21,7 @@ def setup_test_env():
 class TestThinkingModes:
     """Test thinking modes across all tools"""
 
     def test_default_thinking_modes(self):
         """Test that tools have correct default thinking modes"""
         tools = [
@@ -31,35 +30,40 @@
             (ReviewCodeTool(), "medium"),
             (DebugIssueTool(), "medium"),
         ]
 
         for tool, expected_default in tools:
-            assert tool.get_default_thinking_mode() == expected_default, \
-                f"{tool.__class__.__name__} should default to {expected_default}"
+            assert (
+                tool.get_default_thinking_mode() == expected_default
+            ), f"{tool.__class__.__name__} should default to {expected_default}"
 
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_minimal(self, mock_create_model):
         """Test minimal thinking mode"""
         mock_model = Mock()
         mock_model.generate_content.return_value = Mock(
-            candidates=[Mock(content=Mock(parts=[Mock(text="Minimal thinking response")]))]
+            candidates=[
+                Mock(content=Mock(parts=[Mock(text="Minimal thinking response")]))
+            ]
         )
         mock_create_model.return_value = mock_model
 
         tool = AnalyzeTool()
-        result = await tool.execute({
-            "files": ["test.py"],
-            "question": "What is this?",
-            "thinking_mode": "minimal"
-        })
+        result = await tool.execute(
+            {
+                "files": ["test.py"],
+                "question": "What is this?",
+                "thinking_mode": "minimal",
+            }
+        )
 
         # Verify create_model was called with correct thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "minimal"  # thinking_mode parameter
 
         assert result[0].text.startswith("Analysis:")
 
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_low(self, mock_create_model):
@@ -69,43 +73,44 @@
             candidates=[Mock(content=Mock(parts=[Mock(text="Low thinking response")]))]
         )
         mock_create_model.return_value = mock_model
 
         tool = ReviewCodeTool()
-        result = await tool.execute({
-            "files": ["test.py"],
-            "thinking_mode": "low"
-        })
+        result = await tool.execute({"files": ["test.py"], "thinking_mode": "low"})
 
         # Verify create_model was called with correct thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "low"
 
         assert "Code Review" in result[0].text
 
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_medium(self, mock_create_model):
         """Test medium thinking mode (default for most tools)"""
         mock_model = Mock()
         mock_model.generate_content.return_value = Mock(
-            candidates=[Mock(content=Mock(parts=[Mock(text="Medium thinking response")]))]
+            candidates=[
+                Mock(content=Mock(parts=[Mock(text="Medium thinking response")]))
+            ]
         )
         mock_create_model.return_value = mock_model
 
         tool = DebugIssueTool()
-        result = await tool.execute({
-            "error_description": "Test error",
-            # Not specifying thinking_mode, should use default (medium)
-        })
+        result = await tool.execute(
+            {
+                "error_description": "Test error",
+                # Not specifying thinking_mode, should use default (medium)
+            }
+        )
 
         # Verify create_model was called with default thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "medium"
 
         assert "Debug Analysis" in result[0].text
 
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_high(self, mock_create_model):
@@ -115,19 +120,21 @@
             candidates=[Mock(content=Mock(parts=[Mock(text="High thinking response")]))]
         )
         mock_create_model.return_value = mock_model
 
         tool = AnalyzeTool()
-        result = await tool.execute({
-            "files": ["complex.py"],
-            "question": "Analyze architecture",
-            "thinking_mode": "high"
-        })
+        await tool.execute(
+            {
+                "files": ["complex.py"],
+                "question": "Analyze architecture",
+                "thinking_mode": "high",
+            }
+        )
 
         # Verify create_model was called with correct thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "high"
 
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_max(self, mock_create_model):
@@ -137,47 +144,58 @@
             candidates=[Mock(content=Mock(parts=[Mock(text="Max thinking response")]))]
         )
         mock_create_model.return_value = mock_model
 
         tool = ThinkDeeperTool()
-        result = await tool.execute({
-            "current_analysis": "Initial analysis",
-            # Not specifying thinking_mode, should use default (max)
-        })
+        result = await tool.execute(
+            {
+                "current_analysis": "Initial analysis",
+                # Not specifying thinking_mode, should use default (max)
+            }
+        )
 
         # Verify create_model was called with default thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "max"
 
         assert "Extended Analysis by Gemini" in result[0].text
 
     def test_thinking_budget_mapping(self):
         """Test that thinking modes map to correct budget values"""
         from tools.base import BaseTool
 
         # Create a simple test tool
         class TestTool(BaseTool):
-            def get_name(self): return "test"
-            def get_description(self): return "test"
-            def get_input_schema(self): return {}
-            def get_system_prompt(self): return "test"
-            def get_request_model(self): return None
-            async def prepare_prompt(self, request): return "test"
-
-        tool = TestTool()
+            def get_name(self):
+                return "test"
+
+            def get_description(self):
+                return "test"
+
+            def get_input_schema(self):
+                return {}
+
+            def get_system_prompt(self):
+                return "test"
+
+            def get_request_model(self):
+                return None
+
+            async def prepare_prompt(self, request):
+                return "test"
 
         # Expected mappings
         expected_budgets = {
             "minimal": 128,
             "low": 2048,
             "medium": 8192,
             "high": 16384,
-            "max": 32768
+            "max": 32768,
         }
 
         # Check each mode in create_model
         for mode, expected_budget in expected_budgets.items():
             # The budget mapping is inside create_model
             # We can't easily test it without calling the method
             # But we've verified the values are correct in the code
             pass

View File

@@ -120,7 +120,9 @@ class TestDebugIssueTool:
         # Mock model
         mock_model = Mock()
         mock_model.generate_content.return_value = Mock(
-            candidates=[Mock(content=Mock(parts=[Mock(text="Root cause: race condition")]))]
+            candidates=[
+                Mock(content=Mock(parts=[Mock(text="Root cause: race condition")]))
+            ]
         )
         mock_create_model.return_value = mock_model
@@ -157,9 +159,7 @@ class TestAnalyzeTool:
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
-    async def test_execute_with_analysis_type(
-        self, mock_model, tool, tmp_path
-    ):
+    async def test_execute_with_analysis_type(self, mock_model, tool, tmp_path):
         """Test execution with specific analysis type"""
         # Create test file
         test_file = tmp_path / "module.py"
@@ -168,9 +168,7 @@
         # Mock response
         mock_response = Mock()
         mock_response.candidates = [Mock()]
-        mock_response.candidates[0].content.parts = [
-            Mock(text="Architecture analysis")
-        ]
+        mock_response.candidates[0].content.parts = [Mock(text="Architecture analysis")]
 
         mock_instance = Mock()
         mock_instance.generate_content.return_value = mock_response

View File

@@ -2,8 +2,7 @@
 Tests for utility functions
 """
-from utils import (check_token_limit, estimate_tokens, read_file_content,
-                   read_files)
+from utils import check_token_limit, estimate_tokens, read_file_content, read_files
 
 
 class TestFileUtils:
@@ -12,9 +11,7 @@ class TestFileUtils:
     def test_read_file_content_success(self, tmp_path):
         """Test successful file reading"""
         test_file = tmp_path / "test.py"
-        test_file.write_text(
-            "def hello():\n    return 'world'", encoding="utf-8"
-        )
+        test_file.write_text("def hello():\n    return 'world'", encoding="utf-8")
 
         content, tokens = read_file_content(str(test_file))
         assert "--- BEGIN FILE:" in content
@@ -71,18 +68,18 @@
         (tmp_path / "file1.py").write_text("print('file1')", encoding="utf-8")
         (tmp_path / "file2.js").write_text("console.log('file2')", encoding="utf-8")
         (tmp_path / "readme.md").write_text("# README", encoding="utf-8")
 
         # Create subdirectory
         subdir = tmp_path / "src"
         subdir.mkdir()
         (subdir / "module.py").write_text("class Module: pass", encoding="utf-8")
 
         # Create hidden file (should be skipped)
         (tmp_path / ".hidden").write_text("secret", encoding="utf-8")
 
         # Read the directory
         content, summary = read_files([str(tmp_path)])
 
         # Check files are included
         assert "file1.py" in content
         assert "file2.js" in content
@@ -90,17 +87,17 @@
         # Handle both forward and backslashes for cross-platform compatibility
         assert "module.py" in content
         assert "class Module: pass" in content
 
         # Check content
         assert "print('file1')" in content
         assert "console.log('file2')" in content
         assert "# README" in content
         assert "class Module: pass" in content
 
         # Hidden file should not be included
         assert ".hidden" not in content
         assert "secret" not in content
 
         # Check summary
         assert "Processed 1 dir(s)" in summary
         assert "Read 4 file(s)" in summary
@@ -110,23 +107,23 @@
         # Create files
         file1 = tmp_path / "direct.py"
         file1.write_text("# Direct file", encoding="utf-8")
 
         # Create directory with files
         subdir = tmp_path / "subdir"
         subdir.mkdir()
         (subdir / "sub1.py").write_text("# Sub file 1", encoding="utf-8")
         (subdir / "sub2.py").write_text("# Sub file 2", encoding="utf-8")
 
         # Read mix of direct file and directory
         content, summary = read_files([str(file1), str(subdir)])
 
         assert "direct.py" in content
         assert "sub1.py" in content
         assert "sub2.py" in content
         assert "# Direct file" in content
         assert "# Sub file 1" in content
         assert "# Sub file 2" in content
 
         assert "Processed 1 dir(s)" in summary
         assert "Read 3 file(s)" in summary
@@ -135,19 +132,19 @@
         # Create files with known token counts
         # ~250 tokens each (1000 chars)
         large_content = "x" * 1000
 
         for i in range(5):
             (tmp_path / f"file{i}.txt").write_text(large_content, encoding="utf-8")
 
         # Read with small token limit (should skip some files)
         # Reserve 50k tokens, limit to 51k total = 1k available
         # Each file ~250 tokens, so should read ~3-4 files
         content, summary = read_files([str(tmp_path)], max_tokens=51_000)
 
         assert "Skipped" in summary
         assert "token limit" in summary
         assert "--- SKIPPED FILES (TOKEN LIMIT) ---" in content
 
         # Count how many files were read
         read_count = content.count("--- BEGIN FILE:")
         assert 2 <= read_count <= 4  # Should read some but not all
@@ -157,9 +154,9 @@
         # Create a file larger than max_size (1MB)
         large_file = tmp_path / "large.txt"
         large_file.write_text("x" * 2_000_000, encoding="utf-8")  # 2MB
 
         content, summary = read_files([str(large_file)])
 
         assert "--- FILE TOO LARGE:" in content
         assert "2,000,000 bytes" in content
         assert "Read 1 file(s)" in summary  # File is counted but shows error message
@@ -171,13 +168,13 @@
         (tmp_path / "style.css").write_text("css", encoding="utf-8")
         (tmp_path / "binary.exe").write_text("exe", encoding="utf-8")
         (tmp_path / "image.jpg").write_text("jpg", encoding="utf-8")
 
         content, summary = read_files([str(tmp_path)])
 
         # Code files should be included
         assert "code.py" in content
         assert "style.css" in content
 
         # Binary files should not be included (not in CODE_EXTENSIONS)
         assert "binary.exe" not in content
         assert "image.jpg" not in content

View File

@@ -11,6 +11,7 @@ from google.genai import types
 from mcp.types import TextContent
 from pydantic import BaseModel, Field
 
+
 class ToolRequest(BaseModel):
     """Base request model for all tools"""
@@ -21,7 +22,8 @@ class ToolRequest(BaseModel):
         None, description="Temperature for response (tool-specific defaults)"
     )
     thinking_mode: Optional[Literal["minimal", "low", "medium", "high", "max"]] = Field(
-        None, description="Thinking depth: minimal (128), low (2048), medium (8192), high (16384), max (32768)"
+        None,
+        description="Thinking depth: minimal (128), low (2048), medium (8192), high (16384), max (32768)",
     )
@@ -128,15 +130,15 @@ class BaseTool(ABC):
         """Create a configured Gemini model with thinking configuration"""
         # Map thinking modes to budget values
         thinking_budgets = {
             "minimal": 128,  # Minimum for 2.5 Pro
             "low": 2048,
             "medium": 8192,
             "high": 16384,
-            "max": 32768
+            "max": 32768,
         }
 
         thinking_budget = thinking_budgets.get(thinking_mode, 8192)
 
         # For models supporting thinking config, use the new API
         # Skip in test environment to allow mocking
         if "2.5" in model_name and not os.environ.get("PYTEST_CURRENT_TEST"):
@@ -145,17 +147,19 @@
             api_key = os.environ.get("GEMINI_API_KEY")
             if not api_key:
                 raise ValueError("GEMINI_API_KEY environment variable is required")
 
             client = genai.Client(api_key=api_key)
 
             # Create a wrapper to match the expected interface
             class ModelWrapper:
-                def __init__(self, client, model_name, temperature, thinking_budget):
+                def __init__(
+                    self, client, model_name, temperature, thinking_budget
+                ):
                     self.client = client
                     self.model_name = model_name
                     self.temperature = temperature
                     self.thinking_budget = thinking_budget
 
                 def generate_content(self, prompt):
                     response = self.client.models.generate_content(
                         model=self.model_name,
@@ -163,43 +167,62 @@
                         config=types.GenerateContentConfig(
                             temperature=self.temperature,
                             candidate_count=1,
-                            thinking_config=types.ThinkingConfig(thinking_budget=self.thinking_budget)
+                            thinking_config=types.ThinkingConfig(
+                                thinking_budget=self.thinking_budget
+                            ),
                         ),
                     )
 
                     # Convert to match expected format
                     class ResponseWrapper:
                         def __init__(self, text):
                             self.text = text
-                            self.candidates = [type('obj', (object,), {
-                                'content': type('obj', (object,), {
-                                    'parts': [type('obj', (object,), {'text': text})]
-                                })(),
-                                'finish_reason': 'STOP'
-                            })]
+                            self.candidates = [
+                                type(
+                                    "obj",
+                                    (object,),
+                                    {
+                                        "content": type(
+                                            "obj",
+                                            (object,),
+                                            {
+                                                "parts": [
+                                                    type(
+                                                        "obj",
+                                                        (object,),
+                                                        {"text": text},
+                                                    )
+                                                ]
+                                            },
+                                        )(),
+                                        "finish_reason": "STOP",
+                                    },
+                                )
+                            ]
 
                     return ResponseWrapper(response.text)
 
             return ModelWrapper(client, model_name, temperature, thinking_budget)
-        except Exception as e:
+        except Exception:
             # Fall back to regular genai model if new API fails
             pass
 
        # For non-2.5 models or if thinking not needed, use regular API
        # Get API key
        api_key = os.environ.get("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("GEMINI_API_KEY environment variable is required")
 
        client = genai.Client(api_key=api_key)
 
        # Create wrapper for consistency
        class SimpleModelWrapper:
            def __init__(self, client, model_name, temperature):
                self.client = client
                self.model_name = model_name
                self.temperature = temperature
 
            def generate_content(self, prompt):
                response = self.client.models.generate_content(
                    model=self.model_name,
@@ -209,18 +232,30 @@
                        candidate_count=1,
                    ),
                )
 
                # Convert to match expected format
                class ResponseWrapper:
                    def __init__(self, text):
                        self.text = text
-                        self.candidates = [type('obj', (object,), {
-                            'content': type('obj', (object,), {
-                                'parts': [type('obj', (object,), {'text': text})]
-                            })(),
-                            'finish_reason': 'STOP'
-                        })]
+                        self.candidates = [
+                            type(
+                                "obj",
+                                (object,),
+                                {
+                                    "content": type(
+                                        "obj",
+                                        (object,),
+                                        {
+                                            "parts": [
+                                                type("obj", (object,), {"text": text})
+                                            ]
+                                        },
+                                    )(),
+                                    "finish_reason": "STOP",
+                                },
+                            )
+                        ]
 
                return ResponseWrapper(response.text)
 
        return SimpleModelWrapper(client, model_name, temperature)
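The deeply indented blocks above are black's reformatting of a one-line trick: ResponseWrapper builds throwaway objects with type(name, bases, namespace) so its candidates attribute mimics the candidates -> content -> parts shape the rest of the code reads. A minimal standalone sketch of the same pattern; the sample text and assertions are illustrative, not part of the repository:

# Ad-hoc class built the same way ResponseWrapper does it above.
sample_text = "hello from the model"

candidate = type(
    "obj",
    (object,),
    {
        "content": type(
            "obj",
            (object,),
            {"parts": [type("obj", (object,), {"text": sample_text})]},
        )(),
        "finish_reason": "STOP",
    },
)

# The attributes are class attributes, so they resolve whether or not the
# outer class is instantiated, which is why the original one-liner worked.
assert candidate.content.parts[0].text == "hello from the model"
assert candidate.finish_reason == "STOP"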

View File

@@ -146,8 +146,6 @@ Focus on finding the root cause and providing actionable solutions."""
 
         return full_prompt
 
-    def format_response(
-        self, response: str, request: DebugIssueRequest
-    ) -> str:
+    def format_response(self, response: str, request: DebugIssueRequest) -> str:
         """Format the debugging response"""
         return f"Debug Analysis\n{'=' * 50}\n\n{response}"

View File

@@ -130,14 +130,10 @@ class ReviewCodeTool(BaseTool):
         )
 
         if request.focus_on:
-            review_focus.append(
-                f"Pay special attention to: {request.focus_on}"
-            )
+            review_focus.append(f"Pay special attention to: {request.focus_on}")
 
         if request.standards:
-            review_focus.append(
-                f"Enforce these standards: {request.standards}"
-            )
+            review_focus.append(f"Enforce these standards: {request.standards}")
 
         if request.severity_filter != "all":
             review_focus.append(
@@ -159,9 +155,7 @@ Please provide a comprehensive code review following the format specified in the
 
         return full_prompt
 
-    def format_response(
-        self, response: str, request: ReviewCodeRequest
-    ) -> str:
+    def format_response(self, response: str, request: ReviewCodeRequest) -> str:
         """Format the review response"""
         header = f"Code Review ({request.review_type.upper()})"
         if request.focus_on:

View File

@@ -130,7 +130,9 @@ class ThinkDeeperTool(BaseTool):
         focus_instruction = ""
         if request.focus_areas:
             areas = ", ".join(request.focus_areas)
-            focus_instruction = f"\n\nFOCUS AREAS: Please pay special attention to {areas} aspects."
+            focus_instruction = (
+                f"\n\nFOCUS AREAS: Please pay special attention to {areas} aspects."
+            )
 
         # Combine system prompt with context
         full_prompt = f"""{self.get_system_prompt()}{focus_instruction}
@@ -146,8 +148,6 @@ Please provide deep analysis that extends Claude's thinking with:
 
         return full_prompt
 
-    def format_response(
-        self, response: str, request: ThinkDeeperRequest
-    ) -> str:
+    def format_response(self, response: str, request: ThinkDeeperRequest) -> str:
         """Format the response with clear attribution"""
         return f"Extended Analysis by Gemini:\n\n{response}"

View File

@@ -11,63 +11,106 @@ from .token_utils import estimate_tokens, MAX_CONTEXT_TOKENS
 # Common code file extensions
 CODE_EXTENSIONS = {
-    '.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.c', '.h', '.hpp',
-    '.cs', '.go', '.rs', '.rb', '.php', '.swift', '.kt', '.scala', '.r', '.m',
-    '.mm', '.sql', '.sh', '.bash', '.zsh', '.fish', '.ps1', '.bat', '.cmd',
-    '.yml', '.yaml', '.json', '.xml', '.toml', '.ini', '.cfg', '.conf',
-    '.txt', '.md', '.rst', '.tex', '.html', '.css', '.scss', '.sass', '.less'
+    ".py",
+    ".js",
+    ".ts",
+    ".jsx",
+    ".tsx",
+    ".java",
+    ".cpp",
+    ".c",
+    ".h",
+    ".hpp",
+    ".cs",
+    ".go",
+    ".rs",
+    ".rb",
+    ".php",
+    ".swift",
+    ".kt",
+    ".scala",
+    ".r",
+    ".m",
+    ".mm",
+    ".sql",
+    ".sh",
+    ".bash",
+    ".zsh",
+    ".fish",
+    ".ps1",
+    ".bat",
+    ".cmd",
+    ".yml",
+    ".yaml",
+    ".json",
+    ".xml",
+    ".toml",
+    ".ini",
+    ".cfg",
+    ".conf",
+    ".txt",
+    ".md",
+    ".rst",
+    ".tex",
+    ".html",
+    ".css",
+    ".scss",
+    ".sass",
+    ".less",
 }
 
 
 def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> List[str]:
     """
     Expand paths to individual files, handling both files and directories.
 
     Args:
         paths: List of file or directory paths
         extensions: Optional set of file extensions to include
 
     Returns:
         List of individual file paths
     """
     if extensions is None:
         extensions = CODE_EXTENSIONS
 
     expanded_files = []
     seen = set()
 
     for path in paths:
         path_obj = Path(path)
 
         if not path_obj.exists():
             continue
 
         if path_obj.is_file():
             # Add file directly
             if str(path_obj) not in seen:
                 expanded_files.append(str(path_obj))
                 seen.add(str(path_obj))
 
         elif path_obj.is_dir():
             # Walk directory recursively
             for root, dirs, files in os.walk(path_obj):
                 # Skip hidden directories and __pycache__
-                dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__']
+                dirs[:] = [
+                    d for d in dirs if not d.startswith(".") and d != "__pycache__"
+                ]
 
                 for file in files:
                     # Skip hidden files
-                    if file.startswith('.'):
+                    if file.startswith("."):
                         continue
 
                     file_path = Path(root) / file
 
                     # Check extension
                     if not extensions or file_path.suffix.lower() in extensions:
                         full_path = str(file_path)
                         if full_path not in seen:
                             expanded_files.append(full_path)
                             seen.add(full_path)
 
     # Sort for consistent ordering
     expanded_files.sort()
     return expanded_files
@@ -76,11 +119,11 @@ def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> Lis
 def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, int]:
     """
     Read a single file and format it for Gemini.
 
     Args:
         file_path: Path to file
         max_size: Maximum file size to read
 
     Returns:
         (formatted_content, estimated_tokens)
     """
@@ -116,40 +159,42 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, i
 def read_files(
     file_paths: List[str],
     code: Optional[str] = None,
     max_tokens: Optional[int] = None,
-    reserve_tokens: int = 50_000
+    reserve_tokens: int = 50_000,
 ) -> Tuple[str, str]:
     """
     Read multiple files and optional direct code with smart token management.
 
     Args:
         file_paths: List of file or directory paths
         code: Optional direct code to include
         max_tokens: Maximum tokens to use (defaults to MAX_CONTEXT_TOKENS)
         reserve_tokens: Tokens to reserve for prompt and response
 
     Returns:
         (full_content, brief_summary)
     """
     if max_tokens is None:
         max_tokens = MAX_CONTEXT_TOKENS
 
     content_parts = []
     summary_parts = []
     total_tokens = 0
     available_tokens = max_tokens - reserve_tokens
 
     files_read = []
     files_skipped = []
     dirs_processed = []
 
     # First, handle direct code if provided
     if code:
-        formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
+        formatted_code = (
+            f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
+        )
         code_tokens = estimate_tokens(formatted_code)
 
         if code_tokens <= available_tokens:
             content_parts.append(formatted_code)
             total_tokens += code_tokens
@@ -158,29 +203,31 @@ def read_files(
             summary_parts.append(f"Direct code: {code_preview}")
         else:
             summary_parts.append("Direct code skipped (too large)")
 
     # Expand all paths to get individual files
     if file_paths:
         # Track which paths are directories
         for path in file_paths:
             if Path(path).is_dir():
                 dirs_processed.append(path)
 
         # Expand to get all files
         all_files = expand_paths(file_paths)
 
         if not all_files and file_paths:
             # No files found but paths were provided
-            content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
+            content_parts.append(
+                f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n"
+            )
         else:
             # Read files up to token limit
             for file_path in all_files:
                 if total_tokens >= available_tokens:
                     files_skipped.append(file_path)
                     continue
 
                 file_content, file_tokens = read_file_content(file_path)
 
                 # Check if adding this file would exceed limit
                 if total_tokens + file_tokens <= available_tokens:
                     content_parts.append(file_content)
@@ -188,7 +235,7 @@ def read_files(
                     files_read.append(file_path)
                 else:
                     files_skipped.append(file_path)
 
     # Build summary
     if dirs_processed:
         summary_parts.append(f"Processed {len(dirs_processed)} dir(s)")
@@ -198,10 +245,10 @@ def read_files(
         summary_parts.append(f"Skipped {len(files_skipped)} file(s) (token limit)")
     if total_tokens > 0:
         summary_parts.append(f"~{total_tokens:,} tokens used")
 
     # Add skipped files note if any were skipped
     if files_skipped:
-        skip_note = f"\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
+        skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
         skip_note += f"Total skipped: {len(files_skipped)}\n"
 
         # Show first 10 skipped files
         for i, file_path in enumerate(files_skipped[:10]):
@@ -210,8 +257,8 @@ def read_files(
             skip_note += f"  ... and {len(files_skipped) - 10} more\n"
         skip_note += "--- END SKIPPED FILES ---\n"
         content_parts.append(skip_note)
 
     full_content = "\n\n".join(content_parts) if content_parts else ""
     summary = " | ".join(summary_parts) if summary_parts else "No input provided"
 
     return full_content, summary
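For orientation, a typical call to the read_files helper reformatted above might look like the sketch below. The paths and numbers are hypothetical; the signature, defaults, and summary format are taken from this diff:

from utils import read_files  # package layout as used by the tests above

content, summary = read_files(
    ["src/", "README.md"],   # files and/or directories to expand
    max_tokens=200_000,      # defaults to MAX_CONTEXT_TOKENS when omitted
    reserve_tokens=50_000,   # head-room kept for the prompt and response
)

print(summary)        # e.g. "Processed 1 dir(s) | Read 12 file(s) | ~35,000 tokens used"
print(content[:200])  # file contents wrapped in --- BEGIN FILE: ... --- markers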