"""
|
|
Consensus tool for multi-model perspective gathering and validation
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any, Optional
|
|
|
|
from mcp.types import TextContent
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
if TYPE_CHECKING:
|
|
from tools.models import ToolModelCategory
|
|
|
|
from config import DEFAULT_CONSENSUS_MAX_INSTANCES_PER_COMBINATION
|
|
from systemprompts import CONSENSUS_PROMPT
|
|
|
|
from .base import BaseTool, ToolRequest
|
|
|
|
logger = logging.getLogger(__name__)


class ModelConfig(BaseModel):
    """Enhanced model configuration for consensus tool"""

    model: str = Field(..., description="Model name to use (e.g., 'o3', 'flash', 'pro')")
    stance: Optional[str] = Field(
        default="neutral",
        description=(
            "Stance for this model. Supportive: 'for', 'support', 'favor'. "
            "Critical: 'against', 'oppose', 'critical'. Neutral: 'neutral'. "
            "Defaults to 'neutral'."
        ),
    )
    stance_prompt: Optional[str] = Field(
        default=None,
        description=(
            "Custom stance-specific instructions for this model. "
            "If provided, this will be used instead of the default stance prompt. "
            "Should be clear, specific instructions about how this model should approach the analysis."
        ),
    )
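
# Illustrative sketch (not executed): a supportive reviewer with a custom stance
# prompt. The model name is an example; valid names depend on configured providers.
#
#   ModelConfig(
#       model="o3",
#       stance="support",  # normalized to "for" by ConsensusTool._normalize_stance
#       stance_prompt="Focus on implementation benefits and user value.",
#   )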


class ConsensusRequest(ToolRequest):
    """Request model for consensus tool"""

    prompt: str = Field(
        ...,
        description=(
            "Description of what to get consensus on, testing objectives, and specific scope/focus areas. "
            "Be as detailed as possible about the proposal, plan, or idea you want multiple perspectives on."
        ),
    )
    models: list[ModelConfig] = Field(
        ...,
        description=(
            "List of model configurations for consensus analysis. Each model can have a specific stance and custom instructions. "
            "Example: [{'model': 'o3', 'stance': 'for', 'stance_prompt': 'Focus on benefits and opportunities...'}, "
            "{'model': 'flash', 'stance': 'against', 'stance_prompt': 'Identify risks and challenges...'}]. "
            "Maximum 2 instances per model+stance combination."
        ),
    )
    files: Optional[list[str]] = Field(
        default_factory=list,
        description="Optional files or directories for additional context (must be absolute paths)",
    )
    images: Optional[list[str]] = Field(
        default_factory=list,
        description=(
            "Optional images showing expected UI changes, design requirements, "
            "or visual references for the consensus analysis"
        ),
    )
    focus_areas: Optional[list[str]] = Field(
        default_factory=list,
        description="Specific aspects to focus on (e.g., 'performance', 'security', 'user experience')",
    )

    @field_validator("models")
    @classmethod
    def validate_models_not_empty(cls, v):
        if not v:
            raise ValueError("At least one model must be specified")
        return v
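
# Illustrative sketch (not executed): a minimal request pitting two reviewers
# against each other. Values are examples only, not defaults enforced by the tool.
#
#   ConsensusRequest(
#       prompt="Should we migrate the persistence layer to Postgres?",
#       models=[
#           ModelConfig(model="o3", stance="for"),
#           ModelConfig(model="flash", stance="against"),
#       ],
#       focus_areas=["operational cost", "migration risk"],
#   )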


class ConsensusTool(BaseTool):
    """Multi-model consensus tool for gathering diverse perspectives on technical proposals"""

    def __init__(self):
        super().__init__()

    def get_name(self) -> str:
        return "consensus"

    def get_description(self) -> str:
        return (
            "MULTI-MODEL CONSENSUS - Gather diverse perspectives from multiple AI models on technical proposals, "
            "plans, and ideas. Perfect for validation, feasibility assessment, and getting comprehensive "
            "viewpoints on complex decisions. Supports advanced stance steering with custom instructions for each model. "
            "You can specify different stances (for/against/neutral) and provide custom stance prompts to guide each "
            "model's analysis. Example: [{'model': 'o3', 'stance': 'for', 'stance_prompt': 'Focus on implementation "
            "benefits and user value'}, {'model': 'flash', 'stance': 'against', 'stance_prompt': 'Identify potential "
            "risks and technical challenges'}]. Use neutral stances by default unless structured debate would add value."
        )

    def get_input_schema(self) -> dict[str, Any]:
        schema = {
            "type": "object",
            "properties": {
                "prompt": {
                    "type": "string",
                    "description": (
                        "Description of what to get consensus on, testing objectives, and specific scope/focus areas. "
                        "Be as detailed as possible about the proposal, plan, or idea you want multiple perspectives on."
                    ),
                },
                "models": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "model": {
                                "type": "string",
                                "description": "Model name to use (e.g., 'o3', 'flash', 'pro')",
                            },
                            "stance": {
                                "type": "string",
                                "enum": ["for", "support", "favor", "against", "oppose", "critical", "neutral"],
                                "description": "Stance for this model: supportive ('for', 'support', 'favor'), critical ('against', 'oppose', 'critical'), or 'neutral'",
                                "default": "neutral",
                            },
                            "stance_prompt": {
                                "type": "string",
                                "description": "Custom stance-specific instructions for this model. If provided, this will be used instead of the default stance prompt.",
                            },
                        },
                        "required": ["model"],
                    },
                    "description": (
                        "List of model configurations for consensus analysis. Each model can have a specific stance and custom instructions. "
                        "Example: [{'model': 'o3', 'stance': 'for', 'stance_prompt': 'Focus on benefits and opportunities...'}, "
                        "{'model': 'flash', 'stance': 'against', 'stance_prompt': 'Identify risks and challenges...'}]. "
                        "Maximum 2 instances per model+stance combination."
                    ),
                },
                "files": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional files or directories for additional context (must be absolute paths)",
                },
                "images": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": (
                        "Optional images showing expected UI changes, design requirements, "
                        "or visual references for the consensus analysis"
                    ),
                },
                "focus_areas": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Specific aspects to focus on (e.g., 'performance', 'security', 'user experience')",
                },
                "temperature": {
                    "type": "number",
                    "description": "Temperature (0-1, default 0.2 for consistency)",
                    "minimum": 0,
                    "maximum": 1,
                    "default": self.get_default_temperature(),
                },
                "thinking_mode": {
                    "type": "string",
                    "enum": ["minimal", "low", "medium", "high", "max"],
                    "description": (
                        "Thinking depth: minimal (0.5% of model max), low (8%), medium (33%), "
                        "high (67%), max (100% of model max)"
                    ),
                },
                "use_websearch": {
                    "type": "boolean",
                    "description": (
                        "Enable web search for documentation, best practices, and current information. "
                        "Particularly useful for: brainstorming sessions, architectural design discussions, "
                        "exploring industry best practices, working with specific frameworks/technologies, "
                        "researching solutions to complex problems, or when current documentation and "
                        "community insights would enhance the analysis."
                    ),
                    "default": True,
                },
                "continuation_id": {
                    "type": "string",
                    "description": (
                        "Thread continuation ID for multi-turn conversations. Can be used to continue "
                        "conversations across different tools. Only provide this if continuing a previous "
                        "conversation thread."
                    ),
                },
            },
            "required": ["prompt", "models"],
        }

        return schema
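
    # Illustrative sketch (not executed): arguments an MCP client might send for this
    # tool, matching the schema above. Values are examples only.
    #
    #   {
    #       "prompt": "Evaluate switching our build system to Bazel",
    #       "models": [{"model": "o3"}, {"model": "flash", "stance": "against"}],
    #       "temperature": 0.2,
    #       "thinking_mode": "medium",
    #       "use_websearch": True,
    #   }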

    def get_system_prompt(self) -> str:
        return CONSENSUS_PROMPT

    def get_default_temperature(self) -> float:
        return 0.2  # Lower temperature for more consistent consensus responses

    def get_model_category(self) -> "ToolModelCategory":
        """Consensus uses extended reasoning models for deep analysis"""
        from tools.models import ToolModelCategory

        return ToolModelCategory.EXTENDED_REASONING

    def get_request_model(self):
        return ConsensusRequest

    def format_conversation_turn(self, turn) -> list[str]:
        """
        Format consensus turns with individual model responses for better readability.

        This custom formatting shows the individual model responses that were
        synthesized into the consensus, making it easier to understand the
        reasoning behind the final recommendation.
        """
        parts = []

        # Add files context if present
        if turn.files:
            parts.append(f"Files used in this turn: {', '.join(turn.files)}")
            parts.append("")

        # Check if this is a consensus turn with individual responses
        if turn.model_metadata and turn.model_metadata.get("individual_responses"):
            individual_responses = turn.model_metadata["individual_responses"]

            # Add consensus header
            models_consulted = []
            for resp in individual_responses:
                model = resp["model"]
                stance = resp.get("stance", "neutral")
                if stance != "neutral":
                    models_consulted.append(f"{model}:{stance}")
                else:
                    models_consulted.append(model)

            parts.append(f"Models consulted: {', '.join(models_consulted)}")
            parts.append("")
            parts.append("=== INDIVIDUAL MODEL RESPONSES ===")
            parts.append("")

            # Add each successful model response
            for i, response in enumerate(individual_responses):
                model_name = response["model"]
                stance = response.get("stance", "neutral")
                verdict = response["verdict"]

                stance_label = f"({stance.title()} Stance)" if stance != "neutral" else "(Neutral Analysis)"
                parts.append(f"**{model_name.upper()} {stance_label}**:")
                parts.append(verdict)

                if i < len(individual_responses) - 1:
                    parts.append("")
                    parts.append("---")
                    parts.append("")

            parts.append("=== END INDIVIDUAL RESPONSES ===")
            parts.append("")
            parts.append("Claude's Synthesis:")

        # Add the actual content
        parts.append(turn.content)

        return parts
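
    # Example of the resulting layout (illustrative; actual text depends on the turn):
    #
    #   Models consulted: o3:for, flash
    #
    #   === INDIVIDUAL MODEL RESPONSES ===
    #
    #   **O3 (For Stance)**:
    #   <verdict text>
    #   ---
    #   **FLASH (Neutral Analysis)**:
    #   <verdict text>
    #
    #   === END INDIVIDUAL RESPONSES ===
    #
    #   Claude's Synthesis:
    #   <turn content>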

    def _normalize_stance(self, stance: Optional[str]) -> str:
        """Normalize stance to canonical form."""
        if not stance:
            return "neutral"

        stance = stance.lower()

        # Define stance synonyms
        supportive_stances = {"for", "support", "favor"}
        critical_stances = {"against", "oppose", "critical"}

        # Map synonyms to canonical stance
        if stance in supportive_stances:
            return "for"
        elif stance in critical_stances:
            return "against"
        elif stance == "neutral":
            return "neutral"
        else:
            # Unknown stances default to neutral for robustness
            logger.warning(
                f"Unknown stance '{stance}' provided, defaulting to 'neutral'. Valid stances: {', '.join(sorted(supportive_stances | critical_stances))}, or 'neutral'"
            )
            return "neutral"

    def _validate_model_combinations(self, model_configs: list[ModelConfig]) -> tuple[list[ModelConfig], list[str]]:
        """Validate model configurations and enforce limits.

        Returns:
            tuple: (valid_configs, skipped_entries)
            - Each model+stance combination is limited to
              DEFAULT_CONSENSUS_MAX_INSTANCES_PER_COMBINATION instances (default 2);
              extras are recorded in skipped_entries
        """
        valid_configs = []
        skipped_entries = []
        combination_counts = {}  # Track (model, stance) -> count

        for config in model_configs:
            try:
                # Normalize stance
                normalized_stance = self._normalize_stance(config.stance)

                # Create normalized config
                normalized_config = ModelConfig(
                    model=config.model, stance=normalized_stance, stance_prompt=config.stance_prompt
                )

                combination_key = (config.model, normalized_stance)
                current_count = combination_counts.get(combination_key, 0)

                if current_count >= DEFAULT_CONSENSUS_MAX_INSTANCES_PER_COMBINATION:
                    # Already have max instances of this model+stance combination
                    skipped_entries.append(
                        f"{config.model}:{normalized_stance} (max {DEFAULT_CONSENSUS_MAX_INSTANCES_PER_COMBINATION} instances)"
                    )
                    continue

                combination_counts[combination_key] = current_count + 1
                valid_configs.append(normalized_config)

            except ValueError as e:
                # Invalid stance or model
                skipped_entries.append(f"{config.model} ({str(e)})")
                continue

        return valid_configs, skipped_entries
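
    # Example (illustrative): with a limit of 2 per combination, the input
    #   [o3:for, o3:for, o3:for, flash:against]
    # yields valid_configs [o3:for, o3:for, flash:against] and
    # skipped_entries ["o3:for (max 2 instances)"].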

    def _get_stance_enhanced_prompt(self, stance: str, custom_stance_prompt: Optional[str] = None) -> str:
        """Get the system prompt with stance injection based on the stance."""
        base_prompt = self.get_system_prompt()

        # If custom stance prompt is provided, use it instead of default
        if custom_stance_prompt:
            # Validate stance placeholder exists exactly once
            if base_prompt.count("{stance_prompt}") != 1:
                raise ValueError(
                    "System prompt must contain exactly one '{stance_prompt}' placeholder, "
                    f"found {base_prompt.count('{stance_prompt}')}"
                )
            return base_prompt.replace("{stance_prompt}", custom_stance_prompt)

        stance_prompts = {
            "for": """SUPPORTIVE PERSPECTIVE WITH INTEGRITY

You are tasked with advocating FOR this proposal, but with CRITICAL GUARDRAILS:

MANDATORY ETHICAL CONSTRAINTS:
- This is NOT a debate for entertainment. You MUST act in good faith and in the best interest of the questioner
- You MUST think deeply about whether supporting this idea is safe, sound, and passes essential requirements
- You MUST be direct and unequivocal in saying "this is a bad idea" when it truly is
- There must be at least ONE COMPELLING reason to be optimistic, otherwise DO NOT support it

WHEN TO REFUSE SUPPORT (MUST OVERRIDE STANCE):
- If the idea is fundamentally harmful to users, project, or stakeholders
- If implementation would violate security, privacy, or ethical standards
- If the proposal is technically infeasible within realistic constraints
- If costs/risks dramatically outweigh any potential benefits

YOUR SUPPORTIVE ANALYSIS SHOULD:
- Identify genuine strengths and opportunities
- Propose solutions to overcome legitimate challenges
- Highlight synergies with existing systems
- Suggest optimizations that enhance value
- Present realistic implementation pathways

Remember: Being "for" means finding the BEST possible version of the idea IF it has merit, not blindly supporting bad ideas.""",
            "against": """CRITICAL PERSPECTIVE WITH RESPONSIBILITY

You are tasked with critiquing this proposal, but with ESSENTIAL BOUNDARIES:

MANDATORY FAIRNESS CONSTRAINTS:
- You MUST NOT oppose genuinely excellent, common-sense ideas just to be contrarian
- You MUST acknowledge when a proposal is fundamentally sound and well-conceived
- You CANNOT give harmful advice or recommend against beneficial changes
- If the idea is outstanding, say so clearly while offering constructive refinements

WHEN TO MODERATE CRITICISM (MUST OVERRIDE STANCE):
- If the proposal addresses critical user needs effectively
- If it follows established best practices with good reason
- If benefits clearly and substantially outweigh risks
- If it's the obvious right solution to the problem

YOUR CRITICAL ANALYSIS SHOULD:
- Identify legitimate risks and failure modes
- Point out overlooked complexities
- Suggest more efficient alternatives
- Highlight potential negative consequences
- Question assumptions that may be flawed

Remember: Being "against" means rigorous scrutiny to ensure quality, not undermining good ideas that deserve support.""",
            "neutral": """BALANCED PERSPECTIVE

Provide objective analysis considering both positive and negative aspects. However, if there is overwhelming evidence
that the proposal clearly leans toward being exceptionally good or particularly problematic, you MUST accurately
reflect this reality. Being "balanced" means being truthful about the weight of evidence, not artificially creating
50/50 splits when the reality is 90/10.

Your analysis should:
- Present all significant pros and cons discovered
- Weight them according to actual impact and likelihood
- If evidence strongly favors one conclusion, clearly state this
- Provide proportional coverage based on the strength of arguments
- Help the questioner see the true balance of considerations

Remember: Artificial balance that misrepresents reality is not helpful. True balance means accurate representation
of the evidence, even when it strongly points in one direction.""",
        }

        stance_prompt = stance_prompts.get(stance, stance_prompts["neutral"])

        # Validate stance placeholder exists exactly once
        if base_prompt.count("{stance_prompt}") != 1:
            raise ValueError(
                "System prompt must contain exactly one '{stance_prompt}' placeholder, "
                f"found {base_prompt.count('{stance_prompt}')}"
            )

        # Inject stance into the system prompt
        return base_prompt.replace("{stance_prompt}", stance_prompt)
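
    # Note (assumption based on the validation above): CONSENSUS_PROMPT from
    # systemprompts must contain the literal placeholder "{stance_prompt}" exactly
    # once, e.g. a fragment shaped like:
    #
    #   "...role and task description...\n\n{stance_prompt}\n\n...output format rules..."
    #
    # The actual prompt text lives in systemprompts and is not reproduced here.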

    def _get_single_response(
        self, provider, model_config: ModelConfig, prompt: str, request: ConsensusRequest
    ) -> dict[str, Any]:
        """Get response from a single model - synchronous method."""
        logger.debug(f"Getting response from {model_config.model} with stance '{model_config.stance}'")

        try:
            # Provider.generate_content is synchronous, not async
            response = provider.generate_content(
                prompt=prompt,
                model_name=model_config.model,
                system_prompt=self._get_stance_enhanced_prompt(model_config.stance, model_config.stance_prompt),
                temperature=getattr(request, "temperature", None) or self.get_default_temperature(),
                thinking_mode=getattr(request, "thinking_mode", "medium"),
                images=getattr(request, "images", None) or [],
            )
            return {
                "model": model_config.model,
                "stance": model_config.stance,
                "status": "success",
                "verdict": response.content,  # Contains structured Markdown
                "metadata": {
                    "provider": getattr(provider.get_provider_type(), "value", provider.get_provider_type()),
                    "usage": response.usage if hasattr(response, "usage") else None,
                    "custom_stance_prompt": bool(model_config.stance_prompt),
                },
            }
        except Exception as e:
            logger.error(f"Error getting response from {model_config.model}:{model_config.stance}: {str(e)}")
            return {"model": model_config.model, "stance": model_config.stance, "status": "error", "error": str(e)}

    def _get_consensus_responses(
        self, provider_configs: list[tuple], prompt: str, request: ConsensusRequest
    ) -> list[dict[str, Any]]:
        """Execute all model requests sequentially - purely synchronous like other tools."""

        logger.debug(f"Processing {len(provider_configs)} models sequentially")
        responses = []

        for i, (provider, model_config) in enumerate(provider_configs):
            try:
                logger.debug(
                    f"Processing {model_config.model}:{model_config.stance} sequentially ({i+1}/{len(provider_configs)})"
                )

                # Direct synchronous call - matches pattern of other tools
                response = self._get_single_response(provider, model_config, prompt, request)
                responses.append(response)

            except Exception as e:
                logger.error(f"Failed to get response from {model_config.model}:{model_config.stance}: {str(e)}")
                responses.append(
                    {
                        "model": model_config.model,
                        "stance": model_config.stance,
                        "status": "error",
                        "error": f"Unhandled exception: {str(e)}",
                    }
                )

        logger.debug(f"Sequential processing completed for {len(responses)} models")
        return responses

    def _format_consensus_output(self, responses: list[dict[str, Any]], skipped_entries: list[str]) -> str:
        """Format the consensus responses into structured output for Claude."""

        logger.debug(f"Formatting consensus output for {len(responses)} responses")

        # Separate successful and failed responses
        successful_responses = [r for r in responses if r["status"] == "success"]
        failed_responses = [r for r in responses if r["status"] == "error"]

        logger.debug(f"Successful responses: {len(successful_responses)}, Failed: {len(failed_responses)}")

        # Prepare the structured output (minimize size for MCP stability)
        models_used = [
            f"{r['model']}:{r['stance']}" if r["stance"] != "neutral" else r["model"] for r in successful_responses
        ]
        models_errored = [
            f"{r['model']}:{r['stance']}" if r["stance"] != "neutral" else r["model"] for r in failed_responses
        ]

        # Prepare clean responses without truncation
        clean_responses = []
        for r in responses:
            if r["status"] == "success":
                clean_responses.append(
                    {
                        "model": r["model"],
                        "stance": r["stance"],
                        "status": r["status"],
                        "verdict": r.get("verdict", ""),
                        "metadata": r.get("metadata", {}),
                    }
                )
            else:
                clean_responses.append(
                    {
                        "model": r["model"],
                        "stance": r["stance"],
                        "status": r["status"],
                        "error": r.get("error", "Unknown error"),
                    }
                )

        output_data = {
            "status": "consensus_success" if successful_responses else "consensus_failed",
            "models_used": models_used,
            "models_skipped": skipped_entries,
            "models_errored": models_errored,
            "responses": clean_responses,
            "next_steps": self._get_synthesis_guidance(successful_responses, failed_responses),
        }

        return json.dumps(output_data, indent=2)
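
    # Example output shape (illustrative, abbreviated):
    #
    #   {
    #     "status": "consensus_success",
    #     "models_used": ["o3:for", "flash"],
    #     "models_skipped": [],
    #     "models_errored": [],
    #     "responses": [{"model": "o3", "stance": "for", "status": "success", "verdict": "...", "metadata": {...}}],
    #     "next_steps": "Claude, synthesize these perspectives..."
    #   }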

    def _get_synthesis_guidance(
        self, successful_responses: list[dict[str, Any]], failed_responses: list[dict[str, Any]]
    ) -> str:
        """Generate guidance for Claude on how to synthesize the consensus results."""

        if not successful_responses:
            return (
                "No models provided successful responses. Please retry with different models or "
                "check the error messages for guidance on resolving the issues."
            )

        if len(successful_responses) == 1:
            return (
                "Only one model provided a successful response. Synthesize based on the available "
                "perspective and indicate areas where additional expert input would be valuable "
                "due to the limited consensus data."
            )

        # Multiple successful responses - provide comprehensive synthesis guidance
        stance_counts = {"for": 0, "against": 0, "neutral": 0}
        for resp in successful_responses:
            stance = resp.get("stance", "neutral")
            stance_counts[stance] = stance_counts.get(stance, 0) + 1

        guidance = (
            "Claude, synthesize these perspectives by first identifying the key points of "
            "**agreement** and **disagreement** between the models. Then provide your final, "
            "consolidated recommendation, explaining how you weighed the different opinions and "
            "why your proposed solution is the most balanced approach. Explicitly address the "
            "most critical risks raised by each model and provide actionable next steps for implementation."
        )

        if failed_responses:
            guidance += (
                f" Note: {len(failed_responses)} model(s) failed to respond - consider this "
                "partial consensus and indicate where additional expert input would strengthen the analysis."
            )

        return guidance

    async def prepare_prompt(self, request: ConsensusRequest) -> str:
        """Prepare the consensus prompt with context files and focus areas."""
        # Check for prompt.txt in files
        prompt_content, updated_files = self.handle_prompt_file(request.files)

        # Use prompt.txt content if available, otherwise use the prompt field
        user_content = prompt_content if prompt_content else request.prompt

        # Check user input size at MCP transport boundary (before adding internal content)
        size_check = self.check_prompt_size(user_content)
        if size_check:
            # Need to return error, but prepare_prompt returns str
            # Use exception to handle this cleanly
            from tools.models import ToolOutput

            raise ValueError(f"MCP_SIZE_CHECK:{ToolOutput(**size_check).model_dump_json()}")

        # Update request files list
        if updated_files is not None:
            request.files = updated_files

        # Add focus areas if specified
        if request.focus_areas:
            focus_areas_text = "\n\nSpecific focus areas for this analysis:\n" + "\n".join(
                f"- {area}" for area in request.focus_areas
            )
            user_content += focus_areas_text

        # Add context files if provided (using centralized file handling with filtering)
        if request.files:
            file_content, processed_files = self._prepare_file_content_for_prompt(
                request.files, request.continuation_id, "Context files"
            )
            self._actually_processed_files = processed_files
            if file_content:
                user_content = f"{user_content}\n\n=== CONTEXT FILES ===\n{file_content}\n=== END CONTEXT ==="

        # Check token limits
        self._validate_token_limit(user_content, "Content")

        return user_content

    async def execute(self, arguments: dict[str, Any]) -> list[TextContent]:
        """Execute consensus gathering from multiple models."""

        # Store arguments for base class methods
        self._current_arguments = arguments

        # Validate and create request
        request = ConsensusRequest(**arguments)

        # Validate model configurations and enforce limits
        valid_configs, skipped_entries = self._validate_model_combinations(request.models)

        if not valid_configs:
            error_output = {
                "status": "consensus_failed",
                "error": "No valid model configurations after validation",
                "models_skipped": skipped_entries,
                "next_steps": "Please provide valid model configurations with proper model names and stance values.",
            }
            return [TextContent(type="text", text=json.dumps(error_output, indent=2))]

        # Set up a dummy model context for consensus since we handle multiple models
        # This is needed for base class methods like prepare_prompt to work
        if not hasattr(self, "_model_context") or not self._model_context:
            from utils.model_context import ModelContext

            # Use the first model as the representative for token calculations
            first_model = valid_configs[0].model if valid_configs else "flash"
            self._model_context = ModelContext(first_model)

        # Handle conversation continuation if specified
        if request.continuation_id:
            from utils.conversation_memory import build_conversation_history, get_thread

            thread_context = get_thread(request.continuation_id)
            if thread_context:
                # Build conversation history using the same pattern as other tools
                conversation_context, _ = build_conversation_history(thread_context, self._model_context)
                if conversation_context:
                    # Add conversation context to the beginning of the prompt
                    enhanced_prompt = f"{conversation_context}\n\n{request.prompt}"
                    request.prompt = enhanced_prompt

        # Prepare the consensus prompt
        consensus_prompt = await self.prepare_prompt(request)

        # Get providers for valid model configurations with caching to avoid duplicate lookups
        provider_configs = []
        provider_cache = {}  # Cache to avoid duplicate provider lookups

        for model_config in valid_configs:
            try:
                # Check cache first
                if model_config.model in provider_cache:
                    provider = provider_cache[model_config.model]
                else:
                    # Look up provider and cache it
                    provider = self.get_model_provider(model_config.model)
                    provider_cache[model_config.model] = provider

                provider_configs.append((provider, model_config))
            except Exception as e:
                # Track failed models
                model_display = (
                    f"{model_config.model}:{model_config.stance}"
                    if model_config.stance != "neutral"
                    else model_config.model
                )
                skipped_entries.append(f"{model_display} (provider not available: {str(e)})")

        if not provider_configs:
            error_output = {
                "status": "consensus_failed",
                "error": "No model providers available",
                "models_skipped": skipped_entries,
                "next_steps": "Please check that the specified models have configured API keys and are available.",
            }
            return [TextContent(type="text", text=json.dumps(error_output, indent=2))]

        # Send to all models sequentially (purely synchronous like other tools)
        logger.debug(f"Sending consensus request to {len(provider_configs)} models")
        responses = self._get_consensus_responses(provider_configs, consensus_prompt, request)
        logger.debug(f"Received {len(responses)} responses from consensus models")

        # Enforce minimum success requirement - must have at least 1 successful response
        successful_responses = [r for r in responses if r["status"] == "success"]
        if not successful_responses:
            error_output = {
                "status": "consensus_failed",
                "error": "All model calls failed - no successful responses received",
                "models_skipped": skipped_entries,
                "models_errored": [
                    f"{r['model']}:{r['stance']}" if r["stance"] != "neutral" else r["model"]
                    for r in responses
                    if r["status"] == "error"
                ],
                "next_steps": "Please retry with different models or check the error messages for guidance on resolving the issues.",
            }
            return [TextContent(type="text", text=json.dumps(error_output, indent=2))]

        logger.debug("About to format consensus output for MCP response")

        # Structure the output and store in conversation memory
        consensus_output = self._format_consensus_output(responses, skipped_entries)

        # Log response size for debugging
        output_size = len(consensus_output)
        logger.debug(f"Consensus output size: {output_size:,} characters")

        # Store in conversation memory if continuation_id is provided
        if request.continuation_id:
            self.store_conversation_turn(
                request.continuation_id,
                consensus_output,
                request.files,
                request.images,
                responses,  # Store individual responses in metadata
                skipped_entries,
            )

        return [TextContent(type="text", text=consensus_output)]
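
    # Illustrative sketch (not executed): driving the tool directly from an async
    # context. In practice the MCP server dispatches to execute(); the values here
    # are assumptions for illustration only.
    #
    #   import asyncio
    #
    #   tool = ConsensusTool()
    #   result = asyncio.run(tool.execute({
    #       "prompt": "Should we adopt feature flags for the rollout?",
    #       "models": [{"model": "o3", "stance": "for"}, {"model": "flash", "stance": "against"}],
    #   }))
    #   print(result[0].text)  # JSON produced by _format_consensus_output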

    def store_conversation_turn(
        self,
        continuation_id: str,
        output: str,
        files: list[str],
        images: list[str],
        responses: list[dict[str, Any]],
        skipped_entries: list[str],
    ):
        """Store consensus turn in conversation memory with special metadata."""
        from utils.conversation_memory import add_turn

        # Filter successful and failed responses
        successful_responses = [r for r in responses if r["status"] == "success"]
        failed_responses = [r for r in responses if r["status"] == "error"]

        # Prepare metadata for conversation storage
        metadata = {
            "tool_type": "consensus",
            "models_used": [r["model"] for r in successful_responses],
            "models_skipped": skipped_entries,
            "models_errored": [r["model"] for r in failed_responses],
            "individual_responses": successful_responses,  # Only store successful responses
        }

        # Store the turn with special consensus metadata - add_turn is synchronous
        add_turn(
            thread_id=continuation_id,
            role="assistant",
            content=output,
            files=files or [],
            images=images or [],
            tool_name="consensus",
            model_provider="consensus",  # Special provider name
            model_name="consensus",  # Special model name
            model_metadata=metadata,
        )