my-pal-mcp-server/server.py
Beehive Innovations 95556ba9ea Add Consensus Tool for Multi-Model Perspective Gathering (#67)
* WIP
Refactor model name resolution; it should be done once at the MCP call boundary
Pass around a model context instead
The consensus tool gathers a consensus from multiple models, optionally assigning each model a 'for' or 'against' stance to elicit nuanced responses.

* Deduplicate model resolution; model_context should be available before reaching deeper parts of the code
Improved abstraction when building conversations
Throw programmer errors early

* Guardrails
Support for `model:option` format at the MCP boundary, so future tools can use additional options if needed instead of handling this only for consensus
Model name now supports an optional ":option" suffix for future use (example below)
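
For illustration, the parsing behaves as follows (mirroring parse_model_option in server.py below):

    parse_model_option("o3:against")  # -> ("o3", "against")
    parse_model_option("flash")       # -> ("flash", None)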

* Simplified async flow

* Improved the request model to support natural language
Simplified async flow

* Fix consensus tool async/sync patterns to match codebase standards

CRITICAL FIXES:
- Converted _get_consensus_responses from async to sync (matches other tools)
- Converted store_conversation_turn from async to sync (add_turn is synchronous)
- Removed unnecessary asyncio imports and sleep calls
- Fixed ClosedResourceError in MCP protocol during long consensus operations

PATTERN ALIGNMENT:
- Consensus tool now follows same sync patterns as all other tools
- Only execute() and prepare_prompt() are async (base class requirement)
- All internal operations are synchronous like analyze, chat, debug, etc. (see the sketch below)
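
A minimal sketch of the resulting shape (method names come from this commit; the signatures and placeholder bodies are illustrative assumptions):

    class ConsensusTool:
        async def execute(self, arguments: dict) -> list:
            # async: base class requirement; everything inside runs synchronously
            responses = self._get_consensus_responses(arguments)
            self.store_conversation_turn(responses)
            return responses

        def _get_consensus_responses(self, arguments: dict) -> list:
            # placeholder: gathers each model's response sequentially (sync)
            return []

        def store_conversation_turn(self, responses: list) -> None:
            # placeholder: records the exchange via the synchronous add_turn
            pass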

TESTING:
- MCP simulation test now passes: consensus_stance 
- Two-model consensus works correctly in ~35 seconds
- Unknown stance handling defaults to neutral with warnings
- All 9 unit tests pass (100% success rate)

The consensus tool's async patterns were anomalous in the codebase.
This fix aligns the tool with the established synchronous patterns used
by all other tools while maintaining full functionality.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fixed call order and added new test

* Cleanup dead comments
Docs for the new tool
Improved tests

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-06-17 10:53:17 +04:00

"""
Zen MCP Server - Main server implementation
This module implements the core MCP (Model Context Protocol) server that provides
AI-powered tools for code analysis, review, and assistance using multiple AI models.
The server follows the MCP specification to expose various AI tools as callable functions
that can be used by MCP clients (like Claude). Each tool provides specialized functionality
such as code review, debugging, deep thinking, and general chat capabilities.
Key Components:
- MCP Server: Handles protocol communication and tool discovery
- Tool Registry: Maps tool names to their implementations
- Request Handler: Processes incoming tool calls and returns formatted responses
- Configuration: Manages API keys and model settings
The server runs on stdio (standard input/output) and communicates using JSON-RPC messages
as defined by the MCP protocol.
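Example client registration (stdio transport; command and args are assumed placeholders):
{"mcpServers": {"zen": {"command": "python", "args": ["server.py"]}}}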
"""
import asyncio
import json
import logging
import os
import sys
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import Any, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
GetPromptResult,
Prompt,
PromptMessage,
PromptsCapability,
ServerCapabilities,
TextContent,
Tool,
ToolsCapability,
)
from config import (
DEFAULT_MODEL,
__author__,
__updated__,
__version__,
)
from tools import (
AnalyzeTool,
ChatTool,
CodeReviewTool,
ConsensusTool,
DebugIssueTool,
ListModelsTool,
Precommit,
RefactorTool,
TestGenerationTool,
ThinkDeepTool,
TracerTool,
)
from tools.models import ToolOutput
# Configure logging for server operations
# Can be controlled via LOG_LEVEL environment variable (DEBUG, INFO, WARNING, ERROR)
log_level = os.getenv("LOG_LEVEL", "DEBUG").upper()
# Create timezone-aware formatter
class LocalTimeFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
"""Override to use local timezone instead of UTC"""
ct = self.converter(record.created)
if datefmt:
s = time.strftime(datefmt, ct)
else:
t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
s = f"{t},{record.msecs:03.0f}"
return s
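# With the format below, a record renders like this (illustrative values):
#   2025-06-17 10:53:17,123 - server - INFO - Server ready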
# Configure both console and file logging
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Clear any existing handlers first
root_logger = logging.getLogger()
root_logger.handlers.clear()
# Create and configure stderr handler explicitly
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(getattr(logging, log_level, logging.INFO))
stderr_handler.setFormatter(LocalTimeFormatter(log_format))
root_logger.addHandler(stderr_handler)
# Note: MCP stdio_server interferes with stderr during tool execution
# All logs are properly written to /tmp/mcp_server.log for monitoring
# Set root logger level
root_logger.setLevel(getattr(logging, log_level, logging.INFO))
# Add rotating file handler for Docker log monitoring
try:
# Main server log with size-based rotation (20MB max per file)
# This ensures logs don't grow indefinitely and are properly managed
file_handler = RotatingFileHandler(
"/tmp/mcp_server.log",
maxBytes=20 * 1024 * 1024, # 20MB max file size
backupCount=10, # Keep 10 rotated files (200MB total)
encoding="utf-8",
)
file_handler.setLevel(getattr(logging, log_level, logging.INFO))
file_handler.setFormatter(LocalTimeFormatter(log_format))
logging.getLogger().addHandler(file_handler)
# Create a special logger for MCP activity tracking with size-based rotation
mcp_logger = logging.getLogger("mcp_activity")
mcp_file_handler = RotatingFileHandler(
"/tmp/mcp_activity.log",
maxBytes=20 * 1024 * 1024, # 20MB max file size
backupCount=5, # Keep 5 rotated files (100MB total)
encoding="utf-8",
)
mcp_file_handler.setLevel(logging.INFO)
mcp_file_handler.setFormatter(LocalTimeFormatter("%(asctime)s - %(message)s"))
mcp_logger.addHandler(mcp_file_handler)
mcp_logger.setLevel(logging.INFO)
# Ensure MCP activity also goes to stderr
mcp_logger.propagate = True
# Also keep a size-based overflow log for warnings and errors as a backup
# This prevents important warnings/errors from being lost when the main log rotates
size_handler = RotatingFileHandler(
"/tmp/mcp_server_overflow.log",
maxBytes=100 * 1024 * 1024, # 100MB max file size
backupCount=3, # Keep 3 rotated files (300MB total)
)
size_handler.setLevel(logging.WARNING) # Only warnings and errors
size_handler.setFormatter(LocalTimeFormatter(log_format))
logging.getLogger().addHandler(size_handler)
except Exception as e:
print(f"Warning: Could not set up file logging: {e}", file=sys.stderr)
logger = logging.getLogger(__name__)
# Create the MCP server instance with a unique name identifier
# This name is used by MCP clients to identify and connect to this specific server
server: Server = Server("zen-server")
# Initialize the tool registry with all available AI-powered tools
# Each tool provides specialized functionality for different development tasks
# Tools are instantiated once and reused across requests (stateless design)
TOOLS = {
"thinkdeep": ThinkDeepTool(), # Extended reasoning for complex problems
"codereview": CodeReviewTool(), # Comprehensive code review and quality analysis
"debug": DebugIssueTool(), # Root cause analysis and debugging assistance
"analyze": AnalyzeTool(), # General-purpose file and code analysis
"chat": ChatTool(), # Interactive development chat and brainstorming
"consensus": ConsensusTool(), # Multi-model consensus for diverse perspectives on technical proposals
"listmodels": ListModelsTool(), # List all available AI models by provider
"precommit": Precommit(), # Pre-commit validation of git changes
"testgen": TestGenerationTool(), # Comprehensive test generation with edge case coverage
"refactor": RefactorTool(), # Intelligent code refactoring suggestions with precise line references
"tracer": TracerTool(), # Static call path prediction and control flow analysis
}
# Rich prompt templates for all tools
PROMPT_TEMPLATES = {
"thinkdeep": {
"name": "thinkdeeper",
"description": "Think deeply about the current context or problem",
"template": "Think deeper about this with {model} using {thinking_mode} thinking mode",
},
"codereview": {
"name": "review",
"description": "Perform a comprehensive code review",
"template": "Perform a comprehensive code review with {model}",
},
"debug": {
"name": "debug",
"description": "Debug an issue or error",
"template": "Help debug this issue with {model}",
},
"analyze": {
"name": "analyze",
"description": "Analyze files and code structure",
"template": "Analyze these files with {model}",
},
"chat": {
"name": "chat",
"description": "Chat and brainstorm ideas",
"template": "Chat with {model} about this",
},
"precommit": {
"name": "precommit",
"description": "Validate changes before committing",
"template": "Run precommit validation with {model}",
},
"testgen": {
"name": "testgen",
"description": "Generate comprehensive tests",
"template": "Generate comprehensive tests with {model}",
},
"refactor": {
"name": "refactor",
"description": "Refactor and improve code structure",
"template": "Refactor this code with {model}",
},
"tracer": {
"name": "tracer",
"description": "Trace code execution paths",
"template": "Generate tracer analysis with {model}",
},
"listmodels": {
"name": "listmodels",
"description": "List available AI models",
"template": "List all available models",
},
}
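# Example expansion (see handle_get_prompt below): the "thinkdeep" template with
# model="pro" and thinking_mode="high" renders as:
#   "Think deeper about this with pro using high thinking mode"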
def configure_providers():
"""
Configure and validate AI providers based on available API keys.
This function checks for API keys and registers the appropriate providers.
At least one valid provider configuration (Gemini, OpenAI, X.AI, OpenRouter, or a custom endpoint) is required.
Raises:
ValueError: If no valid API keys are found or conflicting configurations detected
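Example environments (values are illustrative assumptions):
GEMINI_API_KEY=<your-key> -> registers the native Gemini provider
CUSTOM_API_URL=http://localhost:11434/v1 -> registers a local Ollama-style endpoint (no key required)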
"""
from providers import ModelProviderRegistry
from providers.base import ProviderType
from providers.custom import CustomProvider
from providers.gemini import GeminiModelProvider
from providers.openai import OpenAIModelProvider
from providers.openrouter import OpenRouterProvider
from providers.xai import XAIModelProvider
from utils.model_restrictions import get_restriction_service
valid_providers = []
has_native_apis = False
has_openrouter = False
has_custom = False
# Check for Gemini API key
gemini_key = os.getenv("GEMINI_API_KEY")
if gemini_key and gemini_key != "your_gemini_api_key_here":
valid_providers.append("Gemini")
has_native_apis = True
logger.info("Gemini API key found - Gemini models available")
# Check for OpenAI API key
openai_key = os.getenv("OPENAI_API_KEY")
if openai_key and openai_key != "your_openai_api_key_here":
valid_providers.append("OpenAI (o3)")
has_native_apis = True
logger.info("OpenAI API key found - o3 model available")
# Check for X.AI API key
xai_key = os.getenv("XAI_API_KEY")
if xai_key and xai_key != "your_xai_api_key_here":
valid_providers.append("X.AI (GROK)")
has_native_apis = True
logger.info("X.AI API key found - GROK models available")
# Check for OpenRouter API key
openrouter_key = os.getenv("OPENROUTER_API_KEY")
if openrouter_key and openrouter_key != "your_openrouter_api_key_here":
valid_providers.append("OpenRouter")
has_openrouter = True
logger.info("OpenRouter API key found - Multiple models available via OpenRouter")
# Check for custom API endpoint (Ollama, vLLM, etc.)
custom_url = os.getenv("CUSTOM_API_URL")
if custom_url:
# IMPORTANT: Always read CUSTOM_API_KEY even if empty
# - Some providers (vLLM, LM Studio, enterprise APIs) require authentication
# - Others (Ollama) work without authentication (empty key)
# - DO NOT remove this variable - it's needed for provider factory function
custom_key = os.getenv("CUSTOM_API_KEY", "") # Default to empty (Ollama doesn't need auth)
custom_model = os.getenv("CUSTOM_MODEL_NAME", "llama3.2")
valid_providers.append(f"Custom API ({custom_url})")
has_custom = True
logger.info(f"Custom API endpoint found: {custom_url} with model {custom_model}")
if custom_key:
logger.debug("Custom API key provided for authentication")
else:
logger.debug("No custom API key provided (using unauthenticated access)")
# Register providers in priority order:
# 1. Native APIs first (most direct and efficient)
if has_native_apis:
if gemini_key and gemini_key != "your_gemini_api_key_here":
ModelProviderRegistry.register_provider(ProviderType.GOOGLE, GeminiModelProvider)
if openai_key and openai_key != "your_openai_api_key_here":
ModelProviderRegistry.register_provider(ProviderType.OPENAI, OpenAIModelProvider)
if xai_key and xai_key != "your_xai_api_key_here":
ModelProviderRegistry.register_provider(ProviderType.XAI, XAIModelProvider)
# 2. Custom provider second (for local/private models)
if has_custom:
# Factory function that creates CustomProvider with proper parameters
def custom_provider_factory(api_key=None):
# api_key is CUSTOM_API_KEY (can be empty for Ollama), base_url from CUSTOM_API_URL
base_url = os.getenv("CUSTOM_API_URL", "")
return CustomProvider(api_key=api_key or "", base_url=base_url) # Use provided API key or empty string
ModelProviderRegistry.register_provider(ProviderType.CUSTOM, custom_provider_factory)
# 3. OpenRouter last (catch-all for everything else)
if has_openrouter:
ModelProviderRegistry.register_provider(ProviderType.OPENROUTER, OpenRouterProvider)
# Require at least one valid provider
if not valid_providers:
raise ValueError(
"At least one API configuration is required. Please set either:\n"
"- GEMINI_API_KEY for Gemini models\n"
"- OPENAI_API_KEY for OpenAI o3 model\n"
"- XAI_API_KEY for X.AI GROK models\n"
"- OPENROUTER_API_KEY for OpenRouter (multiple models)\n"
"- CUSTOM_API_URL for local models (Ollama, vLLM, etc.)"
)
logger.info(f"Available providers: {', '.join(valid_providers)}")
# Log provider priority
priority_info = []
if has_native_apis:
priority_info.append("Native APIs (Gemini, OpenAI)")
if has_custom:
priority_info.append("Custom endpoints")
if has_openrouter:
priority_info.append("OpenRouter (catch-all)")
if len(priority_info) > 1:
logger.info(f"Provider priority: {''.join(priority_info)}")
# Check and log model restrictions
restriction_service = get_restriction_service()
restrictions = restriction_service.get_restriction_summary()
if restrictions:
logger.info("Model restrictions configured:")
for provider_name, allowed_models in restrictions.items():
if isinstance(allowed_models, list):
logger.info(f" {provider_name}: {', '.join(allowed_models)}")
else:
logger.info(f" {provider_name}: {allowed_models}")
# Validate restrictions against known models
provider_instances = {}
for provider_type in [ProviderType.GOOGLE, ProviderType.OPENAI]:
provider = ModelProviderRegistry.get_provider(provider_type)
if provider:
provider_instances[provider_type] = provider
if provider_instances:
restriction_service.validate_against_known_models(provider_instances)
else:
logger.info("No model restrictions configured - all models allowed")
# Check if auto mode has any models available after restrictions
from config import IS_AUTO_MODE
if IS_AUTO_MODE:
available_models = ModelProviderRegistry.get_available_models(respect_restrictions=True)
if not available_models:
logger.error(
"Auto mode is enabled but no models are available after applying restrictions. "
"Please check your OPENAI_ALLOWED_MODELS and GOOGLE_ALLOWED_MODELS settings."
)
raise ValueError(
"No models available for auto mode due to restrictions. "
"Please adjust your allowed model settings or disable auto mode."
)
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
"""
List all available tools with their descriptions and input schemas.
This handler is called by MCP clients during initialization to discover
what tools are available. Each tool provides:
- name: Unique identifier for the tool
- description: Detailed explanation of what the tool does
- inputSchema: JSON Schema defining the expected parameters
Returns:
List of Tool objects representing all available tools
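Example entry (schema abbreviated for illustration):
Tool(name="chat", description="...",
inputSchema={"type": "object", "properties": {"prompt": {"type": "string"}}})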
"""
logger.debug("MCP client requested tool list")
tools = []
# Add all registered AI-powered tools from the TOOLS registry
for tool in TOOLS.values():
tools.append(
Tool(
name=tool.name,
description=tool.description,
inputSchema=tool.get_input_schema(),
)
)
# Add utility tools that provide server metadata and configuration info
# These tools don't require AI processing but are useful for clients
tools.extend(
[
Tool(
name="version",
description=(
"VERSION & CONFIGURATION - Get server version, configuration details, "
"and list of available tools. Useful for debugging and understanding capabilities."
),
inputSchema={"type": "object", "properties": {}},
),
]
)
# Log cache efficiency info
if os.getenv("OPENROUTER_API_KEY") and os.getenv("OPENROUTER_API_KEY") != "your_openrouter_api_key_here":
logger.debug("OpenRouter registry cache used efficiently across all tool schemas")
logger.debug(f"Returning {len(tools)} tools to MCP client")
return tools
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""
Handle incoming tool execution requests from MCP clients.
This is the main request dispatcher that routes tool calls to their appropriate handlers.
It supports both AI-powered tools (from TOOLS registry) and utility tools (implemented as
static functions).
CONVERSATION LIFECYCLE MANAGEMENT:
This function serves as the central orchestrator for multi-turn AI-to-AI conversations:
1. THREAD RESUMPTION: When continuation_id is present, it reconstructs complete conversation
context from Redis including conversation history and file references
2. CROSS-TOOL CONTINUATION: Enables seamless handoffs between different tools (analyze →
codereview → debug) while preserving full conversation context and file references
3. CONTEXT INJECTION: Reconstructed conversation history is embedded into tool prompts
using the dual prioritization strategy:
- Files: Newest-first prioritization (recent file versions take precedence)
- Turns: Newest-first collection for token efficiency, chronological presentation for LLM
4. FOLLOW-UP GENERATION: After tool execution, generates continuation offers for ongoing
AI-to-AI collaboration with natural language instructions
STATELESS TO STATEFUL BRIDGE:
The MCP protocol is inherently stateless, but this function bridges the gap by:
- Loading persistent conversation state from Redis
- Reconstructing full multi-turn context for tool execution
- Enabling tools to access previous exchanges and file references
- Supporting conversation chains across different tool types
Args:
name: The name of the tool to execute (e.g., "analyze", "chat", "codereview")
arguments: Dictionary of arguments to pass to the tool, potentially including:
- continuation_id: UUID for conversation thread resumption
- files: File paths for analysis (subject to deduplication)
- prompt: User request or follow-up question
- model: Specific AI model to use (optional)
Returns:
List of TextContent objects containing:
- Tool's primary response with analysis/results
- Continuation offers for follow-up conversations (when applicable)
- Structured JSON responses with status and content
Raises:
ValueError: If continuation_id is invalid or conversation thread not found
Exception: For tool-specific errors or execution failures
Example Conversation Flow:
1. Claude calls analyze tool with files → creates new thread
2. Thread ID returned in continuation offer
3. Claude continues with codereview tool + continuation_id → full context preserved
4. Multiple tools can collaborate using same thread ID
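Example arguments for step 3 (values are illustrative):
{
"prompt": "Now review the fix for the issues you found",
"continuation_id": "123e4567-e89b-12d3-a456-426614174000",
"files": ["/workspace/src/auth.py"],
"model": "flash"
}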
"""
logger.info(f"MCP tool call: {name}")
logger.debug(f"MCP tool arguments: {list(arguments.keys())}")
# Log to activity file for monitoring
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(f"TOOL_CALL: {name} with {len(arguments)} arguments")
except Exception:
pass
# Handle thread context reconstruction if continuation_id is present
if "continuation_id" in arguments and arguments["continuation_id"]:
continuation_id = arguments["continuation_id"]
logger.debug(f"Resuming conversation thread: {continuation_id}")
logger.debug(
f"[CONVERSATION_DEBUG] Tool '{name}' resuming thread {continuation_id} with {len(arguments)} arguments"
)
logger.debug(f"[CONVERSATION_DEBUG] Original arguments keys: {list(arguments.keys())}")
# Log to activity file for monitoring
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(f"CONVERSATION_RESUME: {name} resuming thread {continuation_id}")
except Exception:
pass
arguments = await reconstruct_thread_context(arguments)
logger.debug(f"[CONVERSATION_DEBUG] After thread reconstruction, arguments keys: {list(arguments.keys())}")
if "_remaining_tokens" in arguments:
logger.debug(f"[CONVERSATION_DEBUG] Remaining token budget: {arguments['_remaining_tokens']:,}")
# Route to AI-powered tools registered in the TOOLS registry
if name in TOOLS:
logger.info(f"Executing tool '{name}' with {len(arguments)} parameter(s)")
tool = TOOLS[name]
# EARLY MODEL RESOLUTION AT MCP BOUNDARY
# Resolve model before passing to tool - this ensures consistent model handling
# NOTE: Consensus tool is exempt as it handles multiple models internally
from providers.registry import ModelProviderRegistry
from utils.file_utils import check_total_file_size
from utils.model_context import ModelContext
# Get model from arguments or use default
model_name = arguments.get("model") or DEFAULT_MODEL
logger.debug(f"Initial model for {name}: {model_name}")
# Parse model:option format if present
model_name, model_option = parse_model_option(model_name)
if model_option:
logger.debug(f"Parsed model format - model: '{model_name}', option: '{model_option}'")
# Consensus tool handles its own model configuration validation
# No special handling needed at server level
# Handle auto mode at MCP boundary - resolve to specific model
if model_name.lower() == "auto":
# Get tool category to determine appropriate model
tool_category = tool.get_model_category()
resolved_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
logger.info(f"Auto mode resolved to {resolved_model} for {name} (category: {tool_category.value})")
model_name = resolved_model
# Update arguments with resolved model
arguments["model"] = model_name
# Validate model availability at MCP boundary
provider = ModelProviderRegistry.get_provider_for_model(model_name)
if not provider:
# Get list of available models for error message
available_models = list(ModelProviderRegistry.get_available_models(respect_restrictions=True).keys())
tool_category = tool.get_model_category()
suggested_model = ModelProviderRegistry.get_preferred_fallback_model(tool_category)
error_message = (
f"Model '{model_name}' is not available with current API keys. "
f"Available models: {', '.join(available_models)}. "
f"Suggested model for {name}: '{suggested_model}' "
f"(category: {tool_category.value})"
)
error_output = ToolOutput(
status="error",
content=error_message,
content_type="text",
metadata={"tool_name": name, "requested_model": model_name},
)
return [TextContent(type="text", text=error_output.model_dump_json())]
# Create model context with resolved model and option
model_context = ModelContext(model_name, model_option)
arguments["_model_context"] = model_context
arguments["_resolved_model_name"] = model_name
logger.debug(
f"Model context created for {model_name} with {model_context.capabilities.context_window} token capacity"
)
if model_option:
logger.debug(f"Model option stored in context: '{model_option}'")
# EARLY FILE SIZE VALIDATION AT MCP BOUNDARY
# Check file sizes before tool execution using resolved model
if "files" in arguments and arguments["files"]:
logger.debug(f"Checking file sizes for {len(arguments['files'])} files with model {model_name}")
file_size_check = check_total_file_size(arguments["files"], model_name)
if file_size_check:
logger.warning(f"File size check failed for {name} with model {model_name}")
return [TextContent(type="text", text=ToolOutput(**file_size_check).model_dump_json())]
# Execute tool with pre-resolved model context
result = await tool.execute(arguments)
logger.info(f"Tool '{name}' execution completed")
# Log completion to activity file
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(f"TOOL_COMPLETED: {name}")
except Exception:
pass
return result
# Route to utility tools that provide server information
elif name == "version":
logger.info(f"Executing utility tool '{name}'")
result = await handle_version()
logger.info(f"Utility tool '{name}' execution completed")
return result
# Handle unknown tool requests gracefully
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
def parse_model_option(model_string: str) -> tuple[str, Optional[str]]:
"""
Parse model:option format into model name and option.
Args:
model_string: String that may contain "model:option" format
Returns:
tuple: (model_name, option) where option may be None
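Examples (behavior of the implementation below):
>>> parse_model_option("o3:for")
('o3', 'for')
>>> parse_model_option("flash")
('flash', None)
>>> parse_model_option("http://host:8000")  # URLs are never split
('http://host:8000', None)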
"""
if ":" in model_string and not model_string.startswith("http"): # Avoid parsing URLs
parts = model_string.split(":", 1)
model_name = parts[0].strip()
model_option = parts[1].strip() if len(parts) > 1 else None
return model_name, model_option
return model_string.strip(), None
def get_follow_up_instructions(current_turn_count: int, max_turns: Optional[int] = None) -> str:
"""
Generate dynamic follow-up instructions based on conversation turn count.
Args:
current_turn_count: Current number of turns in the conversation
max_turns: Maximum allowed turns before conversation ends (defaults to MAX_CONVERSATION_TURNS)
Returns:
Follow-up instructions to append to the tool prompt
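Examples (with max_turns=5):
>>> "Do NOT include any follow-up questions" in get_follow_up_instructions(4, max_turns=5)
True
>>> "(3 exchanges remaining)" in get_follow_up_instructions(1, max_turns=5)
True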
"""
if max_turns is None:
from utils.conversation_memory import MAX_CONVERSATION_TURNS
max_turns = MAX_CONVERSATION_TURNS
if current_turn_count >= max_turns - 1:
# We're at or approaching the turn limit - no more follow-ups
return """
IMPORTANT: This is approaching the final exchange in this conversation thread.
Do NOT include any follow-up questions in your response. Provide your complete
final analysis and recommendations."""
else:
# Normal follow-up instructions
remaining_turns = max_turns - current_turn_count - 1
return f"""
CONVERSATION CONTINUATION: You can continue this discussion with Claude! ({remaining_turns} exchanges remaining)
Feel free to ask clarifying questions or suggest areas for deeper exploration naturally within your response.
If something needs clarification or you'd benefit from additional context, simply mention it conversationally.
IMPORTANT: When you suggest follow-ups or ask questions, you MUST explicitly instruct Claude to use the continuation_id
to respond. Use clear, direct language based on urgency:
For optional follow-ups: "Please continue this conversation using the continuation_id from this response if you'd
like to explore this further."
For needed responses: "Please respond using the continuation_id from this response - your input is needed to proceed."
For essential/critical responses: "RESPONSE REQUIRED: Please immediately continue using the continuation_id from
this response. Cannot proceed without your clarification/input."
This ensures Claude knows both HOW to maintain the conversation thread AND whether a response is optional,
needed, or essential.
The tool will automatically provide a continuation_id in the structured response that Claude can use in subsequent
tool calls to maintain full conversation context across multiple exchanges.
Remember: Only suggest follow-ups when they would genuinely add value to the discussion, and always instruct
Claude to use the continuation_id when you do."""
async def reconstruct_thread_context(arguments: dict[str, Any]) -> dict[str, Any]:
"""
Reconstruct conversation context for stateless-to-stateful thread continuation.
This is a critical function that transforms the inherently stateless MCP protocol into
stateful multi-turn conversations. It loads persistent conversation state from Redis
and rebuilds complete conversation context using the sophisticated dual prioritization
strategy implemented in the conversation memory system.
CONTEXT RECONSTRUCTION PROCESS:
1. THREAD RETRIEVAL: Loads complete ThreadContext from Redis using continuation_id
- Includes all conversation turns with tool attribution
- Preserves file references and cross-tool context
- Handles conversation chains across multiple linked threads
2. CONVERSATION HISTORY BUILDING: Uses build_conversation_history() to create
comprehensive context with intelligent prioritization:
FILE PRIORITIZATION (Newest-First Throughout):
- When same file appears in multiple turns, newest reference wins
- File embedding prioritizes recent versions, excludes older duplicates
- Token budget management ensures most relevant files are preserved
CONVERSATION TURN PRIORITIZATION (Dual Strategy):
- Collection Phase: Processes turns newest-to-oldest for token efficiency
- Presentation Phase: Presents turns chronologically for LLM understanding
- Ensures recent context is preserved when token budget is constrained
3. CONTEXT INJECTION: Embeds reconstructed history into tool request arguments
- Conversation history becomes part of the tool's prompt context
- Files referenced in previous turns are accessible to current tool
- Cross-tool knowledge transfer is seamless and comprehensive
4. TOKEN BUDGET MANAGEMENT: Applies model-specific token allocation
- Balances conversation history vs. file content vs. response space
- Gracefully handles token limits with intelligent exclusion strategies
- Preserves most contextually relevant information within constraints
CROSS-TOOL CONTINUATION SUPPORT:
This function enables seamless handoffs between different tools:
- Analyze tool → Debug tool: Full file context and analysis preserved
- Chat tool → CodeReview tool: Conversation context maintained
- Any tool → Any tool: Complete cross-tool knowledge transfer
ERROR HANDLING & RECOVERY:
- Thread expiration: Provides clear instructions for conversation restart
- Redis unavailability: Graceful degradation with error messaging
- Invalid continuation_id: Security validation and user-friendly errors
Args:
arguments: Original request arguments dictionary containing:
- continuation_id (required): UUID of conversation thread to resume
- Other tool-specific arguments that will be preserved
Returns:
dict[str, Any]: Enhanced arguments dictionary with conversation context:
- Original arguments preserved
- Conversation history embedded in appropriate format for tool consumption
- File context from previous turns made accessible
- Cross-tool knowledge transfer enabled
Raises:
ValueError: When continuation_id is invalid, thread not found, or expired
Includes user-friendly recovery instructions
Performance Characteristics:
- O(1) thread lookup in Redis
- O(n) conversation history reconstruction where n = number of turns
- Intelligent token budgeting prevents context window overflow
- Optimized file deduplication minimizes redundant content
Example Usage Flow:
1. Claude: "Continue analyzing the security issues" + continuation_id
2. reconstruct_thread_context() loads previous analyze conversation
3. Debug tool receives full context including previous file analysis
4. Debug tool can reference specific findings from analyze tool
5. Natural cross-tool collaboration without context loss
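Illustrative token budget (numbers assumed, not real model limits):
content_tokens = 120_000 (from model_context.calculate_token_allocation())
conversation_tokens = 35_000 (consumed by reconstructed history)
_remaining_tokens = 85_000 (left for files and new content; see below)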
"""
from utils.conversation_memory import add_turn, build_conversation_history, get_thread
continuation_id = arguments["continuation_id"]
# Get thread context from Redis
logger.debug(f"[CONVERSATION_DEBUG] Looking up thread {continuation_id} in Redis")
context = get_thread(continuation_id)
if not context:
logger.warning(f"Thread not found: {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread {continuation_id} not found in Redis or expired")
# Log to activity file for monitoring
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(f"CONVERSATION_ERROR: Thread {continuation_id} not found or expired")
except Exception:
pass
# Return error asking Claude to restart conversation with full context
raise ValueError(
f"Conversation thread '{continuation_id}' was not found or has expired. "
f"This may happen if the conversation was created more than 1 hour ago or if there was an issue "
f"with Redis storage. "
f"Please restart the conversation by providing your full question/prompt without the "
f"continuation_id parameter. "
f"This will create a new conversation thread that can continue with follow-up exchanges."
)
# Add user's new input to the conversation
user_prompt = arguments.get("prompt", "")
if user_prompt:
# Capture files referenced in this turn
user_files = arguments.get("files", [])
logger.debug(f"[CONVERSATION_DEBUG] Adding user turn to thread {continuation_id}")
from utils.token_utils import estimate_tokens
user_prompt_tokens = estimate_tokens(user_prompt)
logger.debug(
f"[CONVERSATION_DEBUG] User prompt length: {len(user_prompt)} chars (~{user_prompt_tokens:,} tokens)"
)
logger.debug(f"[CONVERSATION_DEBUG] User files: {user_files}")
success = add_turn(continuation_id, "user", user_prompt, files=user_files)
if not success:
logger.warning(f"Failed to add user turn to thread {continuation_id}")
logger.debug("[CONVERSATION_DEBUG] Failed to add user turn - thread may be at turn limit or expired")
else:
logger.debug(f"[CONVERSATION_DEBUG] Successfully added user turn to thread {continuation_id}")
# Create model context early to use for history building
from utils.model_context import ModelContext
model_context = ModelContext.from_arguments(arguments)
# Build conversation history with model-specific limits
logger.debug(f"[CONVERSATION_DEBUG] Building conversation history for thread {continuation_id}")
logger.debug(f"[CONVERSATION_DEBUG] Thread has {len(context.turns)} turns, tool: {context.tool_name}")
logger.debug(f"[CONVERSATION_DEBUG] Using model: {model_context.model_name}")
conversation_history, conversation_tokens = build_conversation_history(context, model_context)
logger.debug(f"[CONVERSATION_DEBUG] Conversation history built: {conversation_tokens:,} tokens")
logger.debug(
f"[CONVERSATION_DEBUG] Conversation history length: {len(conversation_history)} chars (~{conversation_tokens:,} tokens)"
)
# Add dynamic follow-up instructions based on turn count
follow_up_instructions = get_follow_up_instructions(len(context.turns))
logger.debug(f"[CONVERSATION_DEBUG] Follow-up instructions added for turn {len(context.turns)}")
# All tools now use standardized 'prompt' field
original_prompt = arguments.get("prompt", "")
logger.debug("[CONVERSATION_DEBUG] Extracting user input from 'prompt' field")
original_prompt_tokens = estimate_tokens(original_prompt) if original_prompt else 0
logger.debug(
f"[CONVERSATION_DEBUG] User input length: {len(original_prompt)} chars (~{original_prompt_tokens:,} tokens)"
)
# Merge original context with new prompt and follow-up instructions
if conversation_history:
enhanced_prompt = (
f"{conversation_history}\n\n=== NEW USER INPUT ===\n{original_prompt}\n\n{follow_up_instructions}"
)
else:
enhanced_prompt = f"{original_prompt}\n\n{follow_up_instructions}"
# Update arguments with enhanced context and remaining token budget
enhanced_arguments = arguments.copy()
# Store the enhanced prompt in the prompt field
enhanced_arguments["prompt"] = enhanced_prompt
logger.debug("[CONVERSATION_DEBUG] Storing enhanced prompt in 'prompt' field")
# Calculate remaining token budget based on current model
# (model_context was already created above for history building)
token_allocation = model_context.calculate_token_allocation()
# Calculate remaining tokens for files/new content
# History has already consumed some of the content budget
remaining_tokens = token_allocation.content_tokens - conversation_tokens
enhanced_arguments["_remaining_tokens"] = max(0, remaining_tokens) # Ensure non-negative
enhanced_arguments["_model_context"] = model_context # Pass context for use in tools
logger.debug("[CONVERSATION_DEBUG] Token budget calculation:")
logger.debug(f"[CONVERSATION_DEBUG] Model: {model_context.model_name}")
logger.debug(f"[CONVERSATION_DEBUG] Total capacity: {token_allocation.total_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Content allocation: {token_allocation.content_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Conversation tokens: {conversation_tokens:,}")
logger.debug(f"[CONVERSATION_DEBUG] Remaining tokens: {remaining_tokens:,}")
# Merge original context parameters (files, etc.) with new request
if context.initial_context:
logger.debug(f"[CONVERSATION_DEBUG] Merging initial context with {len(context.initial_context)} parameters")
for key, value in context.initial_context.items():
if key not in enhanced_arguments and key not in ["temperature", "thinking_mode", "model"]:
enhanced_arguments[key] = value
logger.debug(f"[CONVERSATION_DEBUG] Merged initial context param: {key}")
logger.info(f"Reconstructed context for thread {continuation_id} (turn {len(context.turns)})")
logger.debug(f"[CONVERSATION_DEBUG] Final enhanced arguments keys: {list(enhanced_arguments.keys())}")
# Debug log files in the enhanced arguments for file tracking
if "files" in enhanced_arguments:
logger.debug(f"[CONVERSATION_DEBUG] Final files in enhanced arguments: {enhanced_arguments['files']}")
# Log to activity file for monitoring
try:
mcp_activity_logger = logging.getLogger("mcp_activity")
mcp_activity_logger.info(
f"CONVERSATION_CONTINUATION: Thread {continuation_id} turn {len(context.turns)} - "
f"{len(context.turns)} previous turns loaded"
)
except Exception:
pass
return enhanced_arguments
async def handle_version() -> list[TextContent]:
"""
Get comprehensive version and configuration information about the server.
Provides details about the server version, configuration settings,
available tools, and runtime environment. Useful for debugging and
understanding the server's capabilities.
Returns:
Formatted text with version and configuration details
"""
# Import thinking mode here to avoid circular imports
from config import DEFAULT_THINKING_MODE_THINKDEEP
# Gather comprehensive server information
version_info = {
"version": __version__,
"updated": __updated__,
"author": __author__,
"default_model": DEFAULT_MODEL,
"default_thinking_mode_thinkdeep": DEFAULT_THINKING_MODE_THINKDEEP,
"max_context_tokens": "Dynamic (model-specific)",
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"server_started": datetime.now().isoformat(),
"available_tools": list(TOOLS.keys()) + ["version"],
}
# Check configured providers and available models
from providers import ModelProviderRegistry
from providers.base import ProviderType
configured_providers = []
available_models = ModelProviderRegistry.get_available_models(respect_restrictions=True)
# Group models by provider
models_by_provider = {}
for model_name, provider_type in available_models.items():
if provider_type not in models_by_provider:
models_by_provider[provider_type] = []
models_by_provider[provider_type].append(model_name)
# Format provider information with actual available models
if ProviderType.GOOGLE in models_by_provider:
gemini_models = ", ".join(sorted(models_by_provider[ProviderType.GOOGLE]))
configured_providers.append(f"Gemini ({gemini_models})")
if ProviderType.OPENAI in models_by_provider:
openai_models = ", ".join(sorted(models_by_provider[ProviderType.OPENAI]))
configured_providers.append(f"OpenAI ({openai_models})")
if ProviderType.XAI in models_by_provider:
xai_models = ", ".join(sorted(models_by_provider[ProviderType.XAI]))
configured_providers.append(f"X.AI ({xai_models})")
if ProviderType.CUSTOM in models_by_provider:
custom_models = ", ".join(sorted(models_by_provider[ProviderType.CUSTOM]))
custom_url = os.getenv("CUSTOM_API_URL", "")
configured_providers.append(f"Custom API ({custom_url}) - Models: {custom_models}")
if ProviderType.OPENROUTER in models_by_provider:
# For OpenRouter, show a summary since there could be many models
openrouter_count = len(models_by_provider[ProviderType.OPENROUTER])
configured_providers.append(f"OpenRouter ({openrouter_count} models via conf/custom_models.json)")
# Format the information in a human-readable way
text = f"""Zen MCP Server v{__version__}
Updated: {__updated__}
Author: {__author__}
Configuration:
- Default Model: {DEFAULT_MODEL}
- Default Thinking Mode (ThinkDeep): {DEFAULT_THINKING_MODE_THINKDEEP}
- Max Context: Dynamic (model-specific)
- Python: {version_info["python_version"]}
- Started: {version_info["server_started"]}
Configured Providers:
{chr(10).join(f" - {provider}" for provider in configured_providers)}
Available Tools:
{chr(10).join(f" - {tool}" for tool in version_info["available_tools"])}
All Available Models:
{chr(10).join(f" - {model}" for model in sorted(available_models.keys()))}
For updates, visit: https://github.com/BeehiveInnovations/zen-mcp-server"""
# Create standardized tool output
tool_output = ToolOutput(status="success", content=text, content_type="text", metadata={"tool_name": "version"})
return [TextContent(type="text", text=tool_output.model_dump_json())]
@server.list_prompts()
async def handle_list_prompts() -> list[Prompt]:
"""
List all available prompts for Claude Code shortcuts.
This handler returns prompts that enable shortcuts like /zen:thinkdeeper.
We automatically generate prompts from all tools (1:1 mapping) plus add
a few marketing aliases with richer templates for commonly used tools.
Returns:
List of Prompt objects representing all available prompts
"""
logger.debug("MCP client requested prompt list")
prompts = []
# Add a prompt for each tool with rich templates
for tool_name, tool in TOOLS.items():
if tool_name in PROMPT_TEMPLATES:
# Use the rich template
template_info = PROMPT_TEMPLATES[tool_name]
prompts.append(
Prompt(
name=template_info["name"],
description=template_info["description"],
arguments=[], # MVP: no structured args
)
)
else:
# Fallback for any tools without templates (shouldn't happen)
prompts.append(
Prompt(
name=tool_name,
description=f"Use {tool.name} tool",
arguments=[],
)
)
# Add special "continue" prompt
prompts.append(
Prompt(
name="continue",
description="Continue the previous conversation using the chat tool",
arguments=[],
)
)
logger.debug(f"Returning {len(prompts)} prompts to MCP client")
return prompts
@server.get_prompt()
async def handle_get_prompt(name: str, arguments: Optional[dict[str, Any]] = None) -> GetPromptResult:
"""
Get prompt details and generate the actual prompt text.
This handler is called when a user invokes a prompt (e.g., /zen:thinkdeeper or /zen:chat:o3).
It generates the appropriate text that Claude will then use to call the
underlying tool.
Supports structured prompt names like "chat:o3" where:
- "chat" is the tool name
- "o3" is the model to use
Args:
name: The name of the prompt to execute (can include model like "chat:o3")
arguments: Optional arguments for the prompt (e.g., model, thinking_mode)
Returns:
GetPromptResult with the prompt details and generated message
Raises:
ValueError: If the prompt name is unknown
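Examples (prompt name -> routing, per the parsing below):
"thinkdeeper" -> thinkdeep tool, model from arguments or "auto"
"chat:o3" -> chat tool, model "o3"
"chat:continue" -> chat tool, continuing the previous thread
"consensus:flash:for,o3:against" -> consensus tool with stance-tagged models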
"""
logger.debug(f"MCP client requested prompt: {name} with args: {arguments}")
# Parse structured prompt names like "chat:o3", "chat:continue", or "consensus:flash:for,o3:against,pro:neutral"
parsed_model = None
is_continuation = False
consensus_models = None
base_name = name
if ":" in name:
parts = name.split(":", 1)
base_name = parts[0]
second_part = parts[1]
# Check if the second part is "continue" (special keyword)
if second_part.lower() == "continue":
is_continuation = True
logger.debug(f"Parsed continuation prompt: tool='{base_name}', continue=True")
elif base_name == "consensus" and "," in second_part:
# Handle consensus tool format: "consensus:flash:for,o3:against,pro:neutral"
consensus_models = ConsensusTool.parse_structured_prompt_models(second_part)
logger.debug(f"Parsed consensus prompt with models: {consensus_models}")
else:
parsed_model = second_part
logger.debug(f"Parsed structured prompt: tool='{base_name}', model='{parsed_model}'")
# Handle special "continue" cases
if base_name.lower() == "continue":
# This is "/zen:continue" - use chat tool as default for continuation
tool_name = "chat"
is_continuation = True
template_info = {
"name": "continue",
"description": "Continue the previous conversation",
"template": "Continue the conversation",
}
logger.debug("Using /zen:continue - defaulting to chat tool with continuation")
else:
# Find the corresponding tool by checking prompt names
tool_name = None
template_info = None
# Check if it's a known prompt name (using base_name)
for t_name, t_info in PROMPT_TEMPLATES.items():
if t_info["name"] == base_name:
tool_name = t_name
template_info = t_info
break
# If not found, check if it's a direct tool name
if not tool_name and base_name in TOOLS:
tool_name = base_name
template_info = {
"name": base_name,
"description": f"Use {base_name} tool",
"template": f"Use {base_name}",
}
if not tool_name:
logger.error(f"Unknown prompt requested: {name} (base: {base_name})")
raise ValueError(f"Unknown prompt: {name}")
# Get the template
template = template_info.get("template", f"Use {tool_name}")
# Safe template expansion with defaults
# Prioritize: parsed model > arguments model > "auto"
final_model = parsed_model or (arguments.get("model", "auto") if arguments else "auto")
prompt_args = {
"model": final_model,
"thinking_mode": arguments.get("thinking_mode", "medium") if arguments else "medium",
}
logger.debug(f"Using model '{final_model}' for prompt '{name}'")
# Safely format the template
try:
prompt_text = template.format(**prompt_args)
except KeyError as e:
logger.warning(f"Missing template argument {e} for prompt {name}, using raw template")
prompt_text = template # Fallback to raw template
# Generate tool call instruction based on the type of prompt
if is_continuation:
# Covers both the "/zen:continue" and "/zen:<tool>:continue" cases
tool_instruction = f"Continue the previous conversation using the {tool_name} tool"
elif consensus_models:
# "/zen:consensus:flash:for,o3:against,pro:neutral" case
model_descriptions = []
for model_config in consensus_models:
if model_config["stance"] != "neutral":
model_descriptions.append(f"{model_config['model']} with {model_config['stance']} stance")
else:
model_descriptions.append(f"{model_config['model']} with neutral stance")
models_text = ", ".join(model_descriptions)
models_json = json.dumps(consensus_models)  # Convert to proper JSON for Claude
tool_instruction = f"Use the {tool_name} tool with models: {models_text}. Call the consensus tool with prompt='debate this proposal' and models={models_json}"
elif parsed_model:
# "/zen:chat:o3" case
tool_instruction = f"Use the {tool_name} tool with model '{parsed_model}'"
else:
# "/zen:chat" case
tool_instruction = prompt_text
return GetPromptResult(
prompt=Prompt(
name=name,
description=template_info["description"],
arguments=[],
),
messages=[
PromptMessage(
role="user",
content={"type": "text", "text": tool_instruction},
)
],
)
async def main():
"""
Main entry point for the MCP server.
Initializes the Gemini API configuration and starts the server using
stdio transport. The server will continue running until the client
disconnects or an error occurs.
The server communicates via standard input/output streams using the
MCP protocol's JSON-RPC message format.
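Example tool-call request on stdin (abbreviated):
{"jsonrpc": "2.0", "id": 1, "method": "tools/call",
"params": {"name": "chat", "arguments": {"prompt": "hi", "model": "auto"}}}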
"""
# Validate and configure providers based on available API keys
configure_providers()
# Log startup message for Docker log monitoring
logger.info("Zen MCP Server starting up...")
logger.info(f"Log level: {log_level}")
# Log current model mode
from config import IS_AUTO_MODE
if IS_AUTO_MODE:
logger.info("Model mode: AUTO (Claude will select the best model for each task)")
else:
logger.info(f"Model mode: Fixed model '{DEFAULT_MODEL}'")
# Import here to avoid circular imports
from config import DEFAULT_THINKING_MODE_THINKDEEP
logger.info(f"Default thinking mode (ThinkDeep): {DEFAULT_THINKING_MODE_THINKDEEP}")
logger.info(f"Available tools: {list(TOOLS.keys())}")
logger.info("Server ready - waiting for tool requests...")
# Run the server using stdio transport (standard input/output)
# This allows the server to be launched by MCP clients as a subprocess
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="zen",
server_version=__version__,
capabilities=ServerCapabilities(tools=ToolsCapability(), prompts=PromptsCapability()), # Advertise tool and prompt support
),
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
# Handle graceful shutdown
pass