feat: added intelligence_score to the model capabilities schema; a 1-20 number that can be specified to influence the sort order of models presented to the CLI in auto selection mode

fix: re-introduced the model definition into the schema, but intelligently: only a summary is generated per tool. Required to ensure the CLI calls and uses the correct model
fix: removed the `model` param from tools that didn't need it
fix: enforced adherence to `*_ALLOWED_MODELS` by advertising only the allowed models to the CLI
fix: removed duplicates across providers when passing canonical names back to the CLI; the first enabled provider wins (see the ordering sketch below)
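The intended ordering and de-duplication behaviour can be pictured with a minimal sketch. This is not code from this commit; names such as ModelEntry, normalize, and rank_models are hypothetical. It assumes models carry a 1-20 score, providers are walked in priority order so the first enabled provider's entry wins, and surviving entries are sorted by score.

# Hypothetical sketch of score-based ordering with cross-provider de-duplication.
from dataclasses import dataclass

@dataclass
class ModelEntry:
    provider: str            # e.g. "google", "openrouter"
    name: str                # canonical name, e.g. "google/gemini-2.5-pro"
    intelligence_score: int  # 1-20, higher is presented first

def normalize(name: str) -> str:
    # Strip provider prefixes and variant suffixes so aliases of the same model collide.
    name = name.lower().split(":", 1)[0]
    return name.split("/", 1)[-1]

def rank_models(entries: list[ModelEntry]) -> list[ModelEntry]:
    # Entries are assumed to arrive in provider-priority order, so the first
    # occurrence of a normalized name wins and later duplicates are dropped.
    seen: set[str] = set()
    unique: list[ModelEntry] = []
    for entry in entries:
        key = normalize(entry.name)
        if key in seen:
            continue
        seen.add(key)
        unique.append(entry)
    # Highest score first; ties broken alphabetically for stable output.
    return sorted(unique, key=lambda e: (-e.intelligence_score, e.name))

if __name__ == "__main__":
    roster = [
        ModelEntry("google", "gemini-2.5-pro", 18),
        ModelEntry("openrouter", "google/gemini-2.5-pro", 17),  # duplicate, dropped
        ModelEntry("openai", "gpt-5", 19),
    ]
    for entry in rank_models(roster):
        print(f"{entry.name} (score {entry.intelligence_score}) via {entry.provider}")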
Fahad
2025-10-02 21:43:44 +04:00
parent e78fe35a1b
commit 6cab9e56fc
22 changed files with 525 additions and 110 deletions


@@ -48,8 +48,9 @@ CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS = {
),
"relevant_files": "Optional supporting files that help the consensus analysis. Must be absolute full, non-abbreviated paths.",
"models": (
"List of models to consult. Each entry may include model, stance (for/against/neutral), and stance_prompt. "
"Each (model, stance) pair must be unique, e.g. [{'model':'o3','stance':'for'}, {'model':'o3','stance':'against'}]."
"User-specified list of models to consult (provide at least two entries). "
"Each entry may include model, stance (for/against/neutral), and stance_prompt. "
"Each (model, stance) pair must be unique, e.g. [{'model':'gpt5','stance':'for'}, {'model':'pro','stance':'against'}]."
),
"current_model_index": "0-based index of the next model to consult (managed internally).",
"model_responses": "Internal log of responses gathered so far.",
@@ -233,7 +234,11 @@ of the evidence, even when it strongly points in one direction.""",
},
"required": ["model"],
},
"description": CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["models"],
"description": (
"User-specified roster of models to consult (provide at least two entries). "
+ CONSENSUS_WORKFLOW_FIELD_DESCRIPTIONS["models"]
),
"minItems": 2,
},
"current_model_index": {
"type": "integer",
@@ -268,17 +273,19 @@ of the evidence, even when it strongly points in one direction.""",
"thinking_mode", # Not used in consensus workflow
]
# Build schema with proper field exclusion
# Include model field for compatibility but don't require it
schema = WorkflowSchemaBuilder.build_schema(
requires_model = self.requires_model()
model_field_schema = self.get_model_field_schema() if requires_model else None
auto_mode = self.is_effective_auto_mode() if requires_model else False
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=consensus_field_overrides,
model_field_schema=self.get_model_field_schema(),
auto_mode=False, # Consensus doesn't require model at MCP boundary
model_field_schema=model_field_schema,
auto_mode=auto_mode,
tool_name=self.get_name(),
excluded_workflow_fields=excluded_workflow_fields,
excluded_common_fields=excluded_common_fields,
require_model=requires_model,
)
return schema
def get_required_actions(
self, step_number: int, confidence: str, findings: str, total_steps: int, request=None


@@ -40,8 +40,9 @@ class ListModelsTool(BaseTool):
"""Return the JSON schema for the tool's input"""
return {
"type": "object",
"properties": {"model": {"type": "string", "description": "Model to use (ignored by listmodels tool)"}},
"properties": {},
"required": [],
"additionalProperties": False,
}
def get_annotations(self) -> Optional[dict[str, Any]]:
@@ -106,7 +107,7 @@ class ListModelsTool(BaseTool):
output_lines.append("\n**Models**:")
aliases = []
for model_name, capabilities in provider.get_all_model_capabilities().items():
for model_name, capabilities in provider.get_capabilities_by_rank():
description = capabilities.description or "No description available"
context_window = capabilities.context_window
@@ -153,33 +154,44 @@ class ListModelsTool(BaseTool):
available_models = provider.list_models(respect_restrictions=True)
registry = OpenRouterModelRegistry()
# Group by provider for better organization
providers_models = {}
for model_name in available_models: # Show ALL available models
# Try to resolve to get config details
# Group by provider and retain ranking information for consistent ordering
providers_models: dict[str, list[tuple[int, str, Optional[Any]]]] = {}
def _format_context(tokens: int) -> str:
if not tokens:
return "?"
if tokens >= 1_000_000:
return f"{tokens // 1_000_000}M"
if tokens >= 1_000:
return f"{tokens // 1_000}K"
return str(tokens)
for model_name in available_models:
config = registry.resolve(model_name)
if config:
# Extract provider from model_name
provider_name = config.model_name.split("/")[0] if "/" in config.model_name else "other"
if provider_name not in providers_models:
providers_models[provider_name] = []
providers_models[provider_name].append((model_name, config))
else:
# Model without config - add with basic info
provider_name = model_name.split("/")[0] if "/" in model_name else "other"
if provider_name not in providers_models:
providers_models[provider_name] = []
providers_models[provider_name].append((model_name, None))
provider_name = "other"
if config and "/" in config.model_name:
provider_name = config.model_name.split("/")[0]
elif "/" in model_name:
provider_name = model_name.split("/")[0]
providers_models.setdefault(provider_name, [])
rank = config.get_effective_capability_rank() if config else 0
providers_models[provider_name].append((rank, model_name, config))
output_lines.append("\n**Available Models**:")
for provider_name, models in sorted(providers_models.items()):
output_lines.append(f"\n*{provider_name.title()}:*")
for alias, config in models: # Show ALL models from each provider
for rank, alias, config in sorted(models, key=lambda item: (-item[0], item[1])):
if config:
context_str = f"{config.context_window // 1000}K" if config.context_window else "?"
output_lines.append(f"- `{alias}` → `{config.model_name}` ({context_str} context)")
context_str = _format_context(config.context_window)
suffix_parts = [f"{context_str} context"]
if getattr(config, "supports_extended_thinking", False):
suffix_parts.append("thinking")
suffix = ", ".join(suffix_parts)
output_lines.append(f"- `{alias}` → `{config.model_name}` (score {rank}, {suffix})")
else:
output_lines.append(f"- `{alias}`")
output_lines.append(f"- `{alias}` (score {rank})")
total_models = len(available_models)
# Show all models - no truncation message needed


@@ -291,13 +291,161 @@ class BaseTool(ABC):
def _format_available_models_list(self) -> str:
"""Return a human-friendly list of available models or guidance when none found."""
available_models = self._get_available_models()
if not available_models:
summaries, total, has_restrictions = self._get_ranked_model_summaries()
if not summaries:
return (
"No models detected. Configure provider credentials or set DEFAULT_MODEL to a valid option. "
"If the user requested a specific model, respond with this notice instead of substituting another model."
)
return ", ".join(available_models)
display = "; ".join(summaries)
remainder = total - len(summaries)
if remainder > 0:
display = f"{display}; +{remainder} more (use the `listmodels` tool for the full roster)"
return display
@staticmethod
def _format_context_window(tokens: int) -> Optional[str]:
"""Convert a raw context window into a short display string."""
if not tokens or tokens <= 0:
return None
if tokens >= 1_000_000:
if tokens % 1_000_000 == 0:
return f"{tokens // 1_000_000}M ctx"
return f"{tokens / 1_000_000:.1f}M ctx"
if tokens >= 1_000:
if tokens % 1_000 == 0:
return f"{tokens // 1_000}K ctx"
return f"{tokens / 1_000:.1f}K ctx"
return f"{tokens} ctx"
def _collect_ranked_capabilities(self) -> list[tuple[int, str, Any]]:
"""Gather available model capabilities sorted by capability rank."""
from providers.registry import ModelProviderRegistry
ranked: list[tuple[int, str, Any]] = []
available = ModelProviderRegistry.get_available_models(respect_restrictions=True)
for model_name, provider_type in available.items():
provider = ModelProviderRegistry.get_provider(provider_type)
if not provider:
continue
try:
capabilities = provider.get_capabilities(model_name)
except ValueError:
continue
rank = capabilities.get_effective_capability_rank()
ranked.append((rank, model_name, capabilities))
ranked.sort(key=lambda item: (-item[0], item[1]))
return ranked
@staticmethod
def _normalize_model_identifier(name: str) -> str:
"""Normalize model names for deduplication across providers."""
normalized = name.lower()
if ":" in normalized:
normalized = normalized.split(":", 1)[0]
if "/" in normalized:
normalized = normalized.split("/", 1)[-1]
return normalized
def _get_ranked_model_summaries(self, limit: int = 5) -> tuple[list[str], int, bool]:
"""Return formatted, ranked model summaries and restriction status."""
ranked = self._collect_ranked_capabilities()
# Build allowlist map (provider -> lowercase names) when restrictions are active
allowed_map: dict[Any, set[str]] = {}
try:
from utils.model_restrictions import get_restriction_service
restriction_service = get_restriction_service()
if restriction_service:
from providers.shared import ProviderType
for provider_type in ProviderType:
allowed = restriction_service.get_allowed_models(provider_type)
if allowed:
allowed_map[provider_type] = {name.lower() for name in allowed if name}
except Exception:
allowed_map = {}
filtered: list[tuple[int, str, Any]] = []
seen_normalized: set[str] = set()
for rank, model_name, capabilities in ranked:
canonical_name = getattr(capabilities, "model_name", model_name)
canonical_lower = canonical_name.lower()
alias_lower = model_name.lower()
provider_type = getattr(capabilities, "provider", None)
if allowed_map:
if provider_type not in allowed_map:
continue
allowed_set = allowed_map[provider_type]
if canonical_lower not in allowed_set and alias_lower not in allowed_set:
continue
normalized = self._normalize_model_identifier(canonical_name)
if normalized in seen_normalized:
continue
seen_normalized.add(normalized)
filtered.append((rank, canonical_name, capabilities))
summaries: list[str] = []
for rank, canonical_name, capabilities in filtered[:limit]:
details: list[str] = []
context_str = self._format_context_window(getattr(capabilities, "context_window", 0))
if context_str:
details.append(context_str)
if getattr(capabilities, "supports_extended_thinking", False):
details.append("thinking")
base = f"{canonical_name} (score {rank}"
if details:
base = f"{base}, {', '.join(details)}"
summaries.append(f"{base})")
return summaries, len(filtered), bool(allowed_map)
def _get_restriction_note(self) -> Optional[str]:
"""Return a string describing active per-provider allowlists, if any."""
env_labels = {
"OPENAI_ALLOWED_MODELS": "OpenAI",
"GOOGLE_ALLOWED_MODELS": "Google",
"XAI_ALLOWED_MODELS": "X.AI",
"OPENROUTER_ALLOWED_MODELS": "OpenRouter",
"DIAL_ALLOWED_MODELS": "DIAL",
}
notes: list[str] = []
for env_var, label in env_labels.items():
raw = os.getenv(env_var)
if not raw:
continue
models = sorted({token.strip() for token in raw.split(",") if token.strip()})
if not models:
continue
notes.append(f"{label}: {', '.join(models)}")
if not notes:
return None
return "Policy allows only → " + "; ".join(notes)
def _build_model_unavailable_message(self, model_name: str) -> str:
"""Compose a consistent error message for unavailable model scenarios."""
@@ -344,8 +492,23 @@ class BaseTool(ABC):
if self.is_effective_auto_mode():
description = (
"Currently in auto model selection mode. CRITICAL: When the user names a model, you MUST use that exact name unless the server rejects it. "
"If no model is provided, you may call the `listmodels` tool to review options and select an appropriate match."
"If no model is provided, you may use the `listmodels` tool to review options and select an appropriate match."
)
summaries, total, restricted = self._get_ranked_model_summaries()
remainder = max(0, total - len(summaries))
if summaries:
top_line = "; ".join(summaries)
if remainder > 0:
label = "Allowed models" if restricted else "Top models"
top_line = f"{label}: {top_line}; +{remainder} more via `listmodels`."
else:
label = "Allowed models" if restricted else "Top models"
top_line = f"{label}: {top_line}."
description = f"{description} {top_line}"
restriction_note = self._get_restriction_note()
if restriction_note and (remainder > 0 or not summaries):
description = f"{description} {restriction_note}."
return {
"type": "string",
"description": description,
@@ -353,8 +516,23 @@ class BaseTool(ABC):
description = (
f"The default model is '{DEFAULT_MODEL}'. Override only when the user explicitly requests a different model, and use that exact name. "
"If the requested model fails validation, surface the server error instead of substituting another model. When unsure, call the `listmodels` tool for details."
"If the requested model fails validation, surface the server error instead of substituting another model. When unsure, use the `listmodels` tool for details."
)
summaries, total, restricted = self._get_ranked_model_summaries()
remainder = max(0, total - len(summaries))
if summaries:
top_line = "; ".join(summaries)
if remainder > 0:
label = "Allowed models" if restricted else "Preferred alternatives"
top_line = f"{label}: {top_line}; +{remainder} more via `listmodels`."
else:
label = "Allowed models" if restricted else "Preferred alternatives"
top_line = f"{label}: {top_line}."
description = f"{description} {top_line}"
restriction_note = self._get_restriction_note()
if restriction_note and (remainder > 0 or not summaries):
description = f"{description} {restriction_note}."
return {
"type": "string",
@@ -1242,31 +1420,6 @@ When recommending searches, be specific about what information you need and why
import base64
from pathlib import Path
# Handle legacy calls (positional model_name string)
if isinstance(model_context, str):
# Legacy call: _validate_image_limits(images, "model-name")
logger.warning(
"Legacy _validate_image_limits call with model_name string. Use model_context object instead."
)
try:
from utils.model_context import ModelContext
model_context = ModelContext(model_context)
except Exception as e:
logger.warning(f"Failed to create model context from legacy model_name: {e}")
# Generic error response for any unavailable model
return {
"status": "error",
"content": self._build_model_unavailable_message(str(model_context)),
"content_type": "text",
"metadata": {
"error_type": "validation_error",
"model_name": model_context,
"supports_images": None, # Unknown since model doesn't exist
"image_count": len(images) if images else 0,
},
}
if not model_context:
# Get from tool's stored context as fallback
model_context = getattr(self, "_model_context", None)


@@ -146,8 +146,9 @@ class VersionTool(BaseTool):
"""Return the JSON schema for the tool's input"""
return {
"type": "object",
"properties": {"model": {"type": "string", "description": "Model to use (ignored by version tool)"}},
"properties": {},
"required": [],
"additionalProperties": False,
}
def get_annotations(self) -> Optional[dict[str, Any]]:


@@ -139,12 +139,16 @@ class WorkflowTool(BaseTool, BaseWorkflowMixin):
Returns:
Complete JSON schema for the workflow tool
"""
requires_model = self.requires_model()
model_field_schema = self.get_model_field_schema() if requires_model else None
auto_mode = self.is_effective_auto_mode() if requires_model else False
return WorkflowSchemaBuilder.build_schema(
tool_specific_fields=self.get_tool_fields(),
required_fields=self.get_required_fields(),
model_field_schema=self.get_model_field_schema(),
auto_mode=self.is_effective_auto_mode(),
model_field_schema=model_field_schema,
auto_mode=auto_mode,
tool_name=self.get_name(),
require_model=requires_model,
)
def get_workflow_request_model(self):