diff --git a/config.py b/config.py
index 50b6cdc..78eabd4 100644
--- a/config.py
+++ b/config.py
@@ -9,7 +9,9 @@ __author__ = "Fahad Gilani"

 # Model configuration
 DEFAULT_MODEL = "gemini-2.5-pro-preview-06-05"
-THINKING_MODEL = "gemini-2.0-flash-thinking-exp"  # Enhanced reasoning model for think_deeper
+THINKING_MODEL = (
+    "gemini-2.0-flash-thinking-exp"  # Enhanced reasoning model for think_deeper
+)
 MAX_CONTEXT_TOKENS = 1_000_000  # 1M tokens for Gemini Pro

 # Temperature defaults for different tool types
diff --git a/prompts/__init__.py b/prompts/__init__.py
index 7818681..970b456 100644
--- a/prompts/__init__.py
+++ b/prompts/__init__.py
@@ -2,8 +2,13 @@
 System prompts for Gemini tools
 """

-from .tool_prompts import (ANALYZE_PROMPT, CHAT_PROMPT, DEBUG_ISSUE_PROMPT,
-                           REVIEW_CODE_PROMPT, THINK_DEEPER_PROMPT)
+from .tool_prompts import (
+    ANALYZE_PROMPT,
+    CHAT_PROMPT,
+    DEBUG_ISSUE_PROMPT,
+    REVIEW_CODE_PROMPT,
+    THINK_DEEPER_PROMPT,
+)

 __all__ = [
     "THINK_DEEPER_PROMPT",
diff --git a/server.py b/server.py
index 0eb038f..e04ebfe 100644
--- a/server.py
+++ b/server.py
@@ -10,14 +10,18 @@ from datetime import datetime
 from typing import Any, Dict, List

 from google import genai
-from google.genai import types
 from mcp.server import Server
 from mcp.server.models import InitializationOptions
 from mcp.server.stdio import stdio_server
 from mcp.types import TextContent, Tool

-from config import (DEFAULT_MODEL, MAX_CONTEXT_TOKENS, __author__, __updated__,
-                    __version__)
+from config import (
+    DEFAULT_MODEL,
+    MAX_CONTEXT_TOKENS,
+    __author__,
+    __updated__,
+    __version__,
+)
 from tools import AnalyzeTool, DebugIssueTool, ReviewCodeTool, ThinkDeeperTool

 # Configure logging
@@ -125,9 +129,7 @@ async def handle_list_tools() -> List[Tool]:


 @server.call_tool()
-async def handle_call_tool(
-    name: str, arguments: Dict[str, Any]
-) -> List[TextContent]:
+async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
     """Handle tool execution requests"""

     # Handle dynamic tools
@@ -151,7 +153,7 @@ async def handle_call_tool(

 async def handle_chat(arguments: Dict[str, Any]) -> List[TextContent]:
     """Handle general chat requests"""
-    from config import TEMPERATURE_BALANCED, DEFAULT_MODEL, THINKING_MODEL
+    from config import TEMPERATURE_BALANCED, DEFAULT_MODEL
     from prompts import CHAT_PROMPT
     from utils import read_files

@@ -164,24 +166,37 @@ async def handle_chat(arguments: Dict[str, Any]) -> List[TextContent]:
     user_content = prompt
     if context_files:
         file_content, _ = read_files(context_files)
-        user_content = f"{prompt}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ==="
-
+        user_content = (
+            f"{prompt}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ==="
+        )
+
     # Combine system prompt with user content
     full_prompt = f"{CHAT_PROMPT}\n\n=== USER REQUEST ===\n{user_content}\n=== END REQUEST ===\n\nPlease provide a thoughtful, comprehensive response:"

     try:
         # Create model with thinking configuration
         from tools.base import BaseTool
-
+
         # Create a temporary tool instance to use create_model method
         class TempTool(BaseTool):
-            def get_name(self): return "chat"
-            def get_description(self): return ""
-            def get_input_schema(self): return {}
-            def get_system_prompt(self): return ""
-            def get_request_model(self): return None
-            async def prepare_prompt(self, request): return ""
-
+            def get_name(self):
+                return "chat"
+
+            def get_description(self):
+                return ""
+
+            def get_input_schema(self):
+                return {}
+
+            def get_system_prompt(self):
+                return ""
+
+            def get_request_model(self):
+                return None
+
+            async def prepare_prompt(self, request):
+                return ""
+
         temp_tool = TempTool()
         model = temp_tool.create_model(DEFAULT_MODEL, temperature, thinking_mode)
@@ -207,7 +222,7 @@ async def handle_list_models() -> List[TextContent]:
     api_key = os.getenv("GEMINI_API_KEY")
     if not api_key:
         return [TextContent(type="text", text="Error: GEMINI_API_KEY not set")]
-
+
     client = genai.Client(api_key=api_key)

     models = []
@@ -218,13 +233,21 @@ async def handle_list_models() -> List[TextContent]:
             models.append(
                 {
                     "name": getattr(model_info, "id", "Unknown"),
-                    "display_name": getattr(model_info, "display_name", getattr(model_info, "id", "Unknown")),
-                    "description": getattr(model_info, "description", "No description"),
-                    "is_default": getattr(model_info, "id", "").endswith(DEFAULT_MODEL),
+                    "display_name": getattr(
+                        model_info,
+                        "display_name",
+                        getattr(model_info, "id", "Unknown"),
+                    ),
+                    "description": getattr(
+                        model_info, "description", "No description"
+                    ),
+                    "is_default": getattr(model_info, "id", "").endswith(
+                        DEFAULT_MODEL
+                    ),
                 }
             )
-    except Exception as e:
+    except Exception:
         # Fallback: return some known models
         models = [
@@ -244,9 +267,7 @@ async def handle_list_models() -> List[TextContent]:
         return [TextContent(type="text", text=json.dumps(models, indent=2))]

     except Exception as e:
-        return [
-            TextContent(type="text", text=f"Error listing models: {str(e)}")
-        ]
+        return [TextContent(type="text", text=f"Error listing models: {str(e)}")]


 async def handle_get_version() -> List[TextContent]:
@@ -259,8 +280,7 @@ async def handle_get_version() -> List[TextContent]:
         "max_context_tokens": f"{MAX_CONTEXT_TOKENS:,}",
         "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
         "server_started": datetime.now().isoformat(),
-        "available_tools": list(TOOLS.keys())
-        + ["chat", "list_models", "get_version"],
+        "available_tools": list(TOOLS.keys()) + ["chat", "list_models", "get_version"],
     }

     text = f"""Gemini MCP Server v{__version__}
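For orientation, the reworked chat path can be driven directly, the same way tests/test_server.py below awaits the handler. A minimal sketch, assuming GEMINI_API_KEY is set and server.py is importable from the project root; the tool name and argument shape are taken from the hunks above:

import asyncio

from server import handle_call_tool


async def main():
    # "chat" routes to handle_chat(), which wraps the prompt in CHAT_PROMPT and
    # calls create_model() on a throwaway BaseTool subclass (TempTool above).
    result = await handle_call_tool("chat", {"prompt": "Hello Gemini"})
    print(result[0].text)


asyncio.run(main())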
diff --git a/tests/test_config.py b/tests/test_config.py
index 5d9e59d..8396043 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -2,10 +2,17 @@
 Tests for configuration
 """

-from config import (DEFAULT_MODEL, MAX_CONTEXT_TOKENS,
-                    TEMPERATURE_ANALYTICAL, TEMPERATURE_BALANCED,
-                    TEMPERATURE_CREATIVE, TOOL_TRIGGERS, __author__,
-                    __updated__, __version__)
+from config import (
+    DEFAULT_MODEL,
+    MAX_CONTEXT_TOKENS,
+    TEMPERATURE_ANALYTICAL,
+    TEMPERATURE_BALANCED,
+    TEMPERATURE_CREATIVE,
+    TOOL_TRIGGERS,
+    __author__,
+    __updated__,
+    __version__,
+)


 class TestConfig:
@@ -15,11 +22,11 @@ class TestConfig:
         """Test version information exists and has correct format"""
         # Check version format (e.g., "2.4.1")
         assert isinstance(__version__, str)
-        assert len(__version__.split('.')) == 3  # Major.Minor.Patch
-
+        assert len(__version__.split(".")) == 3  # Major.Minor.Patch
+
         # Check author
         assert __author__ == "Fahad Gilani"
-
+
         # Check updated date exists (don't assert on specific format/value)
         assert isinstance(__updated__, str)
diff --git a/tests/test_live_integration.py b/tests/test_live_integration.py
index 67ac1e3..aa9765d 100644
--- a/tests/test_live_integration.py
+++ b/tests/test_live_integration.py
@@ -20,68 +20,68 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

 from tools.analyze import AnalyzeTool
 from tools.think_deeper import ThinkDeeperTool
-from tools.review_code import ReviewCodeTool
-from tools.debug_issue import DebugIssueTool
-


 async def run_manual_live_tests():
     """Run live tests manually without pytest"""
     print("šŸš€ Running manual live integration tests...")
-
+
     # Check API key
     if not os.environ.get("GEMINI_API_KEY"):
         print("āŒ GEMINI_API_KEY not found. Set it to run live tests.")
         return False
-
+
     try:
         # Test google-genai import
-        from google import genai
-        from google.genai import types
+
         print("āœ… google-genai library import successful")
-
+
         # Test tool integration
-        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
             f.write("def hello(): return 'world'")
             temp_path = f.name
-
+
         try:
             # Test AnalyzeTool
             tool = AnalyzeTool()
-            result = await tool.execute({
-                "files": [temp_path],
-                "question": "What does this code do?",
-                "thinking_mode": "low"
-            })
-
+            result = await tool.execute(
+                {
+                    "files": [temp_path],
+                    "question": "What does this code do?",
+                    "thinking_mode": "low",
+                }
+            )
+
             if result and result[0].text:
                 print("āœ… AnalyzeTool live test successful")
             else:
                 print("āŒ AnalyzeTool live test failed")
                 return False
-
-            # Test ThinkDeeperTool
+
+            # Test ThinkDeeperTool
             think_tool = ThinkDeeperTool()
-            result = await think_tool.execute({
-                "current_analysis": "Testing live integration",
-                "thinking_mode": "minimal"  # Fast test
-            })
-
+            result = await think_tool.execute(
+                {
+                    "current_analysis": "Testing live integration",
+                    "thinking_mode": "minimal",  # Fast test
+                }
+            )
+
             if result and result[0].text and "Extended Analysis" in result[0].text:
                 print("āœ… ThinkDeeperTool live test successful")
             else:
                 print("āŒ ThinkDeeperTool live test failed")
                 return False
-
+
         finally:
             Path(temp_path).unlink(missing_ok=True)
-
+
         print("\nšŸŽ‰ All manual live tests passed!")
         print("āœ… google-genai library working correctly")
-        print("āœ… All tools can make live API calls")
+        print("āœ… All tools can make live API calls")
         print("āœ… Thinking modes functioning properly")
         return True
-
+
     except Exception as e:
         print(f"āŒ Live test failed: {e}")
         return False
@@ -90,4 +90,4 @@ async def run_manual_live_tests():
 if __name__ == "__main__":
     # Run live tests when script is executed directly
     success = asyncio.run(run_manual_live_tests())
-    exit(0 if success else 1)
\ No newline at end of file
+    exit(0 if success else 1)
"""Run live tests manually without pytest""" print("šŸš€ Running manual live integration tests...") - + # Check API key if not os.environ.get("GEMINI_API_KEY"): print("āŒ GEMINI_API_KEY not found. Set it to run live tests.") return False - + try: # Test google-genai import - from google import genai - from google.genai import types + print("āœ… google-genai library import successful") - + # Test tool integration - with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: f.write("def hello(): return 'world'") temp_path = f.name - + try: # Test AnalyzeTool tool = AnalyzeTool() - result = await tool.execute({ - "files": [temp_path], - "question": "What does this code do?", - "thinking_mode": "low" - }) - + result = await tool.execute( + { + "files": [temp_path], + "question": "What does this code do?", + "thinking_mode": "low", + } + ) + if result and result[0].text: print("āœ… AnalyzeTool live test successful") else: print("āŒ AnalyzeTool live test failed") return False - - # Test ThinkDeeperTool + + # Test ThinkDeeperTool think_tool = ThinkDeeperTool() - result = await think_tool.execute({ - "current_analysis": "Testing live integration", - "thinking_mode": "minimal" # Fast test - }) - + result = await think_tool.execute( + { + "current_analysis": "Testing live integration", + "thinking_mode": "minimal", # Fast test + } + ) + if result and result[0].text and "Extended Analysis" in result[0].text: print("āœ… ThinkDeeperTool live test successful") else: print("āŒ ThinkDeeperTool live test failed") return False - + finally: Path(temp_path).unlink(missing_ok=True) - + print("\nšŸŽ‰ All manual live tests passed!") print("āœ… google-genai library working correctly") - print("āœ… All tools can make live API calls") + print("āœ… All tools can make live API calls") print("āœ… Thinking modes functioning properly") return True - + except Exception as e: print(f"āŒ Live test failed: {e}") return False @@ -90,4 +90,4 @@ async def run_manual_live_tests(): if __name__ == "__main__": # Run live tests when script is executed directly success = asyncio.run(run_manual_live_tests()) - exit(0 if success else 1) \ No newline at end of file + exit(0 if success else 1) diff --git a/tests/test_server.py b/tests/test_server.py index 04e5fbb..b93656b 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -33,9 +33,7 @@ class TestServerTools: # Check descriptions are verbose for tool in tools: - assert ( - len(tool.description) > 50 - ) # All should have detailed descriptions + assert len(tool.description) > 50 # All should have detailed descriptions @pytest.mark.asyncio async def test_handle_call_tool_unknown(self): @@ -49,8 +47,9 @@ class TestServerTools: """Test chat functionality""" # Set test environment import os + os.environ["PYTEST_CURRENT_TEST"] = "test" - + # Create a mock for the model with patch("tools.base.BaseTool.create_model") as mock_create: mock_model = Mock() @@ -58,9 +57,9 @@ class TestServerTools: candidates=[Mock(content=Mock(parts=[Mock(text="Chat response")]))] ) mock_create.return_value = mock_model - + result = await handle_call_tool("chat", {"prompt": "Hello Gemini"}) - + assert len(result) == 1 assert result[0].text == "Chat response" @@ -69,7 +68,7 @@ class TestServerTools: """Test listing models""" result = await handle_call_tool("list_models", {}) assert len(result) == 1 - + # Check if we got models or an error text = result[0].text if "Error" in text: diff --git 
diff --git a/tests/test_thinking_modes.py b/tests/test_thinking_modes.py
index 3805bff..b68fe6e 100644
--- a/tests/test_thinking_modes.py
+++ b/tests/test_thinking_modes.py
@@ -2,7 +2,6 @@
 Tests for thinking_mode functionality across all tools
 """

-import os
 from unittest.mock import Mock, patch

 import pytest
@@ -22,7 +21,7 @@ def setup_test_env():

 class TestThinkingModes:
     """Test thinking modes across all tools"""
-
+
     def test_default_thinking_modes(self):
         """Test that tools have correct default thinking modes"""
         tools = [
@@ -31,35 +30,40 @@ class TestThinkingModes:
             (ReviewCodeTool(), "medium"),
             (DebugIssueTool(), "medium"),
         ]
-
+
         for tool, expected_default in tools:
-            assert tool.get_default_thinking_mode() == expected_default, \
-                f"{tool.__class__.__name__} should default to {expected_default}"
-
+            assert (
+                tool.get_default_thinking_mode() == expected_default
+            ), f"{tool.__class__.__name__} should default to {expected_default}"
+
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_minimal(self, mock_create_model):
         """Test minimal thinking mode"""
         mock_model = Mock()
         mock_model.generate_content.return_value = Mock(
-            candidates=[Mock(content=Mock(parts=[Mock(text="Minimal thinking response")]))]
+            candidates=[
+                Mock(content=Mock(parts=[Mock(text="Minimal thinking response")]))
+            ]
         )
         mock_create_model.return_value = mock_model
-
+
         tool = AnalyzeTool()
-        result = await tool.execute({
-            "files": ["test.py"],
-            "question": "What is this?",
-            "thinking_mode": "minimal"
-        })
-
+        result = await tool.execute(
+            {
+                "files": ["test.py"],
+                "question": "What is this?",
+                "thinking_mode": "minimal",
+            }
+        )
+
         # Verify create_model was called with correct thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "minimal"  # thinking_mode parameter
-
+
         assert result[0].text.startswith("Analysis:")
-
+
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_low(self, mock_create_model):
@@ -69,43 +73,44 @@ class TestThinkingModes:
             candidates=[Mock(content=Mock(parts=[Mock(text="Low thinking response")]))]
         )
         mock_create_model.return_value = mock_model
-
+
         tool = ReviewCodeTool()
-        result = await tool.execute({
-            "files": ["test.py"],
-            "thinking_mode": "low"
-        })
-
+        result = await tool.execute({"files": ["test.py"], "thinking_mode": "low"})
+
         # Verify create_model was called with correct thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "low"
-
+
         assert "Code Review" in result[0].text
-
+
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_medium(self, mock_create_model):
         """Test medium thinking mode (default for most tools)"""
         mock_model = Mock()
         mock_model.generate_content.return_value = Mock(
-            candidates=[Mock(content=Mock(parts=[Mock(text="Medium thinking response")]))]
+            candidates=[
+                Mock(content=Mock(parts=[Mock(text="Medium thinking response")]))
+            ]
         )
         mock_create_model.return_value = mock_model
-
+
         tool = DebugIssueTool()
-        result = await tool.execute({
-            "error_description": "Test error",
-            # Not specifying thinking_mode, should use default (medium)
-        })
-
+        result = await tool.execute(
+            {
+                "error_description": "Test error",
+                # Not specifying thinking_mode, should use default (medium)
+            }
+        )
+
         # Verify create_model was called with default thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "medium"
-
+
         assert "Debug Analysis" in result[0].text
-
+
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_high(self, mock_create_model):
@@ -115,19 +120,21 @@ class TestThinkingModes:
             candidates=[Mock(content=Mock(parts=[Mock(text="High thinking response")]))]
         )
         mock_create_model.return_value = mock_model
-
+
         tool = AnalyzeTool()
-        result = await tool.execute({
-            "files": ["complex.py"],
-            "question": "Analyze architecture",
-            "thinking_mode": "high"
-        })
-
+        await tool.execute(
+            {
+                "files": ["complex.py"],
+                "question": "Analyze architecture",
+                "thinking_mode": "high",
+            }
+        )
+
         # Verify create_model was called with correct thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "high"
-
+
     @pytest.mark.asyncio
     @patch("tools.base.BaseTool.create_model")
     async def test_thinking_mode_max(self, mock_create_model):
@@ -137,47 +144,58 @@ class TestThinkingModes:
             candidates=[Mock(content=Mock(parts=[Mock(text="Max thinking response")]))]
         )
         mock_create_model.return_value = mock_model
-
+
         tool = ThinkDeeperTool()
-        result = await tool.execute({
-            "current_analysis": "Initial analysis",
-            # Not specifying thinking_mode, should use default (max)
-        })
-
+        result = await tool.execute(
+            {
+                "current_analysis": "Initial analysis",
+                # Not specifying thinking_mode, should use default (max)
+            }
+        )
+
         # Verify create_model was called with default thinking_mode
         mock_create_model.assert_called_once()
         args = mock_create_model.call_args[0]
         assert args[2] == "max"
-
+
         assert "Extended Analysis by Gemini" in result[0].text
-
+
     def test_thinking_budget_mapping(self):
         """Test that thinking modes map to correct budget values"""
         from tools.base import BaseTool
-
+
         # Create a simple test tool
         class TestTool(BaseTool):
-            def get_name(self): return "test"
-            def get_description(self): return "test"
-            def get_input_schema(self): return {}
-            def get_system_prompt(self): return "test"
-            def get_request_model(self): return None
-            async def prepare_prompt(self, request): return "test"
-
-        tool = TestTool()
-
+            def get_name(self):
+                return "test"
+
+            def get_description(self):
+                return "test"
+
+            def get_input_schema(self):
+                return {}
+
+            def get_system_prompt(self):
+                return "test"
+
+            def get_request_model(self):
+                return None
+
+            async def prepare_prompt(self, request):
+                return "test"
+
         # Expected mappings
         expected_budgets = {
             "minimal": 128,
             "low": 2048,
             "medium": 8192,
             "high": 16384,
-            "max": 32768
+            "max": 32768,
         }
-
+
         # Check each mode in create_model
         for mode, expected_budget in expected_budgets.items():
             # The budget mapping is inside create_model
             # We can't easily test it without calling the method
             # But we've verified the values are correct in the code
-            pass
\ No newline at end of file
+            pass
"Debug Analysis" in result[0].text - + @pytest.mark.asyncio @patch("tools.base.BaseTool.create_model") async def test_thinking_mode_high(self, mock_create_model): @@ -115,19 +120,21 @@ class TestThinkingModes: candidates=[Mock(content=Mock(parts=[Mock(text="High thinking response")]))] ) mock_create_model.return_value = mock_model - + tool = AnalyzeTool() - result = await tool.execute({ - "files": ["complex.py"], - "question": "Analyze architecture", - "thinking_mode": "high" - }) - + await tool.execute( + { + "files": ["complex.py"], + "question": "Analyze architecture", + "thinking_mode": "high", + } + ) + # Verify create_model was called with correct thinking_mode mock_create_model.assert_called_once() args = mock_create_model.call_args[0] assert args[2] == "high" - + @pytest.mark.asyncio @patch("tools.base.BaseTool.create_model") async def test_thinking_mode_max(self, mock_create_model): @@ -137,47 +144,58 @@ class TestThinkingModes: candidates=[Mock(content=Mock(parts=[Mock(text="Max thinking response")]))] ) mock_create_model.return_value = mock_model - + tool = ThinkDeeperTool() - result = await tool.execute({ - "current_analysis": "Initial analysis", - # Not specifying thinking_mode, should use default (max) - }) - + result = await tool.execute( + { + "current_analysis": "Initial analysis", + # Not specifying thinking_mode, should use default (max) + } + ) + # Verify create_model was called with default thinking_mode mock_create_model.assert_called_once() args = mock_create_model.call_args[0] assert args[2] == "max" - + assert "Extended Analysis by Gemini" in result[0].text - + def test_thinking_budget_mapping(self): """Test that thinking modes map to correct budget values""" from tools.base import BaseTool - + # Create a simple test tool class TestTool(BaseTool): - def get_name(self): return "test" - def get_description(self): return "test" - def get_input_schema(self): return {} - def get_system_prompt(self): return "test" - def get_request_model(self): return None - async def prepare_prompt(self, request): return "test" - - tool = TestTool() - + def get_name(self): + return "test" + + def get_description(self): + return "test" + + def get_input_schema(self): + return {} + + def get_system_prompt(self): + return "test" + + def get_request_model(self): + return None + + async def prepare_prompt(self, request): + return "test" + # Expected mappings expected_budgets = { "minimal": 128, "low": 2048, "medium": 8192, "high": 16384, - "max": 32768 + "max": 32768, } - + # Check each mode in create_model for mode, expected_budget in expected_budgets.items(): # The budget mapping is inside create_model # We can't easily test it without calling the method # But we've verified the values are correct in the code - pass \ No newline at end of file + pass diff --git a/tests/test_tools.py b/tests/test_tools.py index 96e567b..a65d4ed 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -120,7 +120,9 @@ class TestDebugIssueTool: # Mock model mock_model = Mock() mock_model.generate_content.return_value = Mock( - candidates=[Mock(content=Mock(parts=[Mock(text="Root cause: race condition")]))] + candidates=[ + Mock(content=Mock(parts=[Mock(text="Root cause: race condition")])) + ] ) mock_create_model.return_value = mock_model @@ -157,9 +159,7 @@ class TestAnalyzeTool: @pytest.mark.asyncio @patch("tools.base.BaseTool.create_model") - async def test_execute_with_analysis_type( - self, mock_model, tool, tmp_path - ): + async def test_execute_with_analysis_type(self, mock_model, tool, tmp_path): 
"""Test execution with specific analysis type""" # Create test file test_file = tmp_path / "module.py" @@ -168,9 +168,7 @@ class TestAnalyzeTool: # Mock response mock_response = Mock() mock_response.candidates = [Mock()] - mock_response.candidates[0].content.parts = [ - Mock(text="Architecture analysis") - ] + mock_response.candidates[0].content.parts = [Mock(text="Architecture analysis")] mock_instance = Mock() mock_instance.generate_content.return_value = mock_response diff --git a/tests/test_utils.py b/tests/test_utils.py index 14b9343..29e782e 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,8 +2,7 @@ Tests for utility functions """ -from utils import (check_token_limit, estimate_tokens, read_file_content, - read_files) +from utils import check_token_limit, estimate_tokens, read_file_content, read_files class TestFileUtils: @@ -12,9 +11,7 @@ class TestFileUtils: def test_read_file_content_success(self, tmp_path): """Test successful file reading""" test_file = tmp_path / "test.py" - test_file.write_text( - "def hello():\n return 'world'", encoding="utf-8" - ) + test_file.write_text("def hello():\n return 'world'", encoding="utf-8") content, tokens = read_file_content(str(test_file)) assert "--- BEGIN FILE:" in content @@ -71,18 +68,18 @@ class TestFileUtils: (tmp_path / "file1.py").write_text("print('file1')", encoding="utf-8") (tmp_path / "file2.js").write_text("console.log('file2')", encoding="utf-8") (tmp_path / "readme.md").write_text("# README", encoding="utf-8") - + # Create subdirectory subdir = tmp_path / "src" subdir.mkdir() (subdir / "module.py").write_text("class Module: pass", encoding="utf-8") - + # Create hidden file (should be skipped) (tmp_path / ".hidden").write_text("secret", encoding="utf-8") - + # Read the directory content, summary = read_files([str(tmp_path)]) - + # Check files are included assert "file1.py" in content assert "file2.js" in content @@ -90,17 +87,17 @@ class TestFileUtils: # Handle both forward and backslashes for cross-platform compatibility assert "module.py" in content assert "class Module: pass" in content - + # Check content assert "print('file1')" in content assert "console.log('file2')" in content assert "# README" in content assert "class Module: pass" in content - + # Hidden file should not be included assert ".hidden" not in content assert "secret" not in content - + # Check summary assert "Processed 1 dir(s)" in summary assert "Read 4 file(s)" in summary @@ -110,23 +107,23 @@ class TestFileUtils: # Create files file1 = tmp_path / "direct.py" file1.write_text("# Direct file", encoding="utf-8") - + # Create directory with files subdir = tmp_path / "subdir" subdir.mkdir() (subdir / "sub1.py").write_text("# Sub file 1", encoding="utf-8") (subdir / "sub2.py").write_text("# Sub file 2", encoding="utf-8") - + # Read mix of direct file and directory content, summary = read_files([str(file1), str(subdir)]) - + assert "direct.py" in content assert "sub1.py" in content assert "sub2.py" in content assert "# Direct file" in content assert "# Sub file 1" in content assert "# Sub file 2" in content - + assert "Processed 1 dir(s)" in summary assert "Read 3 file(s)" in summary @@ -135,19 +132,19 @@ class TestFileUtils: # Create files with known token counts # ~250 tokens each (1000 chars) large_content = "x" * 1000 - + for i in range(5): (tmp_path / f"file{i}.txt").write_text(large_content, encoding="utf-8") - + # Read with small token limit (should skip some files) # Reserve 50k tokens, limit to 51k total = 1k available # Each file ~250 
diff --git a/tools/base.py b/tools/base.py
index beac23c..7775dbb 100644
--- a/tools/base.py
+++ b/tools/base.py
@@ -11,6 +11,7 @@ from google.genai import types
 from mcp.types import TextContent
 from pydantic import BaseModel, Field

+
 class ToolRequest(BaseModel):
     """Base request model for all tools"""

@@ -21,7 +22,8 @@ class ToolRequest(BaseModel):
         None, description="Temperature for response (tool-specific defaults)"
     )
     thinking_mode: Optional[Literal["minimal", "low", "medium", "high", "max"]] = Field(
-        None, description="Thinking depth: minimal (128), low (2048), medium (8192), high (16384), max (32768)"
+        None,
+        description="Thinking depth: minimal (128), low (2048), medium (8192), high (16384), max (32768)",
     )

@@ -128,15 +130,15 @@ class BaseTool(ABC):
         """Create a configured Gemini model with thinking configuration"""
         # Map thinking modes to budget values
         thinking_budgets = {
-            "minimal": 128, # Minimum for 2.5 Pro
+            "minimal": 128,  # Minimum for 2.5 Pro
             "low": 2048,
             "medium": 8192,
             "high": 16384,
-            "max": 32768
+            "max": 32768,
         }
-
+
         thinking_budget = thinking_budgets.get(thinking_mode, 8192)
-
+
         # For models supporting thinking config, use the new API
         # Skip in test environment to allow mocking
         if "2.5" in model_name and not os.environ.get("PYTEST_CURRENT_TEST"):
@@ -145,17 +147,19 @@ class BaseTool(ABC):
                 api_key = os.environ.get("GEMINI_API_KEY")
                 if not api_key:
                     raise ValueError("GEMINI_API_KEY environment variable is required")
-
+
                 client = genai.Client(api_key=api_key)
-
+
                 # Create a wrapper to match the expected interface
                 class ModelWrapper:
-                    def __init__(self, client, model_name, temperature, thinking_budget):
+                    def __init__(
+                        self, client, model_name, temperature, thinking_budget
+                    ):
                         self.client = client
                         self.model_name = model_name
                         self.temperature = temperature
                         self.thinking_budget = thinking_budget
-
+
                     def generate_content(self, prompt):
                         response = self.client.models.generate_content(
                             model=self.model_name,
@@ -163,43 +167,62 @@ class BaseTool(ABC):
                             config=types.GenerateContentConfig(
                                 temperature=self.temperature,
                                 candidate_count=1,
-                                thinking_config=types.ThinkingConfig(thinking_budget=self.thinking_budget)
+                                thinking_config=types.ThinkingConfig(
+                                    thinking_budget=self.thinking_budget
+                                ),
                             ),
                         )
+
                         # Convert to match expected format
                         class ResponseWrapper:
                             def __init__(self, text):
                                 self.text = text
-                                self.candidates = [type('obj', (object,), {
-                                    'content': type('obj', (object,), {
-                                        'parts': [type('obj', (object,), {'text': text})]
-                                    })(),
-                                    'finish_reason': 'STOP'
-                                })]
-
+                                self.candidates = [
+                                    type(
+                                        "obj",
+                                        (object,),
+                                        {
+                                            "content": type(
+                                                "obj",
+                                                (object,),
+                                                {
+                                                    "parts": [
+                                                        type(
+                                                            "obj",
+                                                            (object,),
+                                                            {"text": text},
+                                                        )
+                                                    ]
+                                                },
+                                            )(),
+                                            "finish_reason": "STOP",
+                                        },
+                                    )
+                                ]
+
                         return ResponseWrapper(response.text)
-
+
                 return ModelWrapper(client, model_name, temperature, thinking_budget)
-
-            except Exception as e:
+
+            except Exception:
                 # Fall back to regular genai model if new API fails
                 pass
-
+
         # For non-2.5 models or if thinking not needed, use regular API
         # Get API key
         api_key = os.environ.get("GEMINI_API_KEY")
         if not api_key:
             raise ValueError("GEMINI_API_KEY environment variable is required")
-
+
         client = genai.Client(api_key=api_key)
-
+
         # Create wrapper for consistency
         class SimpleModelWrapper:
             def __init__(self, client, model_name, temperature):
                 self.client = client
                 self.model_name = model_name
                 self.temperature = temperature
-
+
             def generate_content(self, prompt):
                 response = self.client.models.generate_content(
                     model=self.model_name,
@@ -209,18 +232,30 @@ class BaseTool(ABC):
                         candidate_count=1,
                     ),
                 )
-
+
                 # Convert to match expected format
                 class ResponseWrapper:
                     def __init__(self, text):
                         self.text = text
-                        self.candidates = [type('obj', (object,), {
-                            'content': type('obj', (object,), {
-                                'parts': [type('obj', (object,), {'text': text})]
-                            })(),
-                            'finish_reason': 'STOP'
-                        })]
-
+                        self.candidates = [
+                            type(
+                                "obj",
+                                (object,),
+                                {
+                                    "content": type(
+                                        "obj",
+                                        (object,),
+                                        {
+                                            "parts": [
+                                                type("obj", (object,), {"text": text})
+                                            ]
+                                        },
+                                    )(),
+                                    "finish_reason": "STOP",
+                                },
+                            )
+                        ]
+
                 return ResponseWrapper(response.text)
-
+
         return SimpleModelWrapper(client, model_name, temperature)
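Stripped of the wrapper classes, the thinking-enabled branch of create_model() boils down to a single google-genai call. A minimal sketch, assuming a valid GEMINI_API_KEY; the model name comes from config.py, the 16384 budget is the "high" entry of the thinking_budgets mapping above, and the temperature here is illustrative (each tool passes its own default):

import os

from google import genai
from google.genai import types

client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
response = client.models.generate_content(
    model="gemini-2.5-pro-preview-06-05",  # DEFAULT_MODEL
    contents="Explain the trade-offs of a 1M-token context window.",
    config=types.GenerateContentConfig(
        temperature=0.5,  # illustrative value
        candidate_count=1,
        thinking_config=types.ThinkingConfig(thinking_budget=16384),  # "high"
    ),
)
print(response.text)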
diff --git a/tools/debug_issue.py b/tools/debug_issue.py
index 9bf39e1..1db99d1 100644
--- a/tools/debug_issue.py
+++ b/tools/debug_issue.py
@@ -146,8 +146,6 @@ Focus on finding the root cause and providing actionable solutions."""

         return full_prompt

-    def format_response(
-        self, response: str, request: DebugIssueRequest
-    ) -> str:
+    def format_response(self, response: str, request: DebugIssueRequest) -> str:
         """Format the debugging response"""
         return f"Debug Analysis\n{'=' * 50}\n\n{response}"
diff --git a/tools/review_code.py b/tools/review_code.py
index 0de8a02..11f619f 100644
--- a/tools/review_code.py
+++ b/tools/review_code.py
@@ -130,14 +130,10 @@ class ReviewCodeTool(BaseTool):
             )

         if request.focus_on:
-            review_focus.append(
-                f"Pay special attention to: {request.focus_on}"
-            )
+            review_focus.append(f"Pay special attention to: {request.focus_on}")

         if request.standards:
-            review_focus.append(
-                f"Enforce these standards: {request.standards}"
-            )
+            review_focus.append(f"Enforce these standards: {request.standards}")

         if request.severity_filter != "all":
             review_focus.append(
@@ -159,9 +155,7 @@ Please provide a comprehensive code review following the format specified in the

         return full_prompt

-    def format_response(
-        self, response: str, request: ReviewCodeRequest
-    ) -> str:
+    def format_response(self, response: str, request: ReviewCodeRequest) -> str:
         """Format the review response"""
         header = f"Code Review ({request.review_type.upper()})"
         if request.focus_on:
diff --git a/tools/think_deeper.py b/tools/think_deeper.py
index db84b7d..178d242 100644
--- a/tools/think_deeper.py
+++ b/tools/think_deeper.py
@@ -130,7 +130,9 @@ class ThinkDeeperTool(BaseTool):
         focus_instruction = ""
         if request.focus_areas:
             areas = ", ".join(request.focus_areas)
-            focus_instruction = f"\n\nFOCUS AREAS: Please pay special attention to {areas} aspects."
+            focus_instruction = (
+                f"\n\nFOCUS AREAS: Please pay special attention to {areas} aspects."
+            )

         # Combine system prompt with context
         full_prompt = f"""{self.get_system_prompt()}{focus_instruction}
@@ -146,8 +148,6 @@ Please provide deep analysis that extends Claude's thinking with:

         return full_prompt

-    def format_response(
-        self, response: str, request: ThinkDeeperRequest
-    ) -> str:
+    def format_response(self, response: str, request: ThinkDeeperRequest) -> str:
         """Format the response with clear attribution"""
         return f"Extended Analysis by Gemini:\n\n{response}"
diff --git a/utils/file_utils.py b/utils/file_utils.py
index 3f18ab0..25a4e9f 100644
--- a/utils/file_utils.py
+++ b/utils/file_utils.py
@@ -11,63 +11,106 @@ from .token_utils import estimate_tokens, MAX_CONTEXT_TOKENS

 # Common code file extensions
 CODE_EXTENSIONS = {
-    '.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.cpp', '.c', '.h', '.hpp',
-    '.cs', '.go', '.rs', '.rb', '.php', '.swift', '.kt', '.scala', '.r', '.m',
-    '.mm', '.sql', '.sh', '.bash', '.zsh', '.fish', '.ps1', '.bat', '.cmd',
-    '.yml', '.yaml', '.json', '.xml', '.toml', '.ini', '.cfg', '.conf',
-    '.txt', '.md', '.rst', '.tex', '.html', '.css', '.scss', '.sass', '.less'
+    ".py",
+    ".js",
+    ".ts",
+    ".jsx",
+    ".tsx",
+    ".java",
+    ".cpp",
+    ".c",
+    ".h",
+    ".hpp",
+    ".cs",
+    ".go",
+    ".rs",
+    ".rb",
+    ".php",
+    ".swift",
+    ".kt",
+    ".scala",
+    ".r",
+    ".m",
+    ".mm",
+    ".sql",
+    ".sh",
+    ".bash",
+    ".zsh",
+    ".fish",
+    ".ps1",
+    ".bat",
+    ".cmd",
+    ".yml",
+    ".yaml",
+    ".json",
+    ".xml",
+    ".toml",
+    ".ini",
+    ".cfg",
+    ".conf",
+    ".txt",
+    ".md",
+    ".rst",
+    ".tex",
+    ".html",
+    ".css",
+    ".scss",
+    ".sass",
+    ".less",
 }


 def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> List[str]:
     """
     Expand paths to individual files, handling both files and directories.
-
+
     Args:
         paths: List of file or directory paths
         extensions: Optional set of file extensions to include
-
+
     Returns:
         List of individual file paths
     """
     if extensions is None:
         extensions = CODE_EXTENSIONS
-
+
     expanded_files = []
     seen = set()
-
+
     for path in paths:
         path_obj = Path(path)
-
+
         if not path_obj.exists():
             continue
-
+
         if path_obj.is_file():
             # Add file directly
             if str(path_obj) not in seen:
                 expanded_files.append(str(path_obj))
                 seen.add(str(path_obj))
-
+
         elif path_obj.is_dir():
             # Walk directory recursively
             for root, dirs, files in os.walk(path_obj):
                 # Skip hidden directories and __pycache__
-                dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__']
-
+                dirs[:] = [
+                    d for d in dirs if not d.startswith(".") and d != "__pycache__"
+                ]
+
                 for file in files:
                     # Skip hidden files
-                    if file.startswith('.'):
+                    if file.startswith("."):
                         continue
-
+
                     file_path = Path(root) / file
-
+
                     # Check extension
                     if not extensions or file_path.suffix.lower() in extensions:
                         full_path = str(file_path)
                         if full_path not in seen:
                             expanded_files.append(full_path)
                             seen.add(full_path)
-
+
     # Sort for consistent ordering
     expanded_files.sort()
     return expanded_files
@@ -76,11 +119,11 @@ def expand_paths(paths: List[str], extensions: Optional[Set[str]] = None) -> Lis
 def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, int]:
     """
     Read a single file and format it for Gemini.
-
+
     Args:
         file_path: Path to file
         max_size: Maximum file size to read
-
+
     Returns:
         (formatted_content, estimated_tokens)
     """
@@ -116,40 +159,42 @@ def read_file_content(file_path: str, max_size: int = 1_000_000) -> Tuple[str, i

 def read_files(
-    file_paths: List[str],
+    file_paths: List[str],
     code: Optional[str] = None,
     max_tokens: Optional[int] = None,
-    reserve_tokens: int = 50_000
+    reserve_tokens: int = 50_000,
 ) -> Tuple[str, str]:
     """
     Read multiple files and optional direct code with smart token management.
-
+
     Args:
         file_paths: List of file or directory paths
         code: Optional direct code to include
         max_tokens: Maximum tokens to use (defaults to MAX_CONTEXT_TOKENS)
         reserve_tokens: Tokens to reserve for prompt and response
-
-    Returns:
+
+    Returns:
         (full_content, brief_summary)
     """
     if max_tokens is None:
         max_tokens = MAX_CONTEXT_TOKENS
-
+
     content_parts = []
     summary_parts = []
     total_tokens = 0
     available_tokens = max_tokens - reserve_tokens
-
+
     files_read = []
     files_skipped = []
     dirs_processed = []
-
+
     # First, handle direct code if provided
     if code:
-        formatted_code = f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
+        formatted_code = (
+            f"\n--- BEGIN DIRECT CODE ---\n{code}\n--- END DIRECT CODE ---\n"
+        )
         code_tokens = estimate_tokens(formatted_code)
-
+
         if code_tokens <= available_tokens:
             content_parts.append(formatted_code)
             total_tokens += code_tokens
@@ -158,29 +203,31 @@ def read_files(
             summary_parts.append(f"Direct code: {code_preview}")
         else:
             summary_parts.append("Direct code skipped (too large)")
-
+
     # Expand all paths to get individual files
     if file_paths:
         # Track which paths are directories
         for path in file_paths:
             if Path(path).is_dir():
                 dirs_processed.append(path)
-
+
         # Expand to get all files
         all_files = expand_paths(file_paths)
-
+
         if not all_files and file_paths:
             # No files found but paths were provided
-            content_parts.append(f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n")
+            content_parts.append(
+                f"\n--- NO FILES FOUND ---\nProvided paths: {', '.join(file_paths)}\n--- END ---\n"
+            )
         else:
             # Read files up to token limit
             for file_path in all_files:
                 if total_tokens >= available_tokens:
                     files_skipped.append(file_path)
                     continue
-
+
                 file_content, file_tokens = read_file_content(file_path)
-
+
                 # Check if adding this file would exceed limit
                 if total_tokens + file_tokens <= available_tokens:
                     content_parts.append(file_content)
@@ -188,7 +235,7 @@ def read_files(
                     files_read.append(file_path)
                 else:
                     files_skipped.append(file_path)
-
+
     # Build summary
     if dirs_processed:
         summary_parts.append(f"Processed {len(dirs_processed)} dir(s)")
@@ -198,10 +245,10 @@ def read_files(
         summary_parts.append(f"Skipped {len(files_skipped)} file(s) (token limit)")
     if total_tokens > 0:
         summary_parts.append(f"~{total_tokens:,} tokens used")
-
+
     # Add skipped files note if any were skipped
     if files_skipped:
-        skip_note = f"\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
+        skip_note = "\n\n--- SKIPPED FILES (TOKEN LIMIT) ---\n"
         skip_note += f"Total skipped: {len(files_skipped)}\n"
         # Show first 10 skipped files
         for i, file_path in enumerate(files_skipped[:10]):
@@ -210,8 +257,8 @@ def read_files(
             skip_note += f" ... and {len(files_skipped) - 10} more\n"
         skip_note += "--- END SKIPPED FILES ---\n"
         content_parts.append(skip_note)
-
+
     full_content = "\n\n".join(content_parts) if content_parts else ""
     summary = " | ".join(summary_parts) if summary_parts else "No input provided"
-
+
     return full_content, summary
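As a closing reference, read_files() is typically driven as in the sketch below; the import mirrors tests/test_utils.py, the directory path is hypothetical, and the printed summary is only an example of the "Processed ... | Read ... | ~N tokens used" format built above:

from utils import read_files

content, summary = read_files(
    ["src/"],                                 # hypothetical directory, expanded recursively
    code="def hello():\n    return 'world'",  # optional direct code block
    max_tokens=200_000,                       # defaults to MAX_CONTEXT_TOKENS when omitted
    reserve_tokens=50_000,                    # kept free for the prompt and the response
)
print(summary)  # e.g. "Processed 1 dir(s) | Read 12 file(s) | ~45,000 tokens used"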