"""
|
|
Zen MCP Server - Main server implementation
|
|
|
|
This module implements the core MCP (Model Context Protocol) server that provides
|
|
AI-powered tools for code analysis, review, and assistance using multiple AI models.
|
|
|
|
The server follows the MCP specification to expose various AI tools as callable functions
|
|
that can be used by MCP clients (like Claude). Each tool provides specialized functionality
|
|
such as code review, debugging, deep thinking, and general chat capabilities.
|
|
|
|
Key Components:
|
|
- MCP Server: Handles protocol communication and tool discovery
|
|
- Tool Registry: Maps tool names to their implementations
|
|
- Request Handler: Processes incoming tool calls and returns formatted responses
|
|
- Configuration: Manages API keys and model settings
|
|
|
|
The server runs on stdio (standard input/output) and communicates using JSON-RPC messages
|
|
as defined by the MCP protocol.
|
|
"""

import asyncio
import atexit
import logging
import os
import sys
import time
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, Optional

# Try to load environment variables from .env file if dotenv is available
# This is optional - environment variables can still be passed directly
try:
    from dotenv import load_dotenv

    # Load environment variables from .env file in the script's directory
    # This ensures .env is loaded regardless of the current working directory
    script_dir = Path(__file__).parent
    env_file = script_dir / ".env"
    load_dotenv(dotenv_path=env_file)
except ImportError:
    # dotenv not available - this is fine, environment variables can still be passed directly
    # This commonly happens when running via uvx or in minimal environments
    pass

from mcp.server import Server  # noqa: E402
from mcp.server.models import InitializationOptions  # noqa: E402
from mcp.server.stdio import stdio_server  # noqa: E402
from mcp.types import (  # noqa: E402
    GetPromptResult,
    Prompt,
    PromptMessage,
    PromptsCapability,
    ServerCapabilities,
    TextContent,
    Tool,
    ToolAnnotations,
    ToolsCapability,
)

from config import (  # noqa: E402
    DEFAULT_MODEL,
    __version__,
)
from tools import (  # noqa: E402
    AnalyzeTool,
    ChallengeTool,
    ChatTool,
    CodeReviewTool,
    ConsensusTool,
    DebugIssueTool,
    DocgenTool,
    ListModelsTool,
    PlannerTool,
    PrecommitTool,
    RefactorTool,
    SecauditTool,
    TestGenTool,
    ThinkDeepTool,
    TracerTool,
    VersionTool,
)
from tools.models import ToolOutput  # noqa: E402

# Configure logging for server operations
# Can be controlled via LOG_LEVEL environment variable (DEBUG, INFO, WARNING, ERROR)
log_level = os.getenv("LOG_LEVEL", "DEBUG").upper()

# Create timezone-aware formatter


class LocalTimeFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        """Override to use local timezone instead of UTC"""
        ct = self.converter(record.created)
        if datefmt:
            s = time.strftime(datefmt, ct)
        else:
            t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
            s = f"{t},{record.msecs:03.0f}"
        return s
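
# With the log_format defined below, a record rendered by this formatter looks roughly like
# (timestamp and logger name are illustrative, shown in local wall-clock time rather than UTC):
#   2024-01-15 14:30:07,123 - server - INFO - Server ready - waiting for tool requests...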


# Configure both console and file logging
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Clear any existing handlers first
root_logger = logging.getLogger()
root_logger.handlers.clear()

# Create and configure stderr handler explicitly
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(getattr(logging, log_level, logging.INFO))
stderr_handler.setFormatter(LocalTimeFormatter(log_format))
root_logger.addHandler(stderr_handler)

# Note: MCP stdio_server interferes with stderr during tool execution
# All logs are properly written to logs/mcp_server.log for monitoring

# Set root logger level
root_logger.setLevel(getattr(logging, log_level, logging.INFO))
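
# For example, launching with LOG_LEVEL=INFO in the environment (or in the .env file loaded above)
# raises the threshold for the stderr handler above and the rotating file handlers below;
# when the variable is unset, the default above keeps everything at DEBUG.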

# Add rotating file handler for local log monitoring

try:
    # Create logs directory in project root
    log_dir = Path(__file__).parent / "logs"
    log_dir.mkdir(exist_ok=True)

    # Main server log with size-based rotation (20MB max per file)
    # This ensures logs don't grow indefinitely and are properly managed
    file_handler = RotatingFileHandler(
        log_dir / "mcp_server.log",
        maxBytes=20 * 1024 * 1024,  # 20MB max file size
        backupCount=5,  # Keep 5 rotated files (100MB of rotated logs)
        encoding="utf-8",
    )
    file_handler.setLevel(getattr(logging, log_level, logging.INFO))
    file_handler.setFormatter(LocalTimeFormatter(log_format))
    logging.getLogger().addHandler(file_handler)

    # Create a special logger for MCP activity tracking with size-based rotation
    mcp_logger = logging.getLogger("mcp_activity")
    mcp_file_handler = RotatingFileHandler(
        log_dir / "mcp_activity.log",
        maxBytes=10 * 1024 * 1024,  # 10MB max file size
        backupCount=2,  # Keep 2 rotated files (20MB of rotated logs)
        encoding="utf-8",
    )
    mcp_file_handler.setLevel(logging.INFO)
    mcp_file_handler.setFormatter(LocalTimeFormatter("%(asctime)s - %(message)s"))
    mcp_logger.addHandler(mcp_file_handler)
    mcp_logger.setLevel(logging.INFO)
    # Ensure MCP activity also goes to stderr
    mcp_logger.propagate = True

    # Log setup info directly to root logger since logger isn't defined yet
    logging.info(f"Logging to: {log_dir / 'mcp_server.log'}")
    logging.info(f"Process PID: {os.getpid()}")

except Exception as e:
    print(f"Warning: Could not set up file logging: {e}", file=sys.stderr)

logger = logging.getLogger(__name__)

# Create the MCP server instance with a unique name identifier
# This name is used by MCP clients to identify and connect to this specific server
server: Server = Server("zen-server")


# Constants for tool filtering
ESSENTIAL_TOOLS = {"version", "listmodels"}


def parse_disabled_tools_env() -> set[str]:
    """
    Parse the DISABLED_TOOLS environment variable into a set of tool names.

    Returns:
        Set of lowercase tool names to disable, empty set if none specified
    """
    disabled_tools_env = os.getenv("DISABLED_TOOLS", "").strip()
    if not disabled_tools_env:
        return set()
    return {t.strip().lower() for t in disabled_tools_env.split(",") if t.strip()}
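
# Example (illustrative): DISABLED_TOOLS="debug, tracer ,TESTGEN" parses to {"debug", "tracer", "testgen"};
# an unset or empty variable yields an empty set, so nothing is disabled.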


def validate_disabled_tools(disabled_tools: set[str], all_tools: dict[str, Any]) -> None:
    """
    Validate the disabled tools list and log appropriate warnings.

    Args:
        disabled_tools: Set of tool names requested to be disabled
        all_tools: Dictionary of all available tool instances
    """
    essential_disabled = disabled_tools & ESSENTIAL_TOOLS
    if essential_disabled:
        logger.warning(f"Cannot disable essential tools: {sorted(essential_disabled)}")
    unknown_tools = disabled_tools - set(all_tools.keys())
    if unknown_tools:
        logger.warning(f"Unknown tools in DISABLED_TOOLS: {sorted(unknown_tools)}")


def apply_tool_filter(all_tools: dict[str, Any], disabled_tools: set[str]) -> dict[str, Any]:
    """
    Apply the disabled tools filter to create the final tools dictionary.

    Args:
        all_tools: Dictionary of all available tool instances
        disabled_tools: Set of tool names to disable

    Returns:
        Dictionary containing only enabled tools
    """
    enabled_tools = {}
    for tool_name, tool_instance in all_tools.items():
        if tool_name in ESSENTIAL_TOOLS or tool_name not in disabled_tools:
            enabled_tools[tool_name] = tool_instance
        else:
            logger.debug(f"Tool '{tool_name}' disabled via DISABLED_TOOLS")
    return enabled_tools


def log_tool_configuration(disabled_tools: set[str], enabled_tools: dict[str, Any]) -> None:
    """
    Log the final tool configuration for visibility.

    Args:
        disabled_tools: Set of tool names that were requested to be disabled
        enabled_tools: Dictionary of tools that remain enabled
    """
    if not disabled_tools:
        logger.info("All tools enabled (DISABLED_TOOLS not set)")
        return
    actual_disabled = disabled_tools - ESSENTIAL_TOOLS
    if actual_disabled:
        logger.debug(f"Disabled tools: {sorted(actual_disabled)}")
    logger.info(f"Active tools: {sorted(enabled_tools.keys())}")


def filter_disabled_tools(all_tools: dict[str, Any]) -> dict[str, Any]:
    """
    Filter tools based on DISABLED_TOOLS environment variable.

    Args:
        all_tools: Dictionary of all available tool instances

    Returns:
        dict: Filtered dictionary containing only enabled tools
    """
    disabled_tools = parse_disabled_tools_env()
    if not disabled_tools:
        log_tool_configuration(disabled_tools, all_tools)
        return all_tools
    validate_disabled_tools(disabled_tools, all_tools)
    enabled_tools = apply_tool_filter(all_tools, disabled_tools)
    log_tool_configuration(disabled_tools, enabled_tools)
    return enabled_tools


# Initialize the tool registry with all available AI-powered tools
# Each tool provides specialized functionality for different development tasks
# Tools are instantiated once and reused across requests (stateless design)
TOOLS = {
    "chat": ChatTool(),  # Interactive development chat and brainstorming
    "thinkdeep": ThinkDeepTool(),  # Step-by-step deep thinking workflow with expert analysis
    "planner": PlannerTool(),  # Interactive sequential planner using workflow architecture
    "consensus": ConsensusTool(),  # Step-by-step consensus workflow with multi-model analysis
    "codereview": CodeReviewTool(),  # Comprehensive step-by-step code review workflow with expert analysis
    "precommit": PrecommitTool(),  # Step-by-step pre-commit validation workflow
    "debug": DebugIssueTool(),  # Root cause analysis and debugging assistance
    "secaudit": SecauditTool(),  # Comprehensive security audit with OWASP Top 10 and compliance coverage
    "docgen": DocgenTool(),  # Step-by-step documentation generation with complexity analysis
    "analyze": AnalyzeTool(),  # General-purpose file and code analysis
    "refactor": RefactorTool(),  # Step-by-step refactoring analysis workflow with expert validation
    "tracer": TracerTool(),  # Static call path prediction and control flow analysis
    "testgen": TestGenTool(),  # Step-by-step test generation workflow with expert validation
    "challenge": ChallengeTool(),  # Critical challenge prompt wrapper to avoid automatic agreement
    "listmodels": ListModelsTool(),  # List all available AI models by provider
    "version": VersionTool(),  # Display server version and system information
}
TOOLS = filter_disabled_tools(TOOLS)
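
# For example, starting the server with DISABLED_TOOLS="refactor,testgen" removes those two workflows
# from the registry above, while the essential tools ("version", "listmodels") always remain available.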

# Rich prompt templates for all tools
PROMPT_TEMPLATES = {
    "chat": {
        "name": "chat",
        "description": "Chat and brainstorm ideas",
        "template": "Chat with {model} about this",
    },
    "thinkdeep": {
        "name": "thinkdeeper",
        "description": "Step-by-step deep thinking workflow with expert analysis",
        "template": "Start comprehensive deep thinking workflow with {model} using {thinking_mode} thinking mode",
    },
    "planner": {
        "name": "planner",
        "description": "Break down complex ideas, problems, or projects into multiple manageable steps",
        "template": "Create a detailed plan with {model}",
    },
    "consensus": {
        "name": "consensus",
        "description": "Step-by-step consensus workflow with multi-model analysis",
        "template": "Start comprehensive consensus workflow with {model}",
    },
    "codereview": {
        "name": "review",
        "description": "Perform a comprehensive code review",
        "template": "Perform a comprehensive code review with {model}",
    },
    "precommit": {
        "name": "precommit",
        "description": "Step-by-step pre-commit validation workflow",
        "template": "Start comprehensive pre-commit validation workflow with {model}",
    },
    "debug": {
        "name": "debug",
        "description": "Debug an issue or error",
        "template": "Help debug this issue with {model}",
    },
    "secaudit": {
        "name": "secaudit",
        "description": "Comprehensive security audit with OWASP Top 10 coverage",
        "template": "Perform comprehensive security audit with {model}",
    },
    "docgen": {
        "name": "docgen",
        "description": "Generate comprehensive code documentation with complexity analysis",
        "template": "Generate comprehensive documentation with {model}",
    },
    "analyze": {
        "name": "analyze",
        "description": "Analyze files and code structure",
        "template": "Analyze these files with {model}",
    },
    "refactor": {
        "name": "refactor",
        "description": "Refactor and improve code structure",
        "template": "Refactor this code with {model}",
    },
    "tracer": {
        "name": "tracer",
        "description": "Trace code execution paths",
        "template": "Generate tracer analysis with {model}",
    },
    "testgen": {
        "name": "testgen",
        "description": "Generate comprehensive tests",
        "template": "Generate comprehensive tests with {model}",
    },
    "challenge": {
        "name": "challenge",
        "description": "Challenge a statement critically without automatic agreement",
        "template": "Challenge this statement critically",
    },
    "listmodels": {
        "name": "listmodels",
        "description": "List available AI models",
        "template": "List all available models",
    },
    "version": {
        "name": "version",
        "description": "Show server version and system information",
        "template": "Show Zen MCP Server version",
    },
}
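
# For example, the "thinkdeep" template above is expanded by handle_get_prompt() below to
#   "Start comprehensive deep thinking workflow with auto using medium thinking mode"
# when the client supplies no explicit model or thinking_mode arguments.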


def configure_providers():
    """
    Configure and validate AI providers based on available API keys.

    This function checks for API keys and registers the appropriate providers.
    At least one valid provider configuration (Gemini, OpenAI, X.AI, DIAL, OpenRouter,
    or a custom endpoint) is required.

    Raises:
        ValueError: If no valid API keys are found or conflicting configurations detected
    """
    # Log environment variable status for debugging
    logger.debug("Checking environment variables for API keys...")
    api_keys_to_check = ["OPENAI_API_KEY", "OPENROUTER_API_KEY", "GEMINI_API_KEY", "XAI_API_KEY", "CUSTOM_API_URL"]
    for key in api_keys_to_check:
        value = os.getenv(key)
        logger.debug(f" {key}: {'[PRESENT]' if value else '[MISSING]'}")
    from providers import ModelProviderRegistry
    from providers.base import ProviderType
    from providers.custom import CustomProvider
    from providers.dial import DIALModelProvider
    from providers.gemini import GeminiModelProvider
    from providers.openai_provider import OpenAIModelProvider
    from providers.openrouter import OpenRouterProvider
    from providers.xai import XAIModelProvider
    from utils.model_restrictions import get_restriction_service

    valid_providers = []
    has_native_apis = False
    has_openrouter = False
    has_custom = False

    # Check for Gemini API key
    gemini_key = os.getenv("GEMINI_API_KEY")
    if gemini_key and gemini_key != "your_gemini_api_key_here":
        valid_providers.append("Gemini")
        has_native_apis = True
        logger.info("Gemini API key found - Gemini models available")

    # Check for OpenAI API key
    openai_key = os.getenv("OPENAI_API_KEY")
    logger.debug(f"OpenAI key check: key={'[PRESENT]' if openai_key else '[MISSING]'}")
    if openai_key and openai_key != "your_openai_api_key_here":
        valid_providers.append("OpenAI (o3)")
        has_native_apis = True
        logger.info("OpenAI API key found - o3 model available")
    else:
        if not openai_key:
            logger.debug("OpenAI API key not found in environment")
        else:
            logger.debug("OpenAI API key is placeholder value")

    # Check for X.AI API key
    xai_key = os.getenv("XAI_API_KEY")
    if xai_key and xai_key != "your_xai_api_key_here":
        valid_providers.append("X.AI (GROK)")
        has_native_apis = True
        logger.info("X.AI API key found - GROK models available")

    # Check for DIAL API key
    dial_key = os.getenv("DIAL_API_KEY")
    if dial_key and dial_key != "your_dial_api_key_here":
        valid_providers.append("DIAL")
        has_native_apis = True
        logger.info("DIAL API key found - DIAL models available")

    # Check for OpenRouter API key
    openrouter_key = os.getenv("OPENROUTER_API_KEY")
    logger.debug(f"OpenRouter key check: key={'[PRESENT]' if openrouter_key else '[MISSING]'}")
    if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
        valid_providers.append("OpenRouter")
        has_openrouter = True
        logger.info("OpenRouter API key found - Multiple models available via OpenRouter")
    else:
        if not openrouter_key:
            logger.debug("OpenRouter API key not found in environment")
        else:
            logger.debug("OpenRouter API key is placeholder value")

    # Check for custom API endpoint (Ollama, vLLM, etc.)
    custom_url = os.getenv("CUSTOM_API_URL")
    if custom_url:
        # IMPORTANT: Always read CUSTOM_API_KEY even if empty
        # - Some providers (vLLM, LM Studio, enterprise APIs) require authentication
        # - Others (Ollama) work without authentication (empty key)
        # - DO NOT remove this variable - it's needed for provider factory function
        custom_key = os.getenv("CUSTOM_API_KEY", "")  # Default to empty (Ollama doesn't need auth)
        custom_model = os.getenv("CUSTOM_MODEL_NAME", "llama3.2")
        valid_providers.append(f"Custom API ({custom_url})")
        has_custom = True
        logger.info(f"Custom API endpoint found: {custom_url} with model {custom_model}")
        if custom_key:
            logger.debug("Custom API key provided for authentication")
        else:
            logger.debug("No custom API key provided (using unauthenticated access)")

    # Register providers in priority order:
    # 1. Native APIs first (most direct and efficient)
    if has_native_apis:
        if gemini_key and gemini_key != "your_gemini_api_key_here":
            ModelProviderRegistry.register_provider(ProviderType.GOOGLE, GeminiModelProvider)
        if openai_key and openai_key != "your_openai_api_key_here":
            ModelProviderRegistry.register_provider(ProviderType.OPENAI, OpenAIModelProvider)
        if xai_key and xai_key != "your_xai_api_key_here":
            ModelProviderRegistry.register_provider(ProviderType.XAI, XAIModelProvider)
        if dial_key and dial_key != "your_dial_api_key_here":
            ModelProviderRegistry.register_provider(ProviderType.DIAL, DIALModelProvider)

    # 2. Custom provider second (for local/private models)
    if has_custom:
        # Factory function that creates CustomProvider with proper parameters
        def custom_provider_factory(api_key=None):
            # api_key is CUSTOM_API_KEY (can be empty for Ollama), base_url from CUSTOM_API_URL
            base_url = os.getenv("CUSTOM_API_URL", "")
            return CustomProvider(api_key=api_key or "", base_url=base_url)  # Use provided API key or empty string

        ModelProviderRegistry.register_provider(ProviderType.CUSTOM, custom_provider_factory)

    # 3. OpenRouter last (catch-all for everything else)
    if has_openrouter:
        ModelProviderRegistry.register_provider(ProviderType.OPENROUTER, OpenRouterProvider)

    # Require at least one valid provider
    if not valid_providers:
        raise ValueError(
            "At least one API configuration is required. Please set either:\n"
            "- GEMINI_API_KEY for Gemini models\n"
            "- OPENAI_API_KEY for OpenAI o3 model\n"
            "- XAI_API_KEY for X.AI GROK models\n"
            "- DIAL_API_KEY for DIAL models\n"
            "- OPENROUTER_API_KEY for OpenRouter (multiple models)\n"
            "- CUSTOM_API_URL for local models (Ollama, vLLM, etc.)"
        )

    logger.info(f"Available providers: {', '.join(valid_providers)}")

    # Log provider priority
    priority_info = []
    if has_native_apis:
        priority_info.append("Native APIs (Gemini, OpenAI)")
    if has_custom:
        priority_info.append("Custom endpoints")
    if has_openrouter:
        priority_info.append("OpenRouter (catch-all)")

    if len(priority_info) > 1:
        logger.info(f"Provider priority: {' → '.join(priority_info)}")

    # Register cleanup function for providers
    def cleanup_providers():
        """Clean up all registered providers on shutdown."""
        try:
            registry = ModelProviderRegistry()
            if hasattr(registry, "_initialized_providers"):
                # Iterate over the provider instances (values), not the (key, value) pairs
                for provider in list(registry._initialized_providers.values()):
                    try:
                        if provider and hasattr(provider, "close"):
                            provider.close()
                    except Exception:
                        # Logger might be closed during shutdown
                        pass
        except Exception:
            # Silently ignore any errors during cleanup
            pass

    atexit.register(cleanup_providers)

    # Check and log model restrictions
    restriction_service = get_restriction_service()
    restrictions = restriction_service.get_restriction_summary()

    if restrictions:
        logger.info("Model restrictions configured:")
        for provider_name, allowed_models in restrictions.items():
            if isinstance(allowed_models, list):
                logger.info(f" {provider_name}: {', '.join(allowed_models)}")
            else:
                logger.info(f" {provider_name}: {allowed_models}")

        # Validate restrictions against known models
        provider_instances = {}
        provider_types_to_validate = [ProviderType.GOOGLE, ProviderType.OPENAI, ProviderType.XAI, ProviderType.DIAL]
        for provider_type in provider_types_to_validate:
            provider = ModelProviderRegistry.get_provider(provider_type)
            if provider:
                provider_instances[provider_type] = provider

        if provider_instances:
            restriction_service.validate_against_known_models(provider_instances)
    else:
        logger.info("No model restrictions configured - all models allowed")

    # Check if auto mode has any models available after restrictions
    from config import IS_AUTO_MODE

    if IS_AUTO_MODE:
        available_models = ModelProviderRegistry.get_available_models(respect_restrictions=True)
        if not available_models:
            logger.error(
                "Auto mode is enabled but no models are available after applying restrictions. "
                "Please check your OPENAI_ALLOWED_MODELS and GOOGLE_ALLOWED_MODELS settings."
            )
            raise ValueError(
                "No models available for auto mode due to restrictions. "
                "Please adjust your allowed model settings or disable auto mode."
            )
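
# A minimal working setup needs only one of the configurations checked above, e.g. (illustrative
# placeholder values, not real credentials; the local URL shape is an assumption for an Ollama-style server):
#   GEMINI_API_KEY=...                        # native Gemini models
#   OPENROUTER_API_KEY=...                    # optional catch-all for many other models
#   CUSTOM_API_URL=http://localhost:11434/v1  # optional local/self-hosted endpoint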


@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """
    List all available tools with their descriptions and input schemas.

    This handler is called by MCP clients during initialization to discover
    what tools are available. Each tool provides:
    - name: Unique identifier for the tool
    - description: Detailed explanation of what the tool does
    - inputSchema: JSON Schema defining the expected parameters

    Returns:
        List of Tool objects representing all available tools
    """
    logger.debug("MCP client requested tool list")
    tools = []

    # Add all registered AI-powered tools from the TOOLS registry
    for tool in TOOLS.values():
        # Get optional annotations from the tool
        annotations = tool.get_annotations()
        tool_annotations = ToolAnnotations(**annotations) if annotations else None

        tools.append(
            Tool(
                name=tool.name,
                description=tool.description,
                inputSchema=tool.get_input_schema(),
                annotations=tool_annotations,
            )
        )

    # Log cache efficiency info
    if os.getenv("OPENROUTER_API_KEY") and os.getenv("OPENROUTER_API_KEY") != "your_openrouter_api_key_here":
        logger.debug("OpenRouter registry cache used efficiently across all tool schemas")

    logger.debug(f"Returning {len(tools)} tools to MCP client")
    return tools


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """
    Handle incoming tool execution requests from MCP clients.

    This is the main request dispatcher that routes tool calls to their appropriate handlers.
    It supports both AI-powered tools (from TOOLS registry) and utility tools (implemented as
    static functions).

    CONVERSATION LIFECYCLE MANAGEMENT:
    This function serves as the central orchestrator for multi-turn AI-to-AI conversations:

    1. THREAD RESUMPTION: When continuation_id is present, it reconstructs complete conversation
       context from in-memory storage including conversation history and file references

    2. CROSS-TOOL CONTINUATION: Enables seamless handoffs between different tools (analyze →
       codereview → debug) while preserving full conversation context and file references

    3. CONTEXT INJECTION: Reconstructed conversation history is embedded into tool prompts
       using the dual prioritization strategy:
       - Files: Newest-first prioritization (recent file versions take precedence)
       - Turns: Newest-first collection for token efficiency, chronological presentation for LLM

    4. FOLLOW-UP GENERATION: After tool execution, generates continuation offers for ongoing
       AI-to-AI collaboration with natural language instructions

    STATELESS TO STATEFUL BRIDGE:
    The MCP protocol is inherently stateless, but this function bridges the gap by:
    - Loading persistent conversation state from in-memory storage
    - Reconstructing full multi-turn context for tool execution
    - Enabling tools to access previous exchanges and file references
    - Supporting conversation chains across different tool types

    Args:
        name: The name of the tool to execute (e.g., "analyze", "chat", "codereview")
        arguments: Dictionary of arguments to pass to the tool, potentially including:
            - continuation_id: UUID for conversation thread resumption
            - files: File paths for analysis (subject to deduplication)
            - prompt: User request or follow-up question
            - model: Specific AI model to use (optional)

    Returns:
        List of TextContent objects containing:
        - Tool's primary response with analysis/results
        - Continuation offers for follow-up conversations (when applicable)
        - Structured JSON responses with status and content

    Raises:
        ValueError: If continuation_id is invalid or conversation thread not found
        Exception: For tool-specific errors or execution failures

    Example Conversation Flow:
        1. Claude calls analyze tool with files → creates new thread
        2. Thread ID returned in continuation offer
        3. Claude continues with codereview tool + continuation_id → full context preserved
        4. Multiple tools can collaborate using same thread ID
    """
    logger.info(f"MCP tool call: {name}")
    logger.debug(f"MCP tool arguments: {list(arguments.keys())}")

    # Log to activity file for monitoring
    try:
        mcp_activity_logger = logging.getLogger("mcp_activity")
        mcp_activity_logger.info(f"TOOL_CALL: {name} with {len(arguments)} arguments")
    except Exception:
        pass

    # Handle thread context reconstruction if continuation_id is present
    if "continuation_id" in arguments and arguments["continuation_id"]:
        continuation_id = arguments["continuation_id"]
        logger.debug(f"Resuming conversation thread: {continuation_id}")
        logger.debug(
            f"[CONVERSATION_DEBUG] Tool '{name}' resuming thread {continuation_id} with {len(arguments)} arguments"
        )
        logger.debug(f"[CONVERSATION_DEBUG] Original arguments keys: {list(arguments.keys())}")

        # Log to activity file for monitoring
        try:
            mcp_activity_logger = logging.getLogger("mcp_activity")
            mcp_activity_logger.info(f"CONVERSATION_RESUME: {name} resuming thread {continuation_id}")
        except Exception:
            pass

        arguments = await reconstruct_thread_context(arguments)
        logger.debug(f"[CONVERSATION_DEBUG] After thread reconstruction, arguments keys: {list(arguments.keys())}")
        if "_remaining_tokens" in arguments:
            logger.debug(f"[CONVERSATION_DEBUG] Remaining token budget: {arguments['_remaining_tokens']:,}")

    # Route to AI-powered tools that require model provider API calls
    if name in TOOLS:
        logger.info(f"Executing tool '{name}' with {len(arguments)} parameter(s)")
        tool = TOOLS[name]

        # EARLY MODEL RESOLUTION AT MCP BOUNDARY
        # Resolve model before passing to tool - this ensures consistent model handling
        # NOTE: Consensus tool is exempt as it handles multiple models internally
        from providers.registry import ModelProviderRegistry
        from utils.file_utils import check_total_file_size
        from utils.model_context import ModelContext

        # Get model from arguments or use default
        model_name = arguments.get("model") or DEFAULT_MODEL
        logger.debug(f"Initial model for {name}: {model_name}")

        # Parse model:option format if present
        model_name, model_option = parse_model_option(model_name)
        if model_option:
            logger.debug(f"Parsed model format - model: '{model_name}', option: '{model_option}'")

        # Consensus tool handles its own model configuration validation
        # No special handling needed at server level

        # Skip model resolution for tools that don't require models (e.g., planner)
        if not tool.requires_model():
            logger.debug(f"Tool {name} doesn't require model resolution - skipping model validation")
            # Execute tool directly without model context
            return await tool.execute(arguments)

        # Handle auto mode at MCP boundary - resolve to specific model
        if model_name.lower() == "auto":
            # Get tool category to determine appropriate model
            tool_category = tool.get_model_category()
            resolved_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
            logger.info(f"Auto mode resolved to {resolved_model} for {name} (category: {tool_category.value})")
            model_name = resolved_model
            # Update arguments with resolved model
            arguments["model"] = model_name

        # Validate model availability at MCP boundary
        provider = ModelProviderRegistry.get_provider_for_model(model_name)
        if not provider:
            # Get list of available models for error message
            available_models = list(ModelProviderRegistry.get_available_models(respect_restrictions=True).keys())
            tool_category = tool.get_model_category()
            suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)

            error_message = (
                f"Model '{model_name}' is not available with current API keys. "
                f"Available models: {', '.join(available_models)}. "
                f"Suggested model for {name}: '{suggested_model}' "
                f"(category: {tool_category.value})"
            )
            error_output = ToolOutput(
                status="error",
                content=error_message,
                content_type="text",
                metadata={"tool_name": name, "requested_model": model_name},
            )
            return [TextContent(type="text", text=error_output.model_dump_json())]

        # Create model context with resolved model and option
        model_context = ModelContext(model_name, model_option)
        arguments["_model_context"] = model_context
        arguments["_resolved_model_name"] = model_name
        logger.debug(
            f"Model context created for {model_name} with {model_context.capabilities.context_window} token capacity"
        )
        if model_option:
            logger.debug(f"Model option stored in context: '{model_option}'")

        # EARLY FILE SIZE VALIDATION AT MCP BOUNDARY
        # Check file sizes before tool execution using resolved model
        if "files" in arguments and arguments["files"]:
            logger.debug(f"Checking file sizes for {len(arguments['files'])} files with model {model_name}")
            file_size_check = check_total_file_size(arguments["files"], model_name)
            if file_size_check:
                logger.warning(f"File size check failed for {name} with model {model_name}")
                return [TextContent(type="text", text=ToolOutput(**file_size_check).model_dump_json())]

        # Execute tool with pre-resolved model context
        result = await tool.execute(arguments)
        logger.info(f"Tool '{name}' execution completed")

        # Log completion to activity file
        try:
            mcp_activity_logger = logging.getLogger("mcp_activity")
            mcp_activity_logger.info(f"TOOL_COMPLETED: {name}")
        except Exception:
            pass
        return result

    # Handle unknown tool requests gracefully
    else:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]


def parse_model_option(model_string: str) -> tuple[str, Optional[str]]:
    """
    Parse model:option format into model name and option.

    Handles different formats:
    - OpenRouter models: preserve :free, :beta, :preview suffixes as part of model name
    - Ollama/Custom models: split on : to extract tags like :latest
    - Consensus stance: extract options like :for, :against

    Args:
        model_string: String that may contain "model:option" format

    Returns:
        tuple: (model_name, option) where option may be None
    """
    if ":" in model_string and not model_string.startswith("http"):  # Avoid parsing URLs
        # Check if this looks like an OpenRouter model (contains /)
        if "/" in model_string and model_string.count(":") == 1:
            # Could be openai/gpt-4:something - check what comes after colon
            parts = model_string.split(":", 1)
            suffix = parts[1].strip().lower()

            # Known OpenRouter suffixes to preserve
            if suffix in ["free", "beta", "preview"]:
                return model_string.strip(), None

        # For other patterns (Ollama tags, consensus stances), split normally
        parts = model_string.split(":", 1)
        model_name = parts[0].strip()
        model_option = parts[1].strip() if len(parts) > 1 else None
        return model_name, model_option
    return model_string.strip(), None
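
# Illustrative behaviour of parse_model_option() (example model names, not an exhaustive list):
#   parse_model_option("openai/gpt-4:free")  -> ("openai/gpt-4:free", None)  # OpenRouter suffix kept
#   parse_model_option("llama3.2:latest")    -> ("llama3.2", "latest")       # local model tag split off
#   parse_model_option("o3:for")             -> ("o3", "for")                # consensus stance option
#   parse_model_option("gemini-2.5-pro")     -> ("gemini-2.5-pro", None)     # no option present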


def get_follow_up_instructions(current_turn_count: int, max_turns: int = None) -> str:
    """
    Generate dynamic follow-up instructions based on conversation turn count.

    Args:
        current_turn_count: Current number of turns in the conversation
        max_turns: Maximum allowed turns before conversation ends (defaults to MAX_CONVERSATION_TURNS)

    Returns:
        Follow-up instructions to append to the tool prompt
    """
    if max_turns is None:
        from utils.conversation_memory import MAX_CONVERSATION_TURNS

        max_turns = MAX_CONVERSATION_TURNS

    if current_turn_count >= max_turns - 1:
        # We're at or approaching the turn limit - no more follow-ups
        return """
IMPORTANT: This is approaching the final exchange in this conversation thread.
Do NOT include any follow-up questions in your response. Provide your complete
final analysis and recommendations."""
    else:
        # Normal follow-up instructions
        remaining_turns = max_turns - current_turn_count - 1
        return f"""

CONVERSATION CONTINUATION: You can continue this discussion with Claude! ({remaining_turns} exchanges remaining)

Feel free to ask clarifying questions or suggest areas for deeper exploration naturally within your response.
If something needs clarification or you'd benefit from additional context, simply mention it conversationally.

IMPORTANT: When you suggest follow-ups or ask questions, you MUST explicitly instruct Claude to use the continuation_id
to respond. Use clear, direct language based on urgency:

For optional follow-ups: "Please continue this conversation using the continuation_id from this response if you'd like to explore this further."

For needed responses: "Please respond using the continuation_id from this response - your input is needed to proceed."

For essential/critical responses: "RESPONSE REQUIRED: Please immediately continue using the continuation_id from this response. Cannot proceed without your clarification/input."

This ensures Claude knows both HOW to maintain the conversation thread AND whether a response is optional, needed, or essential.

The tool will automatically provide a continuation_id in the structured response that Claude can use in subsequent
tool calls to maintain full conversation context across multiple exchanges.

Remember: Only suggest follow-ups when they would genuinely add value to the discussion, and always instruct Claude to use the continuation_id when you do."""


async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any]:
    """
    Reconstruct conversation context for stateless-to-stateful thread continuation.

    This is a critical function that transforms the inherently stateless MCP protocol into
    stateful multi-turn conversations. It loads persistent conversation state from in-memory
    storage and rebuilds complete conversation context using the sophisticated dual prioritization
    strategy implemented in the conversation memory system.

    CONTEXT RECONSTRUCTION PROCESS:

    1. THREAD RETRIEVAL: Loads complete ThreadContext from storage using continuation_id
       - Includes all conversation turns with tool attribution
       - Preserves file references and cross-tool context
       - Handles conversation chains across multiple linked threads

    2. CONVERSATION HISTORY BUILDING: Uses build_conversation_history() to create
       comprehensive context with intelligent prioritization:

       FILE PRIORITIZATION (Newest-First Throughout):
       - When same file appears in multiple turns, newest reference wins
       - File embedding prioritizes recent versions, excludes older duplicates
       - Token budget management ensures most relevant files are preserved

       CONVERSATION TURN PRIORITIZATION (Dual Strategy):
       - Collection Phase: Processes turns newest-to-oldest for token efficiency
       - Presentation Phase: Presents turns chronologically for LLM understanding
       - Ensures recent context is preserved when token budget is constrained

    3. CONTEXT INJECTION: Embeds reconstructed history into tool request arguments
       - Conversation history becomes part of the tool's prompt context
       - Files referenced in previous turns are accessible to current tool
       - Cross-tool knowledge transfer is seamless and comprehensive

    4. TOKEN BUDGET MANAGEMENT: Applies model-specific token allocation
       - Balances conversation history vs. file content vs. response space
       - Gracefully handles token limits with intelligent exclusion strategies
       - Preserves most contextually relevant information within constraints

    CROSS-TOOL CONTINUATION SUPPORT:
    This function enables seamless handoffs between different tools:
    - Analyze tool → Debug tool: Full file context and analysis preserved
    - Chat tool → CodeReview tool: Conversation context maintained
    - Any tool → Any tool: Complete cross-tool knowledge transfer

    ERROR HANDLING & RECOVERY:
    - Thread expiration: Provides clear instructions for conversation restart
    - Storage unavailability: Graceful degradation with error messaging
    - Invalid continuation_id: Security validation and user-friendly errors

    Args:
        arguments: Original request arguments dictionary containing:
            - continuation_id (required): UUID of conversation thread to resume
            - Other tool-specific arguments that will be preserved

    Returns:
        dict[str, Any]: Enhanced arguments dictionary with conversation context:
        - Original arguments preserved
        - Conversation history embedded in appropriate format for tool consumption
        - File context from previous turns made accessible
        - Cross-tool knowledge transfer enabled

    Raises:
        ValueError: When continuation_id is invalid, thread not found, or expired
            Includes user-friendly recovery instructions

    Performance Characteristics:
    - O(1) thread lookup in memory
    - O(n) conversation history reconstruction where n = number of turns
    - Intelligent token budgeting prevents context window overflow
    - Optimized file deduplication minimizes redundant content

    Example Usage Flow:
    1. Claude: "Continue analyzing the security issues" + continuation_id
    2. reconstruct_thread_context() loads previous analyze conversation
    3. Debug tool receives full context including previous file analysis
    4. Debug tool can reference specific findings from analyze tool
    5. Natural cross-tool collaboration without context loss
    """
    from utils.conversation_memory import add_turn, build_conversation_history, get_thread

    continuation_id = arguments["continuation_id"]

    # Get thread context from storage
    logger.debug(f"[CONVERSATION_DEBUG] Looking up thread {continuation_id} in storage")
    context = get_thread(continuation_id)
    if not context:
        logger.warning(f"Thread not found: {continuation_id}")
        logger.debug(f"[CONVERSATION_DEBUG] Thread {continuation_id} not found in storage or expired")

        # Log to activity file for monitoring
        try:
            mcp_activity_logger = logging.getLogger("mcp_activity")
            mcp_activity_logger.info(f"CONVERSATION_ERROR: Thread {continuation_id} not found or expired")
        except Exception:
            pass

        # Return error asking Claude to restart conversation with full context
        raise ValueError(
            f"Conversation thread '{continuation_id}' was not found or has expired. "
            f"This may happen if the conversation was created more than 3 hours ago or if the "
            f"server was restarted. "
            f"Please restart the conversation by providing your full question/prompt without the "
            f"continuation_id parameter. "
            f"This will create a new conversation thread that can continue with follow-up exchanges."
        )

    # Add user's new input to the conversation
    user_prompt = arguments.get("prompt", "")
    if user_prompt:
        # Capture files referenced in this turn
        user_files = arguments.get("files", [])
        logger.debug(f"[CONVERSATION_DEBUG] Adding user turn to thread {continuation_id}")
        from utils.token_utils import estimate_tokens

        user_prompt_tokens = estimate_tokens(user_prompt)
        logger.debug(
            f"[CONVERSATION_DEBUG] User prompt length: {len(user_prompt)} chars (~{user_prompt_tokens:,} tokens)"
        )
        logger.debug(f"[CONVERSATION_DEBUG] User files: {user_files}")
        success = add_turn(continuation_id, "user", user_prompt, files=user_files)
        if not success:
            logger.warning(f"Failed to add user turn to thread {continuation_id}")
            logger.debug("[CONVERSATION_DEBUG] Failed to add user turn - thread may be at turn limit or expired")
        else:
            logger.debug(f"[CONVERSATION_DEBUG] Successfully added user turn to thread {continuation_id}")

    # Create model context early to use for history building
    from utils.model_context import ModelContext

    # Check if we should use the model from the previous conversation turn
    model_from_args = arguments.get("model")
    if not model_from_args and context.turns:
        # Find the last assistant turn to get the model used
        for turn in reversed(context.turns):
            if turn.role == "assistant" and turn.model_name:
                arguments["model"] = turn.model_name
                logger.debug(f"[CONVERSATION_DEBUG] Using model from previous turn: {turn.model_name}")
                break

    model_context = ModelContext.from_arguments(arguments)

    # Build conversation history with model-specific limits
    logger.debug(f"[CONVERSATION_DEBUG] Building conversation history for thread {continuation_id}")
    logger.debug(f"[CONVERSATION_DEBUG] Thread has {len(context.turns)} turns, tool: {context.tool_name}")
    logger.debug(f"[CONVERSATION_DEBUG] Using model: {model_context.model_name}")
    conversation_history, conversation_tokens = build_conversation_history(context, model_context)
    logger.debug(f"[CONVERSATION_DEBUG] Conversation history built: {conversation_tokens:,} tokens")
    logger.debug(
        f"[CONVERSATION_DEBUG] Conversation history length: {len(conversation_history)} chars (~{conversation_tokens:,} tokens)"
    )

    # Add dynamic follow-up instructions based on turn count
    follow_up_instructions = get_follow_up_instructions(len(context.turns))
    logger.debug(f"[CONVERSATION_DEBUG] Follow-up instructions added for turn {len(context.turns)}")

    # All tools now use standardized 'prompt' field
    original_prompt = arguments.get("prompt", "")
    logger.debug("[CONVERSATION_DEBUG] Extracting user input from 'prompt' field")
    original_prompt_tokens = estimate_tokens(original_prompt) if original_prompt else 0
    logger.debug(
        f"[CONVERSATION_DEBUG] User input length: {len(original_prompt)} chars (~{original_prompt_tokens:,} tokens)"
    )

    # Merge original context with new prompt and follow-up instructions
    if conversation_history:
        enhanced_prompt = (
            f"{conversation_history}\n\n=== NEW USER INPUT ===\n{original_prompt}\n\n{follow_up_instructions}"
        )
    else:
        enhanced_prompt = f"{original_prompt}\n\n{follow_up_instructions}"

    # Update arguments with enhanced context and remaining token budget
    enhanced_arguments = arguments.copy()

    # Store the enhanced prompt in the prompt field
    enhanced_arguments["prompt"] = enhanced_prompt
    # Store the original user prompt separately for size validation
    enhanced_arguments["_original_user_prompt"] = original_prompt
    logger.debug("[CONVERSATION_DEBUG] Storing enhanced prompt in 'prompt' field")
    logger.debug("[CONVERSATION_DEBUG] Storing original user prompt in '_original_user_prompt' field")

    # Calculate remaining token budget based on current model
    # (model_context was already created above for history building)
    token_allocation = model_context.calculate_token_allocation()

    # Calculate remaining tokens for files/new content
    # History has already consumed some of the content budget
    remaining_tokens = token_allocation.content_tokens - conversation_tokens
    enhanced_arguments["_remaining_tokens"] = max(0, remaining_tokens)  # Ensure non-negative
    enhanced_arguments["_model_context"] = model_context  # Pass context for use in tools

    logger.debug("[CONVERSATION_DEBUG] Token budget calculation:")
    logger.debug(f"[CONVERSATION_DEBUG] Model: {model_context.model_name}")
    logger.debug(f"[CONVERSATION_DEBUG] Total capacity: {token_allocation.total_tokens:,}")
    logger.debug(f"[CONVERSATION_DEBUG] Content allocation: {token_allocation.content_tokens:,}")
    logger.debug(f"[CONVERSATION_DEBUG] Conversation tokens: {conversation_tokens:,}")
    logger.debug(f"[CONVERSATION_DEBUG] Remaining tokens: {remaining_tokens:,}")

    # Merge original context parameters (files, etc.) with new request
    if context.initial_context:
        logger.debug(f"[CONVERSATION_DEBUG] Merging initial context with {len(context.initial_context)} parameters")
        for key, value in context.initial_context.items():
            if key not in enhanced_arguments and key not in ["temperature", "thinking_mode", "model"]:
                enhanced_arguments[key] = value
                logger.debug(f"[CONVERSATION_DEBUG] Merged initial context param: {key}")

    logger.info(f"Reconstructed context for thread {continuation_id} (turn {len(context.turns)})")
    logger.debug(f"[CONVERSATION_DEBUG] Final enhanced arguments keys: {list(enhanced_arguments.keys())}")

    # Debug log files in the enhanced arguments for file tracking
    if "files" in enhanced_arguments:
        logger.debug(f"[CONVERSATION_DEBUG] Final files in enhanced arguments: {enhanced_arguments['files']}")

    # Log to activity file for monitoring
    try:
        mcp_activity_logger = logging.getLogger("mcp_activity")
        mcp_activity_logger.info(
            f"CONVERSATION_CONTINUATION: Thread {continuation_id} turn {len(context.turns)} - "
            f"{len(context.turns)} previous turns loaded"
        )
    except Exception:
        pass

    return enhanced_arguments


@server.list_prompts()
async def handle_list_prompts() -> list[Prompt]:
    """
    List all available prompts for Claude Code shortcuts.

    This handler returns prompts that enable shortcuts like /zen:thinkdeeper.
    We automatically generate prompts from all tools (1:1 mapping) plus add
    a few marketing aliases with richer templates for commonly used tools.

    Returns:
        List of Prompt objects representing all available prompts
    """
    logger.debug("MCP client requested prompt list")
    prompts = []

    # Add a prompt for each tool with rich templates
    for tool_name, tool in TOOLS.items():
        if tool_name in PROMPT_TEMPLATES:
            # Use the rich template
            template_info = PROMPT_TEMPLATES[tool_name]
            prompts.append(
                Prompt(
                    name=template_info["name"],
                    description=template_info["description"],
                    arguments=[],  # MVP: no structured args
                )
            )
        else:
            # Fallback for any tools without templates (shouldn't happen)
            prompts.append(
                Prompt(
                    name=tool_name,
                    description=f"Use {tool.name} tool",
                    arguments=[],
                )
            )

    # Add special "continue" prompt
    prompts.append(
        Prompt(
            name="continue",
            description="Continue the previous conversation using the chat tool",
            arguments=[],
        )
    )

    logger.debug(f"Returning {len(prompts)} prompts to MCP client")
    return prompts


@server.get_prompt()
async def handle_get_prompt(name: str, arguments: dict[str, Any] = None) -> GetPromptResult:
    """
    Get prompt details and generate the actual prompt text.

    This handler is called when a user invokes a prompt (e.g., /zen:thinkdeeper or /zen:chat:o3).
    It generates the appropriate text that Claude will then use to call the
    underlying tool.

    Supports structured prompt names like "chat:o3" where:
    - "chat" is the tool name
    - "o3" is the model to use

    Args:
        name: The name of the prompt to execute (can include model like "chat:o3")
        arguments: Optional arguments for the prompt (e.g., model, thinking_mode)

    Returns:
        GetPromptResult with the prompt details and generated message

    Raises:
        ValueError: If the prompt name is unknown
    """
    logger.debug(f"MCP client requested prompt: {name} with args: {arguments}")

    # Handle special "continue" case
    if name.lower() == "continue":
        # This is "/zen:continue" - use chat tool as default for continuation
        tool_name = "chat"
        template_info = {
            "name": "continue",
            "description": "Continue the previous conversation",
            "template": "Continue the conversation",
        }
        logger.debug("Using /zen:continue - defaulting to chat tool")
    else:
        # Find the corresponding tool by checking prompt names
        tool_name = None
        template_info = None

        # Check if it's a known prompt name
        for t_name, t_info in PROMPT_TEMPLATES.items():
            if t_info["name"] == name:
                tool_name = t_name
                template_info = t_info
                break

        # If not found, check if it's a direct tool name
        if not tool_name and name in TOOLS:
            tool_name = name
            template_info = {
                "name": name,
                "description": f"Use {name} tool",
                "template": f"Use {name}",
            }

        if not tool_name:
            logger.error(f"Unknown prompt requested: {name}")
            raise ValueError(f"Unknown prompt: {name}")

    # Get the template
    template = template_info.get("template", f"Use {tool_name}")

    # Safe template expansion with defaults
    final_model = arguments.get("model", "auto") if arguments else "auto"

    prompt_args = {
        "model": final_model,
        "thinking_mode": arguments.get("thinking_mode", "medium") if arguments else "medium",
    }

    logger.debug(f"Using model '{final_model}' for prompt '{name}'")

    # Safely format the template
    try:
        prompt_text = template.format(**prompt_args)
    except KeyError as e:
        logger.warning(f"Missing template argument {e} for prompt {name}, using raw template")
        prompt_text = template  # Fallback to raw template

    # Generate tool call instruction
    if name.lower() == "continue":
        # "/zen:continue" case
        tool_instruction = f"Continue the previous conversation using the {tool_name} tool"
    else:
        # Simple prompt case
        tool_instruction = prompt_text

    return GetPromptResult(
        prompt=Prompt(
            name=name,
            description=template_info["description"],
            arguments=[],
        ),
        messages=[
            PromptMessage(
                role="user",
                content={"type": "text", "text": tool_instruction},
            )
        ],
    )


async def main():
    """
    Main entry point for the MCP server.

    Initializes the AI provider configuration and starts the server using
    stdio transport. The server will continue running until the client
    disconnects or an error occurs.

    The server communicates via standard input/output streams using the
    MCP protocol's JSON-RPC message format.
    """
    # Validate and configure providers based on available API keys
    configure_providers()

    # Log startup message
    logger.info("Zen MCP Server starting up...")
    logger.info(f"Log level: {log_level}")

    # Log current model mode
    from config import IS_AUTO_MODE

    if IS_AUTO_MODE:
        logger.info("Model mode: AUTO (Claude will select the best model for each task)")
    else:
        logger.info(f"Model mode: Fixed model '{DEFAULT_MODEL}'")

    # Import here to avoid circular imports
    from config import DEFAULT_THINKING_MODE_THINKDEEP

    logger.info(f"Default thinking mode (ThinkDeep): {DEFAULT_THINKING_MODE_THINKDEEP}")

    logger.info(f"Available tools: {list(TOOLS.keys())}")
    logger.info("Server ready - waiting for tool requests...")

    # Run the server using stdio transport (standard input/output)
    # This allows the server to be launched by MCP clients as a subprocess
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="zen",
                server_version=__version__,
                capabilities=ServerCapabilities(
                    tools=ToolsCapability(),  # Advertise tool support capability
                    prompts=PromptsCapability(),  # Advertise prompt support capability
                ),
            ),
        )


def run():
    """Console script entry point for zen-mcp-server."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Handle graceful shutdown
        pass
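
# Typical launches (illustrative): an MCP client spawns this module over stdio, e.g. "python server.py",
# or invokes the console script wired to run() above (named zen-mcp-server per its docstring).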


if __name__ == "__main__":
    run()