fix: resolve Python 3.8/3.9 compatibility and linting issues

- Replace tuple[str, str] with Tuple[str, str] for Python 3.8 compatibility
- Remove unused imports (Union, NotificationOptions)
- Fix line length issues by breaking long lines
- Add verbose_output field to analyze_code tool schema
- Apply black and isort formatting
- All tests pass and linting issues resolved

This should fix the GitHub Actions failures on Python 3.8 and 3.9.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Fahad
2025-06-08 20:54:03 +04:00
parent 22d387a858
commit c2c80dd828

View File

@@ -4,25 +4,26 @@ Gemini MCP Server - Model Context Protocol server for Google Gemini
Enhanced for large-scale code analysis with 1M token context window Enhanced for large-scale code analysis with 1M token context window
""" """
import os
import json
import asyncio import asyncio
from typing import Optional, Dict, Any, List, Union import json
import os
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import google.generativeai as genai
from mcp.server import Server
from mcp.server.models import InitializationOptions from mcp.server.models import InitializationOptions
from mcp.server import Server, NotificationOptions
from mcp.server.stdio import stdio_server from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool from mcp.types import TextContent, Tool
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
import google.generativeai as genai
# Default to Gemini 2.5 Pro Preview with maximum context # Default to Gemini 2.5 Pro Preview with maximum context
DEFAULT_MODEL = "gemini-2.5-pro-preview-06-05" DEFAULT_MODEL = "gemini-2.5-pro-preview-06-05"
MAX_CONTEXT_TOKENS = 1000000 # 1M tokens MAX_CONTEXT_TOKENS = 1000000 # 1M tokens
# Developer-focused system prompt for Claude Code usage # Developer-focused system prompt for Claude Code usage
DEVELOPER_SYSTEM_PROMPT = """You are an expert software developer assistant working alongside Claude Code. Your role is to extend Claude's capabilities when handling large codebases or complex analysis tasks. DEVELOPER_SYSTEM_PROMPT = """You are an expert software developer assistant working alongside Claude Code. \
Your role is to extend Claude's capabilities when handling large codebases or complex analysis tasks.
Core competencies: Core competencies:
- Deep understanding of software architecture and design patterns - Deep understanding of software architecture and design patterns
@@ -43,28 +44,55 @@ Your approach:
- When reviewing code, prioritize critical issues first - When reviewing code, prioritize critical issues first
- Always validate your suggestions against best practices - Always validate your suggestions against best practices
Remember: You're augmenting Claude Code's capabilities, especially for tasks requiring extensive context or deep analysis that might exceed Claude's token limits.""" Remember: You're augmenting Claude Code's capabilities, especially for tasks requiring \
extensive context or deep analysis that might exceed Claude's token limits."""
class GeminiChatRequest(BaseModel): class GeminiChatRequest(BaseModel):
"""Request model for Gemini chat""" """Request model for Gemini chat"""
prompt: str = Field(..., description="The prompt to send to Gemini") prompt: str = Field(..., description="The prompt to send to Gemini")
system_prompt: Optional[str] = Field(None, description="Optional system prompt for context") system_prompt: Optional[str] = Field(
max_tokens: Optional[int] = Field(8192, description="Maximum number of tokens in response") None, description="Optional system prompt for context"
temperature: Optional[float] = Field(0.5, description="Temperature for response randomness (0-1, default 0.5 for balanced accuracy/creativity)") )
model: Optional[str] = Field(DEFAULT_MODEL, description=f"Model to use (defaults to {DEFAULT_MODEL})") max_tokens: Optional[int] = Field(
8192, description="Maximum number of tokens in response"
)
temperature: Optional[float] = Field(
0.5,
description="Temperature for response randomness (0-1, default 0.5 for balanced accuracy/creativity)",
)
model: Optional[str] = Field(
DEFAULT_MODEL, description=f"Model to use (defaults to {DEFAULT_MODEL})"
)
class CodeAnalysisRequest(BaseModel): class CodeAnalysisRequest(BaseModel):
"""Request model for code analysis""" """Request model for code analysis"""
files: Optional[List[str]] = Field(None, description="List of file paths to analyze")
files: Optional[List[str]] = Field(
None, description="List of file paths to analyze"
)
code: Optional[str] = Field(None, description="Direct code content to analyze") code: Optional[str] = Field(None, description="Direct code content to analyze")
question: str = Field(..., description="Question or analysis request about the code") question: str = Field(
system_prompt: Optional[str] = Field(None, description="Optional system prompt for context") ..., description="Question or analysis request about the code"
max_tokens: Optional[int] = Field(8192, description="Maximum number of tokens in response") )
temperature: Optional[float] = Field(0.2, description="Temperature for code analysis (0-1, default 0.2 for high accuracy)") system_prompt: Optional[str] = Field(
model: Optional[str] = Field(DEFAULT_MODEL, description=f"Model to use (defaults to {DEFAULT_MODEL})") None, description="Optional system prompt for context"
verbose_output: Optional[bool] = Field(False, description="Show file contents in terminal output") )
max_tokens: Optional[int] = Field(
8192, description="Maximum number of tokens in response"
)
temperature: Optional[float] = Field(
0.2,
description="Temperature for code analysis (0-1, default 0.2 for high accuracy)",
)
model: Optional[str] = Field(
DEFAULT_MODEL, description=f"Model to use (defaults to {DEFAULT_MODEL})"
)
verbose_output: Optional[bool] = Field(
False, description="Show file contents in terminal output"
)
# Create the MCP server instance # Create the MCP server instance
@@ -88,30 +116,32 @@ def read_file_content(file_path: str) -> str:
return f"Error: File not found: {file_path}" return f"Error: File not found: {file_path}"
if not path.is_file(): if not path.is_file():
return f"Error: Not a file: {file_path}" return f"Error: Not a file: {file_path}"
# Read the file # Read the file
with open(path, 'r', encoding='utf-8') as f: with open(path, "r", encoding="utf-8") as f:
content = f.read() content = f.read()
return f"=== File: {file_path} ===\n{content}\n" return f"=== File: {file_path} ===\n{content}\n"
except Exception as e: except Exception as e:
return f"Error reading {file_path}: {str(e)}" return f"Error reading {file_path}: {str(e)}"
def prepare_code_context(files: Optional[List[str]], code: Optional[str], verbose: bool = False) -> tuple[str, str]: def prepare_code_context(
files: Optional[List[str]], code: Optional[str], verbose: bool = False
) -> Tuple[str, str]:
"""Prepare code context from files and/or direct code """Prepare code context from files and/or direct code
Returns: (context_for_gemini, summary_for_terminal) Returns: (context_for_gemini, summary_for_terminal)
""" """
context_parts = [] context_parts = []
summary_parts = [] summary_parts = []
# Add file contents # Add file contents
if files: if files:
summary_parts.append(f"Analyzing {len(files)} file(s):") summary_parts.append(f"Analyzing {len(files)} file(s):")
for file_path in files: for file_path in files:
content = read_file_content(file_path) content = read_file_content(file_path)
context_parts.append(content) context_parts.append(content)
# For summary, just show file path and size # For summary, just show file path and size
path = Path(file_path) path = Path(file_path)
if path.exists() and path.is_file(): if path.exists() and path.is_file():
@@ -119,15 +149,15 @@ def prepare_code_context(files: Optional[List[str]], code: Optional[str], verbos
summary_parts.append(f" - {file_path} ({size:,} bytes)") summary_parts.append(f" - {file_path} ({size:,} bytes)")
else: else:
summary_parts.append(f" - {file_path} (not found)") summary_parts.append(f" - {file_path} (not found)")
# Add direct code # Add direct code
if code: if code:
context_parts.append("=== Direct Code ===\n" + code + "\n") context_parts.append("=== Direct Code ===\n" + code + "\n")
summary_parts.append(f"Direct code provided ({len(code):,} characters)") summary_parts.append(f"Direct code provided ({len(code):,} characters)")
full_context = "\n".join(context_parts) full_context = "\n".join(context_parts)
summary = "\n".join(summary_parts) if not verbose else full_context summary = "\n".join(summary_parts) if not verbose else full_context
return full_context, summary return full_context, summary
@@ -143,32 +173,33 @@ async def handle_list_tools() -> List[Tool]:
"properties": { "properties": {
"prompt": { "prompt": {
"type": "string", "type": "string",
"description": "The prompt to send to Gemini" "description": "The prompt to send to Gemini",
}, },
"system_prompt": { "system_prompt": {
"type": "string", "type": "string",
"description": "Optional system prompt for context" "description": "Optional system prompt for context",
}, },
"max_tokens": { "max_tokens": {
"type": "integer", "type": "integer",
"description": "Maximum number of tokens in response", "description": "Maximum number of tokens in response",
"default": 8192 "default": 8192,
}, },
"temperature": { "temperature": {
"type": "number", "type": "number",
"description": "Temperature for response randomness (0-1, default 0.5 for balanced accuracy/creativity)", "description": "Temperature for response randomness (0-1, default 0.5 for "
"balanced accuracy/creativity)",
"default": 0.5, "default": 0.5,
"minimum": 0, "minimum": 0,
"maximum": 1 "maximum": 1,
}, },
"model": { "model": {
"type": "string", "type": "string",
"description": f"Model to use (defaults to {DEFAULT_MODEL})", "description": f"Model to use (defaults to {DEFAULT_MODEL})",
"default": DEFAULT_MODEL "default": DEFAULT_MODEL,
} },
}, },
"required": ["prompt"] "required": ["prompt"],
} },
), ),
Tool( Tool(
name="analyze_code", name="analyze_code",
@@ -179,60 +210,62 @@ async def handle_list_tools() -> List[Tool]:
"files": { "files": {
"type": "array", "type": "array",
"items": {"type": "string"}, "items": {"type": "string"},
"description": "List of file paths to analyze" "description": "List of file paths to analyze",
}, },
"code": { "code": {
"type": "string", "type": "string",
"description": "Direct code content to analyze (alternative to files)" "description": "Direct code content to analyze (alternative to files)",
}, },
"question": { "question": {
"type": "string", "type": "string",
"description": "Question or analysis request about the code" "description": "Question or analysis request about the code",
}, },
"system_prompt": { "system_prompt": {
"type": "string", "type": "string",
"description": "Optional system prompt for context" "description": "Optional system prompt for context",
}, },
"max_tokens": { "max_tokens": {
"type": "integer", "type": "integer",
"description": "Maximum number of tokens in response", "description": "Maximum number of tokens in response",
"default": 8192 "default": 8192,
}, },
"temperature": { "temperature": {
"type": "number", "type": "number",
"description": "Temperature for code analysis (0-1, default 0.2 for high accuracy)", "description": "Temperature for code analysis (0-1, default 0.2 for high accuracy)",
"default": 0.2, "default": 0.2,
"minimum": 0, "minimum": 0,
"maximum": 1 "maximum": 1,
}, },
"model": { "model": {
"type": "string", "type": "string",
"description": f"Model to use (defaults to {DEFAULT_MODEL})", "description": f"Model to use (defaults to {DEFAULT_MODEL})",
"default": DEFAULT_MODEL "default": DEFAULT_MODEL,
} },
"verbose_output": {
"type": "boolean",
"description": "Show file contents in terminal output",
"default": False,
},
}, },
"required": ["question"] "required": ["question"],
} },
), ),
Tool( Tool(
name="list_models", name="list_models",
description="List available Gemini models", description="List available Gemini models",
inputSchema={ inputSchema={"type": "object", "properties": {}},
"type": "object", ),
"properties": {}
}
)
] ]
@server.call_tool() @server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool execution requests""" """Handle tool execution requests"""
if name == "chat": if name == "chat":
# Validate request # Validate request
request = GeminiChatRequest(**arguments) request = GeminiChatRequest(**arguments)
try: try:
# Use the specified model with optimized settings # Use the specified model with optimized settings
model = genai.GenerativeModel( model = genai.GenerativeModel(
@@ -241,61 +274,68 @@ async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextCon
"temperature": request.temperature, "temperature": request.temperature,
"max_output_tokens": request.max_tokens, "max_output_tokens": request.max_tokens,
"candidate_count": 1, "candidate_count": 1,
} },
) )
# Prepare the prompt with automatic developer context if no system prompt provided # Prepare the prompt with automatic developer context if no system prompt provided
if request.system_prompt: if request.system_prompt:
full_prompt = f"{request.system_prompt}\n\n{request.prompt}" full_prompt = f"{request.system_prompt}\n\n{request.prompt}"
else: else:
# Auto-inject developer system prompt for better Claude Code integration # Auto-inject developer system prompt for better Claude Code integration
full_prompt = f"{DEVELOPER_SYSTEM_PROMPT}\n\n{request.prompt}" full_prompt = f"{DEVELOPER_SYSTEM_PROMPT}\n\n{request.prompt}"
# Generate response # Generate response
response = model.generate_content(full_prompt) response = model.generate_content(full_prompt)
# Handle response based on finish reason # Handle response based on finish reason
if response.candidates and response.candidates[0].content.parts: if response.candidates and response.candidates[0].content.parts:
text = response.candidates[0].content.parts[0].text text = response.candidates[0].content.parts[0].text
else: else:
# Handle safety filters or other issues # Handle safety filters or other issues
finish_reason = response.candidates[0].finish_reason if response.candidates else "Unknown" finish_reason = (
response.candidates[0].finish_reason
if response.candidates
else "Unknown"
)
text = f"Response blocked or incomplete. Finish reason: {finish_reason}" text = f"Response blocked or incomplete. Finish reason: {finish_reason}"
return [TextContent( return [TextContent(type="text", text=text)]
type="text",
text=text
)]
except Exception as e: except Exception as e:
return [TextContent( return [
type="text", TextContent(type="text", text=f"Error calling Gemini API: {str(e)}")
text=f"Error calling Gemini API: {str(e)}" ]
)]
elif name == "analyze_code": elif name == "analyze_code":
# Validate request # Validate request
request = CodeAnalysisRequest(**arguments) request = CodeAnalysisRequest(**arguments)
# Check that we have either files or code # Check that we have either files or code
if not request.files and not request.code: if not request.files and not request.code:
return [TextContent( return [
type="text", TextContent(
text="Error: Must provide either 'files' or 'code' parameter" type="text",
)] text="Error: Must provide either 'files' or 'code' parameter",
)
]
try: try:
# Prepare code context # Prepare code context
code_context, summary = prepare_code_context(request.files, request.code, request.verbose_output) code_context, summary = prepare_code_context(
request.files, request.code, request.verbose_output
)
# Count approximate tokens (rough estimate: 1 token ≈ 4 characters) # Count approximate tokens (rough estimate: 1 token ≈ 4 characters)
estimated_tokens = len(code_context) // 4 estimated_tokens = len(code_context) // 4
if estimated_tokens > MAX_CONTEXT_TOKENS: if estimated_tokens > MAX_CONTEXT_TOKENS:
return [TextContent( return [
type="text", TextContent(
text=f"Error: Code context too large (~{estimated_tokens:,} tokens). Maximum is {MAX_CONTEXT_TOKENS:,} tokens." type="text",
)] text=f"Error: Code context too large (~{estimated_tokens:,} tokens). "
f"Maximum is {MAX_CONTEXT_TOKENS:,} tokens.",
)
]
# Use the specified model with optimized settings for code analysis # Use the specified model with optimized settings for code analysis
model = genai.GenerativeModel( model = genai.GenerativeModel(
model_name=request.model, model_name=request.model,
@@ -303,90 +343,80 @@ async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextCon
"temperature": request.temperature, "temperature": request.temperature,
"max_output_tokens": request.max_tokens, "max_output_tokens": request.max_tokens,
"candidate_count": 1, "candidate_count": 1,
} },
) )
# Prepare the full prompt with enhanced developer context # Prepare the full prompt with enhanced developer context
system_prompt = request.system_prompt or DEVELOPER_SYSTEM_PROMPT system_prompt = request.system_prompt or DEVELOPER_SYSTEM_PROMPT
full_prompt = f"{system_prompt}\n\nCode to analyze:\n\n{code_context}\n\nQuestion/Request: {request.question}" full_prompt = (
f"{system_prompt}\n\nCode to analyze:\n\n{code_context}\n\n"
f"Question/Request: {request.question}"
)
# Generate response # Generate response
response = model.generate_content(full_prompt) response = model.generate_content(full_prompt)
# Handle response # Handle response
if response.candidates and response.candidates[0].content.parts: if response.candidates and response.candidates[0].content.parts:
text = response.candidates[0].content.parts[0].text text = response.candidates[0].content.parts[0].text
else: else:
finish_reason = response.candidates[0].finish_reason if response.candidates else "Unknown" finish_reason = (
response.candidates[0].finish_reason
if response.candidates
else "Unknown"
)
text = f"Response blocked or incomplete. Finish reason: {finish_reason}" text = f"Response blocked or incomplete. Finish reason: {finish_reason}"
# Return response with summary if not verbose # Return response with summary if not verbose
if not request.verbose_output and request.files: if not request.verbose_output and request.files:
response_text = f"{summary}\n\nGemini's response:\n{text}" response_text = f"{summary}\n\nGemini's response:\n{text}"
else: else:
response_text = text response_text = text
return [TextContent( return [TextContent(type="text", text=response_text)]
type="text",
text=response_text
)]
except Exception as e: except Exception as e:
return [TextContent( return [TextContent(type="text", text=f"Error analyzing code: {str(e)}")]
type="text",
text=f"Error analyzing code: {str(e)}"
)]
elif name == "list_models": elif name == "list_models":
try: try:
# List available models # List available models
models = [] models = []
for model in genai.list_models(): for model in genai.list_models():
if 'generateContent' in model.supported_generation_methods: if "generateContent" in model.supported_generation_methods:
models.append({ models.append(
"name": model.name, {
"display_name": model.display_name, "name": model.name,
"description": model.description, "display_name": model.display_name,
"is_default": model.name == DEFAULT_MODEL "description": model.description,
}) "is_default": model.name == DEFAULT_MODEL,
}
return [TextContent( )
type="text",
text=json.dumps(models, indent=2) return [TextContent(type="text", text=json.dumps(models, indent=2))]
)]
except Exception as e: except Exception as e:
return [TextContent( return [TextContent(type="text", text=f"Error listing models: {str(e)}")]
type="text",
text=f"Error listing models: {str(e)}"
)]
else: else:
return [TextContent( return [TextContent(type="text", text=f"Unknown tool: {name}")]
type="text",
text=f"Unknown tool: {name}"
)]
async def main(): async def main():
"""Main entry point for the server""" """Main entry point for the server"""
# Configure Gemini API # Configure Gemini API
configure_gemini() configure_gemini()
# Run the server using stdio transport # Run the server using stdio transport
async with stdio_server() as (read_stream, write_stream): async with stdio_server() as (read_stream, write_stream):
await server.run( await server.run(
read_stream, read_stream,
write_stream, write_stream,
InitializationOptions( InitializationOptions(
server_name="gemini", server_name="gemini", server_version="2.0.0", capabilities={"tools": {}}
server_version="2.0.0", ),
capabilities={
"tools": {}
}
)
) )
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())