Compare commits

...

16 Commits

Author SHA1 Message Date
c71a535f16 added opencode zen as provider 2025-12-25 11:08:23 +01:00
github-actions[bot]
7afc7c1cc9 chore: sync version to config.py [skip ci] 2025-12-15 17:07:31 +00:00
semantic-release
fa78edca0b chore(release): 9.8.2
Automatically generated by python-semantic-release
2025-12-15 17:07:24 +00:00
Beehive Innovations
2eb29b5a0f Merge pull request #353 from DragonFSKY/fix/path-traversal-security
fix: path traversal vulnerability in is_dangerous_path()
2025-12-15 21:06:41 +04:00
Fahad
ba08308a23 fix(security): handle macOS symlinked system dirs
Follow-up on PR #353 to keep dangerous-path blocking correct on macOS (/etc -> /private/etc) while avoiding overblocking Windows workspaces (C:\).
2025-12-15 17:02:24 +00:00
谢栋梁
e5548acb98 fix: allow home subdirectories through is_dangerous_path()
Split DANGEROUS_PATHS into two categories:
1. DANGEROUS_SYSTEM_PATHS: Block path AND all subdirectories
   (e.g., /etc, /etc/passwd, /var/log/auth.log)
2. DANGEROUS_HOME_CONTAINERS: Block ONLY exact match
   (e.g., /home is blocked but /home/user/project passes through)

This fixes the issue where /home/user/project was incorrectly blocked
by is_dangerous_path(). Subdirectory access control for home directories
is properly delegated to is_home_directory_root() in resolve_and_validate_path().

Addresses review feedback from @chatgpt-codex-connector about blocking
all home directory subpaths.
2025-12-15 20:24:44 +08:00
github-actions[bot]
c492735872 chore: sync version to config.py [skip ci] 2025-12-15 11:53:57 +00:00
semantic-release
35ffac5565 chore(release): 9.8.1
Automatically generated by python-semantic-release
2025-12-15 11:53:52 +00:00
Beehive Innovations
104d0dd421 Merge pull request #349 from brt-h/fix/openrouter-store-parameter 2025-12-15 15:53:09 +04:00
github-actions[bot]
69a42a71d1 chore: sync version to config.py [skip ci] 2025-12-15 11:38:26 +00:00
谢栋梁
df46708af9 Merge remote-tracking branch 'upstream/main' into fix/path-traversal-security 2025-12-07 00:07:59 +08:00
谢栋梁
91ffb51564 fix: use Path.is_relative_to() for cross-platform dangerous path detection
Replace string prefix matching with Path.is_relative_to() to correctly
handle Windows paths like "C:\" where trailing backslash caused double
separator issues (e.g., "C:\\" instead of "C:\").

Changes:
- Use Path.is_relative_to() for subdirectory detection (requires Python 3.9+)
- Add Windows path handling tests using PureWindowsPath
- Update test_utils.py to expect /etc/passwd to be blocked (security fix)
2025-12-05 13:53:39 +08:00
谢栋梁
9ed15f405a fix: path traversal vulnerability - use prefix matching in is_dangerous_path()
The is_dangerous_path() function only did exact string matching,
allowing attackers to bypass protection by accessing subdirectories:
- /etc was blocked but /etc/passwd was allowed
- C:\Windows was blocked but C:\Windows\System32\... was allowed

This minimal fix changes is_dangerous_path() to use PREFIX MATCHING:
- Now blocks dangerous directories AND all their subdirectories
- Paths like /etcbackup are still allowed (not under /etc)
- No changes to DANGEROUS_PATHS list

Security:
- Fixes CWE-22: Path Traversal vulnerability
- Reported by: Team off-course (K-Shield.Jr 15th)

Fixes #312
Fixes #293
2025-12-03 15:29:57 +08:00
Robert Hyman
b6a8d682d9 refactor(tests): remove unused setUp method
The setUp method created provider instances that were never used.
Each test creates its own instance inside the patch context manager,
which is the correct pattern for property mocking.
2025-11-29 01:42:27 -05:00
Robert Hyman
0c3e63c0c7 refactor(tests): address code review feedback
- Remove redundant @patch.object decorators (inner context manager suffices)
- Remove try/except blocks that could hide test failures
- Tests now fail fast if mocking is insufficient
2025-11-29 00:55:41 -05:00
Robert Hyman
1f8b58d607 fix(providers): omit store parameter for OpenRouter responses endpoint
OpenRouter's /responses endpoint rejects store:true via Zod validation.
This is an endpoint-level limitation, not model-specific. The fix
conditionally omits the store parameter for OpenRouter while maintaining
it for direct OpenAI and Azure OpenAI providers.

- Add provider type check in _generate_with_responses_endpoint
- Include debug logging when store parameter is omitted
- Add regression tests for both OpenRouter and OpenAI behavior

Fixes #348
2025-11-29 00:19:13 -05:00
21 changed files with 1383 additions and 30 deletions

View File

@@ -6,6 +6,7 @@
# IMPORTANT: Choose ONE approach: # IMPORTANT: Choose ONE approach:
# - Native APIs (Gemini/OpenAI/XAI) for direct access # - Native APIs (Gemini/OpenAI/XAI) for direct access
# - DIAL for unified enterprise access # - DIAL for unified enterprise access
# - OpenCode Zen for curated coding-focused models
# - OpenRouter for unified cloud access # - OpenRouter for unified cloud access
# Having multiple unified providers creates ambiguity about which serves each model. # Having multiple unified providers creates ambiguity about which serves each model.
# #
@@ -35,6 +36,10 @@ DIAL_API_KEY=your_dial_api_key_here
# DIAL_API_HOST=https://core.dialx.ai # Optional: Base URL without /openai suffix (auto-appended) # DIAL_API_HOST=https://core.dialx.ai # Optional: Base URL without /openai suffix (auto-appended)
# DIAL_API_VERSION=2025-01-01-preview # Optional: API version header for DIAL requests # DIAL_API_VERSION=2025-01-01-preview # Optional: API version header for DIAL requests
# Get your OpenCode Zen API key from: https://opencode.ai/auth
# Zen provides curated, tested models optimized for coding agents
ZEN_API_KEY=your_zen_api_key_here
# Option 2: Use OpenRouter for access to multiple models through one API # Option 2: Use OpenRouter for access to multiple models through one API
# Get your OpenRouter API key from: https://openrouter.ai/ # Get your OpenRouter API key from: https://openrouter.ai/
# If using OpenRouter, comment out the native API keys above # If using OpenRouter, comment out the native API keys above

View File

@@ -2,6 +2,49 @@
<!-- version list --> <!-- version list -->
## v9.8.2 (2025-12-15)
### Bug Fixes
- Allow home subdirectories through is_dangerous_path()
([`e5548ac`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/e5548acb984ca4f8b2ae8381f879a0285094257f))
- Path traversal vulnerability - use prefix matching in is_dangerous_path()
([`9ed15f4`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/9ed15f405a9462b4db7aa44ca2d989e092c008e4))
- Use Path.is_relative_to() for cross-platform dangerous path detection
([`91ffb51`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/91ffb51564e5655ec91111938039ed81e0d8e4c6))
- **security**: Handle macOS symlinked system dirs
([`ba08308`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/ba08308a23d1c1491099c5d0eae548077bd88f9f))
### Chores
- Sync version to config.py [skip ci]
([`c492735`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/c4927358720277efa0373b339bd8e06ee06498d0))
## v9.8.1 (2025-12-15)
### Bug Fixes
- **providers**: Omit store parameter for OpenRouter responses endpoint
([`1f8b58d`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/1f8b58d607c2809b9fa78860718a69207cb66e32))
### Chores
- Sync version to config.py [skip ci]
([`69a42a7`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/69a42a71d19d66f1d94d51fa27db29323e3d9a63))
### Refactoring
- **tests**: Address code review feedback
([`0c3e63c`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/0c3e63c0c7f1556f4b6686f9c6f30e4bb4a48c7c))
- **tests**: Remove unused setUp method
([`b6a8d68`](https://github.com/BeehiveInnovations/pal-mcp-server/commit/b6a8d682d920c2283724b588818bc1162a865d74))
## v9.8.0 (2025-12-15) ## v9.8.0 (2025-12-15)
### Chores ### Chores

186
conf/zen_models.json Normal file
View File

@@ -0,0 +1,186 @@
{
"_README": {
"description": "Model metadata for OpenCode Zen curated models.",
"documentation": "https://opencode.ai/docs/zen",
"usage": "Models listed here are exposed through OpenCode Zen. Aliases are case-insensitive.",
"field_notes": "Matches providers/shared/model_capabilities.py.",
"field_descriptions": {
"model_name": "The model identifier as returned by Zen API (e.g., 'gpt-5.1-codex')",
"aliases": "Array of short names users can type instead of the full model name",
"context_window": "Total number of tokens the model can process (input + output combined)",
"max_output_tokens": "Maximum number of tokens the model can generate in a single response",
"supports_extended_thinking": "Whether the model supports extended reasoning tokens",
"supports_json_mode": "Whether the model can guarantee valid JSON output",
"supports_function_calling": "Whether the model supports function/tool calling",
"supports_images": "Whether the model can process images/visual input",
"max_image_size_mb": "Maximum total size in MB for all images combined",
"supports_temperature": "Whether the model accepts temperature parameter",
"temperature_constraint": "Type of temperature constraint or omit for default range",
"description": "Human-readable description of the model",
"intelligence_score": "1-20 human rating used for auto-mode model ordering",
"allow_code_generation": "Whether this model can generate working code"
}
},
"models": [
{
"model_name": "claude-opus-4-5",
"aliases": ["zen-opus", "zen-opus4.5", "zen-claude-opus"],
"context_window": 200000,
"max_output_tokens": 64000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": true,
"max_image_size_mb": 5.0,
"description": "Claude Opus 4.5 via OpenCode Zen - Anthropic's frontier reasoning model for complex software engineering",
"intelligence_score": 18,
"allow_code_generation": true
},
{
"model_name": "claude-sonnet-4-5",
"aliases": ["zen-sonnet", "zen-sonnet4.5"],
"context_window": 200000,
"max_output_tokens": 64000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": true,
"max_image_size_mb": 5.0,
"description": "Claude Sonnet 4.5 via OpenCode Zen - Balanced performance for coding and general tasks",
"intelligence_score": 17,
"allow_code_generation": true
},
{
"model_name": "claude-haiku-4-5",
"aliases": ["zen-haiku", "zen-haiku4.5"],
"context_window": 200000,
"max_output_tokens": 64000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": true,
"max_image_size_mb": 5.0,
"description": "Claude Haiku 4.5 via OpenCode Zen - Fast and efficient for coding tasks",
"intelligence_score": 16,
"allow_code_generation": true
},
{
"model_name": "gpt-5.1-codex",
"aliases": ["zen-gpt-codex", "zen-codex"],
"context_window": 400000,
"max_output_tokens": 64000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "GPT 5.1 Codex via OpenCode Zen - Specialized for code generation and understanding",
"intelligence_score": 17,
"allow_code_generation": true
},
{
"model_name": "gpt-5.1",
"aliases": ["zen-gpt5.1"],
"context_window": 400000,
"max_output_tokens": 64000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "GPT 5.1 via OpenCode Zen - Latest GPT model for general AI tasks",
"intelligence_score": 16,
"allow_code_generation": true
},
{
"model_name": "gemini-3-pro",
"aliases": ["zen-gemini", "zen-gemini-pro"],
"context_window": 1000000,
"max_output_tokens": 64000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": true,
"max_image_size_mb": 10.0,
"description": "Gemini 3 Pro via OpenCode Zen - Google's multimodal model with large context",
"intelligence_score": 16,
"allow_code_generation": true
},
{
"model_name": "glm-4.6",
"aliases": ["zen-glm"],
"context_window": 205000,
"max_output_tokens": 32000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "GLM 4.6 via OpenCode Zen - High-performance model for coding and reasoning",
"intelligence_score": 15,
"allow_code_generation": true
},
{
"model_name": "kimi-k2",
"aliases": ["zen-kimi"],
"context_window": 400000,
"max_output_tokens": 32000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "Kimi K2 via OpenCode Zen - Advanced reasoning model",
"intelligence_score": 15,
"allow_code_generation": true
},
{
"model_name": "qwen3-coder",
"aliases": ["zen-qwen", "zen-qwen-coder"],
"context_window": 480000,
"max_output_tokens": 32000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "Qwen3 Coder via OpenCode Zen - Specialized coding model with large context",
"intelligence_score": 15,
"allow_code_generation": true
},
{
"model_name": "grok-code",
"aliases": ["zen-grok"],
"context_window": 200000,
"max_output_tokens": 32000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "Grok Code via OpenCode Zen - xAI's coding-focused model",
"intelligence_score": 14,
"allow_code_generation": true
},
{
"model_name": "big-pickle",
"aliases": ["zen-pickle"],
"context_window": 200000,
"max_output_tokens": 32000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "Big Pickle via OpenCode Zen - Stealth model for coding tasks",
"intelligence_score": 13,
"allow_code_generation": true
},
{
"model_name": "gpt-5-nano",
"aliases": ["zen-nano"],
"context_window": 400000,
"max_output_tokens": 32000,
"supports_extended_thinking": false,
"supports_json_mode": true,
"supports_function_calling": true,
"supports_images": false,
"description": "GPT 5 Nano via OpenCode Zen - Lightweight GPT model",
"intelligence_score": 12,
"allow_code_generation": true
}
]
}

View File

@@ -14,7 +14,7 @@ from utils.env import get_env
# These values are used in server responses and for tracking releases # These values are used in server responses and for tracking releases
# IMPORTANT: This is the single source of truth for version and author info # IMPORTANT: This is the single source of truth for version and author info
# Semantic versioning: MAJOR.MINOR.PATCH # Semantic versioning: MAJOR.MINOR.PATCH
__version__ = "9.7.0" __version__ = "9.8.2"
# Last update date in ISO format # Last update date in ISO format
__updated__ = "2025-12-15" __updated__ = "2025-12-15"
# Primary maintainer # Primary maintainer

View File

@@ -47,6 +47,14 @@ OPENROUTER_API_KEY=your_openrouter_api_key_here
# If using OpenRouter, comment out native API keys above # If using OpenRouter, comment out native API keys above
``` ```
**Option 2.5: OpenCode Zen (Curated coding models)**
```env
# OpenCode Zen for tested and verified coding models
ZEN_API_KEY=your_zen_api_key_here
# Get from: https://opencode.ai/auth
# Provides access to models optimized for coding agents
```
**Option 3: Custom API Endpoints (Local models)** **Option 3: Custom API Endpoints (Local models)**
```env ```env
# For Ollama, vLLM, LM Studio, etc. # For Ollama, vLLM, LM Studio, etc.

View File

@@ -8,6 +8,7 @@ from .openai_compatible import OpenAICompatibleProvider
from .openrouter import OpenRouterProvider from .openrouter import OpenRouterProvider
from .registry import ModelProviderRegistry from .registry import ModelProviderRegistry
from .shared import ModelCapabilities, ModelResponse from .shared import ModelCapabilities, ModelResponse
from .zen import ZenProvider
__all__ = [ __all__ = [
"ModelProvider", "ModelProvider",
@@ -19,4 +20,5 @@ __all__ = [
"OpenAIModelProvider", "OpenAIModelProvider",
"OpenAICompatibleProvider", "OpenAICompatibleProvider",
"OpenRouterProvider", "OpenRouterProvider",
"ZenProvider",
] ]

View File

@@ -421,9 +421,17 @@ class OpenAICompatibleProvider(ModelProvider):
"model": model_name, "model": model_name,
"input": input_messages, "input": input_messages,
"reasoning": {"effort": effort}, "reasoning": {"effort": effort},
"store": True,
} }
# Only include store parameter for providers that support it.
# OpenRouter's /responses endpoint rejects store:true via Zod validation (Issue #348).
# This is an endpoint-level limitation, not model-specific, so we omit for all
# OpenRouter /responses calls. If OpenRouter later supports store, revisit this logic.
if self.get_provider_type() != ProviderType.OPENROUTER:
completion_params["store"] = True
else:
logging.debug(f"Omitting 'store' parameter for OpenRouter provider (model: {model_name})")
# Add max tokens if specified (using max_completion_tokens for responses endpoint) # Add max tokens if specified (using max_completion_tokens for responses endpoint)
if max_output_tokens: if max_output_tokens:
completion_params["max_completion_tokens"] = max_output_tokens completion_params["max_completion_tokens"] = max_output_tokens

View File

@@ -0,0 +1,35 @@
"""OpenCode Zen model registry for managing model configurations and aliases."""
from __future__ import annotations
from ..shared import ModelCapabilities, ProviderType
from .base import CAPABILITY_FIELD_NAMES, CapabilityModelRegistry
class ZenModelRegistry(CapabilityModelRegistry):
"""Capability registry backed by ``conf/zen_models.json``."""
def __init__(self, config_path: str | None = None) -> None:
super().__init__(
env_var_name="ZEN_MODELS_CONFIG_PATH",
default_filename="zen_models.json",
provider=ProviderType.ZEN,
friendly_prefix="OpenCode Zen ({model})",
config_path=config_path,
)
def _finalise_entry(self, entry: dict) -> tuple[ModelCapabilities, dict]:
provider_override = entry.get("provider")
if isinstance(provider_override, str):
entry_provider = ProviderType(provider_override.lower())
elif isinstance(provider_override, ProviderType):
entry_provider = provider_override
else:
entry_provider = ProviderType.ZEN
entry.setdefault("friendly_name", f"OpenCode Zen ({entry['model_name']})")
filtered = {k: v for k, v in entry.items() if k in CAPABILITY_FIELD_NAMES}
filtered.setdefault("provider", entry_provider)
capability = ModelCapabilities(**filtered)
return capability, {}

View File

@@ -40,6 +40,7 @@ class ModelProviderRegistry:
ProviderType.OPENAI, # Direct OpenAI access ProviderType.OPENAI, # Direct OpenAI access
ProviderType.AZURE, # Azure-hosted OpenAI deployments ProviderType.AZURE, # Azure-hosted OpenAI deployments
ProviderType.XAI, # Direct X.AI GROK access ProviderType.XAI, # Direct X.AI GROK access
ProviderType.ZEN, # OpenCode Zen curated models
ProviderType.DIAL, # DIAL unified API access ProviderType.DIAL, # DIAL unified API access
ProviderType.CUSTOM, # Local/self-hosted models ProviderType.CUSTOM, # Local/self-hosted models
ProviderType.OPENROUTER, # Catch-all for cloud models ProviderType.OPENROUTER, # Catch-all for cloud models
@@ -336,6 +337,7 @@ class ModelProviderRegistry:
ProviderType.OPENAI: "OPENAI_API_KEY", ProviderType.OPENAI: "OPENAI_API_KEY",
ProviderType.AZURE: "AZURE_OPENAI_API_KEY", ProviderType.AZURE: "AZURE_OPENAI_API_KEY",
ProviderType.XAI: "XAI_API_KEY", ProviderType.XAI: "XAI_API_KEY",
ProviderType.ZEN: "ZEN_API_KEY",
ProviderType.OPENROUTER: "OPENROUTER_API_KEY", ProviderType.OPENROUTER: "OPENROUTER_API_KEY",
ProviderType.CUSTOM: "CUSTOM_API_KEY", # Can be empty for providers that don't need auth ProviderType.CUSTOM: "CUSTOM_API_KEY", # Can be empty for providers that don't need auth
ProviderType.DIAL: "DIAL_API_KEY", ProviderType.DIAL: "DIAL_API_KEY",

View File

@@ -15,3 +15,4 @@ class ProviderType(Enum):
OPENROUTER = "openrouter" OPENROUTER = "openrouter"
CUSTOM = "custom" CUSTOM = "custom"
DIAL = "dial" DIAL = "dial"
ZEN = "zen"

141
providers/zen.py Normal file
View File

@@ -0,0 +1,141 @@
"""OpenCode Zen provider implementation."""
import logging
from .openai_compatible import OpenAICompatibleProvider
from .registries.zen import ZenModelRegistry
from .shared import (
ModelCapabilities,
ProviderType,
)
class ZenProvider(OpenAICompatibleProvider):
"""Client for OpenCode Zen's curated model service.
Role
Surface OpenCode Zen's tested and verified models through the same interface as
native providers so tools can reference Zen models without special cases.
Characteristics
* Pulls model definitions from :class:`ZenModelRegistry`
(capabilities, metadata, pricing information)
* Reuses :class:`OpenAICompatibleProvider` infrastructure for request
execution so Zen endpoints behave like standard OpenAI-style APIs.
* Supports OpenCode Zen's curated list of coding-focused models.
"""
FRIENDLY_NAME = "OpenCode Zen"
# Model registry for managing configurations
_registry: ZenModelRegistry | None = None
def __init__(self, api_key: str, **kwargs):
"""Initialize OpenCode Zen provider.
Args:
api_key: OpenCode Zen API key
**kwargs: Additional configuration
"""
base_url = "https://opencode.ai/zen/v1"
super().__init__(api_key, base_url=base_url, **kwargs)
# Initialize model registry
if ZenProvider._registry is None:
ZenProvider._registry = ZenModelRegistry()
# Log loaded models only on first load
models = self._registry.list_models()
logging.info(f"OpenCode Zen loaded {len(models)} models")
# ------------------------------------------------------------------
# Capability surface
# ------------------------------------------------------------------
def _lookup_capabilities(
self,
canonical_name: str,
requested_name: str | None = None,
) -> ModelCapabilities | None:
"""Fetch Zen capabilities from the registry."""
capabilities = self._registry.get_capabilities(canonical_name)
if capabilities:
return capabilities
# For unknown models, return None to let base class handle error
logging.debug("Model '%s' not found in Zen registry", canonical_name)
return None
# ------------------------------------------------------------------
# Provider identity
# ------------------------------------------------------------------
def get_provider_type(self) -> ProviderType:
"""Identify this provider for restrictions and logging."""
return ProviderType.ZEN
# ------------------------------------------------------------------
# Registry helpers
# ------------------------------------------------------------------
def list_models(
self,
*,
respect_restrictions: bool = True,
include_aliases: bool = True,
lowercase: bool = False,
unique: bool = False,
) -> list[str]:
"""Return formatted Zen model names, respecting restrictions."""
if not self._registry:
return []
from utils.model_restrictions import get_restriction_service
restriction_service = get_restriction_service() if respect_restrictions else None
allowed_configs: dict[str, ModelCapabilities] = {}
for model_name in self._registry.list_models():
config = self._registry.resolve(model_name)
if not config:
continue
if restriction_service:
if not restriction_service.is_allowed(self.get_provider_type(), model_name):
continue
allowed_configs[model_name] = config
if not allowed_configs:
return []
return ModelCapabilities.collect_model_names(
allowed_configs,
include_aliases=include_aliases,
lowercase=lowercase,
unique=unique,
)
def _resolve_model_name(self, model_name: str) -> str:
"""Resolve aliases defined in the Zen registry."""
config = self._registry.resolve(model_name)
if config and config.model_name != model_name:
logging.debug("Resolved Zen model alias '%s' to '%s'", model_name, config.model_name)
return config.model_name
return model_name
def get_all_model_capabilities(self) -> dict[str, ModelCapabilities]:
"""Expose registry-backed Zen capabilities."""
if not self._registry:
return {}
capabilities: dict[str, ModelCapabilities] = {}
for model_name in self._registry.list_models():
config = self._registry.resolve(model_name)
if config:
capabilities[model_name] = config
return capabilities

View File

@@ -1,8 +1,8 @@
[project] [project]
name = "pal-mcp-server" name = "pal-mcp-server"
version = "9.8.0" version = "9.8.2"
description = "AI-powered MCP server with multiple model providers" description = "AI-powered MCP server with multiple model providers"
requires-python = ">=3.9" requires-python = ">=3.10"
dependencies = [ dependencies = [
"mcp>=1.0.0", "mcp>=1.0.0",
"google-genai>=1.19.0", "google-genai>=1.19.0",

View File

@@ -400,11 +400,13 @@ def configure_providers():
from providers.openrouter import OpenRouterProvider from providers.openrouter import OpenRouterProvider
from providers.shared import ProviderType from providers.shared import ProviderType
from providers.xai import XAIModelProvider from providers.xai import XAIModelProvider
from providers.zen import ZenProvider
from utils.model_restrictions import get_restriction_service from utils.model_restrictions import get_restriction_service
valid_providers = [] valid_providers = []
has_native_apis = False has_native_apis = False
has_openrouter = False has_openrouter = False
has_zen = False
has_custom = False has_custom = False
# Check for Gemini API key # Check for Gemini API key
@@ -475,6 +477,19 @@ def configure_providers():
else: else:
logger.debug("OpenRouter API key is placeholder value") logger.debug("OpenRouter API key is placeholder value")
# Check for OpenCode Zen API key
zen_key = get_env("ZEN_API_KEY")
logger.debug(f"OpenCode Zen key check: key={'[PRESENT]' if zen_key else '[MISSING]'}")
if zen_key and zen_key != "your_zen_api_key_here":
valid_providers.append("OpenCode Zen")
has_zen = True
logger.info("OpenCode Zen API key found - Curated models available via Zen")
else:
if not zen_key:
logger.debug("OpenCode Zen API key not found in environment")
else:
logger.debug("OpenCode Zen API key is placeholder value")
# Check for custom API endpoint (Ollama, vLLM, etc.) # Check for custom API endpoint (Ollama, vLLM, etc.)
custom_url = get_env("CUSTOM_API_URL") custom_url = get_env("CUSTOM_API_URL")
if custom_url: if custom_url:
@@ -530,7 +545,13 @@ def configure_providers():
registered_providers.append(ProviderType.CUSTOM.value) registered_providers.append(ProviderType.CUSTOM.value)
logger.debug(f"Registered provider: {ProviderType.CUSTOM.value}") logger.debug(f"Registered provider: {ProviderType.CUSTOM.value}")
# 3. OpenRouter last (catch-all for everything else) # 3. OpenCode Zen
if has_zen:
ModelProviderRegistry.register_provider(ProviderType.ZEN, ZenProvider)
registered_providers.append(ProviderType.ZEN.value)
logger.debug(f"Registered provider: {ProviderType.ZEN.value}")
# 4. OpenRouter last (catch-all for everything else)
if has_openrouter: if has_openrouter:
ModelProviderRegistry.register_provider(ProviderType.OPENROUTER, OpenRouterProvider) ModelProviderRegistry.register_provider(ProviderType.OPENROUTER, OpenRouterProvider)
registered_providers.append(ProviderType.OPENROUTER.value) registered_providers.append(ProviderType.OPENROUTER.value)
@@ -548,6 +569,7 @@ def configure_providers():
"- OPENAI_API_KEY for OpenAI models\n" "- OPENAI_API_KEY for OpenAI models\n"
"- XAI_API_KEY for X.AI GROK models\n" "- XAI_API_KEY for X.AI GROK models\n"
"- DIAL_API_KEY for DIAL models\n" "- DIAL_API_KEY for DIAL models\n"
"- ZEN_API_KEY for OpenCode Zen (curated models)\n"
"- OPENROUTER_API_KEY for OpenRouter (multiple models)\n" "- OPENROUTER_API_KEY for OpenRouter (multiple models)\n"
"- CUSTOM_API_URL for local models (Ollama, vLLM, etc.)" "- CUSTOM_API_URL for local models (Ollama, vLLM, etc.)"
) )
@@ -558,6 +580,8 @@ def configure_providers():
priority_info = [] priority_info = []
if has_native_apis: if has_native_apis:
priority_info.append("Native APIs (Gemini, OpenAI)") priority_info.append("Native APIs (Gemini, OpenAI)")
if has_zen:
priority_info.append("OpenCode Zen")
if has_custom: if has_custom:
priority_info.append("Custom endpoints") priority_info.append("Custom endpoints")
if has_openrouter: if has_openrouter:

View File

@@ -6,10 +6,18 @@ import asyncio
import importlib import importlib
import os import os
import sys import sys
import tempfile
from pathlib import Path from pathlib import Path
import pytest import pytest
# On macOS, the default pytest temp dir is typically under /var (e.g. /private/var/folders/...).
# If /var is considered a dangerous system path, tests must use a safe temp root (like /tmp).
if sys.platform == "darwin":
os.environ["TMPDIR"] = "/tmp"
# tempfile caches the temp dir after first lookup; clear it so pytest fixtures pick up TMPDIR.
tempfile.tempdir = None
# Ensure the parent directory is in the Python path for imports # Ensure the parent directory is in the Python path for imports
parent_dir = Path(__file__).resolve().parent.parent parent_dir = Path(__file__).resolve().parent.parent
if str(parent_dir) not in sys.path: if str(parent_dir) not in sys.path:
@@ -74,9 +82,20 @@ def project_path(tmp_path):
return test_dir return test_dir
@pytest.fixture
def zen_provider():
"""
Provides a Zen provider instance for testing.
Uses dummy API key for isolated testing.
"""
from providers.zen import ZenProvider
return ZenProvider(api_key="test-zen-key")
def _set_dummy_keys_if_missing(): def _set_dummy_keys_if_missing():
"""Set dummy API keys only when they are completely absent.""" """Set dummy API keys only when they are completely absent."""
for var in ("GEMINI_API_KEY", "OPENAI_API_KEY", "XAI_API_KEY"): for var in ("GEMINI_API_KEY", "OPENAI_API_KEY", "XAI_API_KEY", "ZEN_API_KEY"):
if not os.environ.get(var): if not os.environ.get(var):
os.environ[var] = "dummy-key-for-tests" os.environ[var] = "dummy-key-for-tests"

View File

@@ -0,0 +1,145 @@
"""Tests for OpenRouter store parameter handling in responses endpoint.
Regression tests for GitHub Issue #348: OpenAI "store" parameter validation error
for certain models via OpenRouter.
OpenRouter's /responses endpoint rejects store:true via Zod validation. This is an
endpoint-level limitation, not model-specific. These tests verify that:
- OpenRouter provider omits the store parameter
- Direct OpenAI provider includes store: true
"""
import unittest
from unittest.mock import Mock, patch
from providers.openai_compatible import OpenAICompatibleProvider
from providers.shared import ProviderType
class MockOpenRouterProvider(OpenAICompatibleProvider):
"""Mock provider that simulates OpenRouter behavior."""
FRIENDLY_NAME = "OpenRouter Test"
def get_provider_type(self):
return ProviderType.OPENROUTER
def get_capabilities(self, model_name):
mock_caps = Mock()
mock_caps.default_reasoning_effort = "high"
return mock_caps
def validate_model_name(self, model_name):
return True
def list_models(self, **kwargs):
return ["openai/gpt-5-pro", "openai/gpt-5.1-codex"]
class MockOpenAIProvider(OpenAICompatibleProvider):
"""Mock provider that simulates direct OpenAI behavior."""
FRIENDLY_NAME = "OpenAI Test"
def get_provider_type(self):
return ProviderType.OPENAI
def get_capabilities(self, model_name):
mock_caps = Mock()
mock_caps.default_reasoning_effort = "high"
return mock_caps
def validate_model_name(self, model_name):
return True
def list_models(self, **kwargs):
return ["gpt-5-pro", "gpt-5.1-codex"]
class TestStoreParameterHandling(unittest.TestCase):
"""Test store parameter is conditionally included based on provider type.
**Feature: openrouter-store-parameter-fix, Property 1: OpenRouter requests omit store parameter**
**Feature: openrouter-store-parameter-fix, Property 2: Direct OpenAI requests include store parameter**
"""
def test_openrouter_responses_omits_store_parameter(self):
"""Test that OpenRouter provider omits store parameter from responses endpoint.
**Feature: openrouter-store-parameter-fix, Property 1: OpenRouter requests omit store parameter**
**Validates: Requirements 1.1, 2.1**
OpenRouter's /responses endpoint rejects store:true via Zod validation (Issue #348).
The store parameter should be omitted entirely for OpenRouter requests.
"""
# Capture the completion_params passed to the API
captured_params = {}
def capture_create(**kwargs):
captured_params.update(kwargs)
# Return a mock response
mock_response = Mock()
mock_response.output_text = "Test response"
mock_response.usage = None
return mock_response
mock_client_instance = Mock()
mock_client_instance.responses.create = capture_create
with patch.object(
MockOpenRouterProvider, "client", new_callable=lambda: property(lambda self: mock_client_instance)
):
provider = MockOpenRouterProvider("test-key")
# Call the method that builds completion_params
provider._generate_with_responses_endpoint(
model_name="openai/gpt-5-pro",
messages=[{"role": "user", "content": "test"}],
temperature=0.7,
)
# Verify store parameter is NOT in the request
self.assertNotIn("store", captured_params, "OpenRouter requests should NOT include 'store' parameter")
def test_openai_responses_includes_store_parameter(self):
"""Test that direct OpenAI provider includes store parameter in responses endpoint.
**Feature: openrouter-store-parameter-fix, Property 2: Direct OpenAI requests include store parameter**
**Validates: Requirements 1.2, 2.2**
Direct OpenAI API supports the store parameter for stored completions.
The store parameter should be included with value True for OpenAI requests.
"""
# Capture the completion_params passed to the API
captured_params = {}
def capture_create(**kwargs):
captured_params.update(kwargs)
# Return a mock response
mock_response = Mock()
mock_response.output_text = "Test response"
mock_response.usage = None
return mock_response
mock_client_instance = Mock()
mock_client_instance.responses.create = capture_create
with patch.object(
MockOpenAIProvider, "client", new_callable=lambda: property(lambda self: mock_client_instance)
):
provider = MockOpenAIProvider("test-key")
# Call the method that builds completion_params
provider._generate_with_responses_endpoint(
model_name="gpt-5-pro",
messages=[{"role": "user", "content": "test"}],
temperature=0.7,
)
# Verify store parameter IS in the request with value True
self.assertIn("store", captured_params, "OpenAI requests should include 'store' parameter")
self.assertTrue(captured_params["store"], "OpenAI requests should have store=True")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,149 @@
"""
Test path traversal security fix.
Fixes vulnerability reported in:
- https://github.com/BeehiveInnovations/zen-mcp-server/issues/293
- https://github.com/BeehiveInnovations/zen-mcp-server/issues/312
The vulnerability: is_dangerous_path() only did exact string matching,
so /etc was blocked but /etc/passwd was allowed.
Additionally, this fix properly handles home directory containers:
- /home and C:\\Users are blocked (exact match only)
- /home/user/project paths are allowed through is_dangerous_path()
and handled by is_home_directory_root() in resolve_and_validate_path()
"""
from pathlib import Path
from utils.security_config import is_dangerous_path
class TestPathTraversalFix:
"""Test that subdirectories of dangerous system paths are blocked."""
def test_exact_match_still_works(self):
"""Test that exact dangerous paths are still blocked."""
assert is_dangerous_path(Path("/etc")) is True
assert is_dangerous_path(Path("/usr")) is True
assert is_dangerous_path(Path("/var")) is True
def test_subdirectory_now_blocked(self):
"""Test that subdirectories of system paths are blocked (the fix)."""
# These were allowed before the fix
assert is_dangerous_path(Path("/etc/passwd")) is True
assert is_dangerous_path(Path("/etc/shadow")) is True
assert is_dangerous_path(Path("/etc/hosts")) is True
assert is_dangerous_path(Path("/var/log/auth.log")) is True
def test_deeply_nested_blocked(self):
"""Test that deeply nested system paths are blocked."""
assert is_dangerous_path(Path("/etc/ssh/sshd_config")) is True
assert is_dangerous_path(Path("/usr/local/bin/python")) is True
def test_root_blocked(self):
"""Test that root directory is blocked."""
assert is_dangerous_path(Path("/")) is True
def test_safe_paths_allowed(self):
"""Test that safe paths are still allowed."""
# User project directories should be allowed
assert is_dangerous_path(Path("/tmp/test")) is False
assert is_dangerous_path(Path("/tmp/myproject/src")) is False
def test_similar_names_not_blocked(self):
"""Test that paths with similar names are not blocked."""
# /etcbackup should NOT be blocked (it's not under /etc)
assert is_dangerous_path(Path("/tmp/etcbackup")) is False
assert is_dangerous_path(Path("/tmp/my_etc_files")) is False
class TestHomeDirectoryHandling:
"""Test that home directory containers are handled correctly.
Home containers (/home, C:\\Users) should only block the exact path,
not subdirectories. Subdirectory access control is delegated to
is_home_directory_root() in resolve_and_validate_path().
"""
def test_home_container_blocked(self):
"""Test that /home itself is blocked."""
assert is_dangerous_path(Path("/home")) is True
def test_home_subdirectories_allowed(self):
"""Test that /home subdirectories pass through is_dangerous_path().
These paths should NOT be blocked by is_dangerous_path() because:
1. /home/user/project is a valid user workspace
2. Access control for /home/username is handled by is_home_directory_root()
"""
# User home directories should pass is_dangerous_path()
# (they are handled by is_home_directory_root() separately)
assert is_dangerous_path(Path("/home/user")) is False
assert is_dangerous_path(Path("/home/user/project")) is False
assert is_dangerous_path(Path("/home/user/project/src/main.py")) is False
def test_home_deeply_nested_allowed(self):
"""Test that deeply nested home paths are allowed."""
assert is_dangerous_path(Path("/home/user/documents/work/project/src")) is False
class TestRegressionPrevention:
"""Regression tests for the specific vulnerability."""
def test_etc_passwd_blocked(self):
"""Test /etc/passwd is blocked (common attack target)."""
assert is_dangerous_path(Path("/etc/passwd")) is True
def test_etc_shadow_blocked(self):
"""Test /etc/shadow is blocked (password hashes)."""
assert is_dangerous_path(Path("/etc/shadow")) is True
class TestWindowsPathHandling:
"""Test Windows path handling with trailing backslash.
Fixes issue reported in PR #353: Windows paths like C:\\ have trailing
backslash which caused double separator issues with string prefix matching.
Using Path.is_relative_to() resolves this correctly.
"""
def test_windows_root_drive_blocked(self):
"""Test that Windows root drive C:\\ is blocked."""
from pathlib import PureWindowsPath
# Simulate Windows path behavior using PureWindowsPath
# On Linux, we test the logic with PureWindowsPath to verify cross-platform correctness
c_root = PureWindowsPath("C:\\")
assert c_root.parent == c_root # Root check works
def test_windows_dangerous_subdirectory_detection(self):
"""Test that Windows subdirectories are correctly detected as dangerous.
This verifies the fix for the double backslash issue:
- Before fix: "C:\\" + "\\" = "C:\\\\" which doesn't match "C:\\Users"
- After fix: Path.is_relative_to() handles this correctly
"""
from pathlib import PureWindowsPath
# Verify is_relative_to works correctly for Windows paths
c_users = PureWindowsPath("C:\\Users")
c_root = PureWindowsPath("C:\\")
# This is the key test - subdirectory detection must work
assert c_users.is_relative_to(c_root) is True
# Deeper paths should also work
c_users_admin = PureWindowsPath("C:\\Users\\Admin")
assert c_users_admin.is_relative_to(c_root) is True
assert c_users_admin.is_relative_to(c_users) is True
def test_windows_path_not_relative_to_different_drive(self):
"""Test that paths on different drives are not related."""
from pathlib import PureWindowsPath
d_path = PureWindowsPath("D:\\Data")
c_root = PureWindowsPath("C:\\")
# D: drive paths should not be relative to C:
assert d_path.is_relative_to(c_root) is False

View File

@@ -29,14 +29,12 @@ class TestFileUtils:
assert "Error: File does not exist" in content assert "Error: File does not exist" in content
assert tokens > 0 assert tokens > 0
def test_read_file_content_safe_files_allowed(self): def test_read_file_content_dangerous_files_blocked(self):
"""Test that safe files outside the original project root are now allowed""" """Test that dangerous system files are blocked"""
# In the new security model, safe files like /etc/passwd # /etc/passwd should be blocked as it's under /etc (dangerous path)
# can be read as they're not in the dangerous paths list
content, tokens = read_file_content("/etc/passwd") content, tokens = read_file_content("/etc/passwd")
# Should successfully read the file (with timestamp in header) assert "--- ERROR ACCESSING FILE:" in content
assert "--- BEGIN FILE: /etc/passwd (Last modified:" in content assert "Access to system directory denied" in content
assert "--- END FILE: /etc/passwd ---" in content
assert tokens > 0 assert tokens > 0
def test_read_file_content_relative_path_rejected(self): def test_read_file_content_relative_path_rejected(self):

View File

@@ -0,0 +1,166 @@
"""Tests for OpenCode Zen model registry functionality."""
import json
import os
import tempfile
from unittest.mock import patch
from providers.registries.zen import ZenModelRegistry
from providers.shared import ProviderType
class TestZenModelRegistry:
"""Test cases for Zen model registry."""
def test_registry_initialization(self):
"""Test registry initializes with default config."""
registry = ZenModelRegistry()
# Should load models from default location
assert len(registry.list_models()) > 0
assert len(registry.list_aliases()) > 0
# Should include our configured models
assert "claude-sonnet-4-5" in registry.list_models()
assert "gpt-5.1-codex" in registry.list_models()
def test_custom_config_path(self):
"""Test registry with custom config path."""
# Create temporary config
config_data = {
"models": [
{
"model_name": "test/zen-model-1",
"aliases": ["zen-test1", "zt1"],
"context_window": 4096,
"max_output_tokens": 2048,
"intelligence_score": 15,
}
]
}
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(config_data, f)
temp_path = f.name
try:
registry = ZenModelRegistry(config_path=temp_path)
assert len(registry.list_models()) == 1
assert "test/zen-model-1" in registry.list_models()
assert "zen-test1" in registry.list_aliases()
assert "zt1" in registry.list_aliases()
finally:
os.unlink(temp_path)
def test_get_capabilities(self):
"""Test capability retrieval."""
registry = ZenModelRegistry()
# Test getting capabilities for a known model
caps = registry.get_capabilities("claude-sonnet-4-5")
assert caps is not None
assert caps.provider == ProviderType.ZEN
assert caps.model_name == "claude-sonnet-4-5"
assert caps.friendly_name == "OpenCode Zen (claude-sonnet-4-5)"
assert caps.context_window == 200000
assert caps.intelligence_score == 17
# Test getting capabilities for unknown model
caps = registry.get_capabilities("unknown-model")
assert caps is None
def test_resolve_model(self):
"""Test model resolution with aliases."""
registry = ZenModelRegistry()
# Test resolving a direct model name
config = registry.resolve("claude-sonnet-4-5")
assert config is not None
assert config.model_name == "claude-sonnet-4-5"
# Test resolving an alias
config = registry.resolve("zen-sonnet")
assert config is not None
assert config.model_name == "claude-sonnet-4-5"
# Test resolving unknown model
config = registry.resolve("unknown-model")
assert config is None
def test_list_aliases(self):
"""Test alias listing."""
registry = ZenModelRegistry()
aliases = registry.list_aliases()
assert isinstance(aliases, list)
assert len(aliases) > 0
# Should include our configured aliases
assert "zen-sonnet" in aliases
assert "zen-codex" in aliases
assert "zen-gemini" in aliases
def test_environment_config_path(self):
"""Test registry respects environment variable for config path."""
config_data = {
"models": [
{
"model_name": "env/test-model",
"aliases": ["env-test"],
"context_window": 8192,
"max_output_tokens": 4096,
"intelligence_score": 10,
}
]
}
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(config_data, f)
temp_path = f.name
try:
with patch.dict(os.environ, {"ZEN_MODELS_CONFIG_PATH": temp_path}):
registry = ZenModelRegistry()
assert "env/test-model" in registry.list_models()
assert "env-test" in registry.list_aliases()
finally:
os.unlink(temp_path)
def test_malformed_config(self):
"""Test registry handles malformed config gracefully."""
malformed_config = {
"models": [
{
"model_name": "test/bad-model",
# Missing required fields
}
]
}
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(malformed_config, f)
temp_path = f.name
try:
registry = ZenModelRegistry(config_path=temp_path)
# Should still initialize but model may not load properly
# This tests error handling in config loading
registry.list_models() # Test that this doesn't crash
# May or may not include the malformed model depending on validation
finally:
os.unlink(temp_path)
def test_empty_config(self):
"""Test registry with empty config."""
empty_config = {"models": []}
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(empty_config, f)
temp_path = f.name
try:
registry = ZenModelRegistry(config_path=temp_path)
assert len(registry.list_models()) == 0
assert len(registry.list_aliases()) == 0
finally:
os.unlink(temp_path)

361
tests/test_zen_provider.py Normal file
View File

@@ -0,0 +1,361 @@
"""Tests for OpenCode Zen provider."""
import os
from unittest.mock import Mock, patch
import pytest
from providers.registry import ModelProviderRegistry
from providers.shared import ProviderType
from providers.zen import ZenProvider
class TestZenProvider:
"""Test cases for OpenCode Zen provider."""
def test_provider_initialization(self):
"""Test Zen provider initialization."""
provider = ZenProvider(api_key="test-key")
assert provider.api_key == "test-key"
assert provider.base_url == "https://opencode.ai/zen/v1"
assert provider.FRIENDLY_NAME == "OpenCode Zen"
def test_get_provider_type(self):
"""Test provider type identification."""
provider = ZenProvider(api_key="test-key")
assert provider.get_provider_type() == ProviderType.ZEN
def test_model_validation(self):
"""Test model validation."""
provider = ZenProvider(api_key="test-key")
# Zen accepts models that are in the registry
assert provider.validate_model_name("claude-sonnet-4-5") is True
assert provider.validate_model_name("gpt-5.1-codex") is True
# Unknown models are rejected
assert provider.validate_model_name("unknown-model") is False
def test_get_capabilities(self):
"""Test capability generation."""
provider = ZenProvider(api_key="test-key")
# Test with a model in the registry
caps = provider.get_capabilities("claude-sonnet-4-5")
assert caps.provider == ProviderType.ZEN
assert caps.model_name == "claude-sonnet-4-5"
assert caps.friendly_name == "OpenCode Zen (claude-sonnet-4-5)"
# Test with a model not in registry - should raise error
with pytest.raises(ValueError, match="Unsupported model 'unknown-model' for provider zen"):
provider.get_capabilities("unknown-model")
def test_model_alias_resolution(self):
"""Test model alias resolution."""
provider = ZenProvider(api_key="test-key")
# Test alias resolution
assert provider._resolve_model_name("zen-sonnet") == "claude-sonnet-4-5"
assert provider._resolve_model_name("zen-sonnet4.5") == "claude-sonnet-4-5"
assert provider._resolve_model_name("zen-codex") == "gpt-5.1-codex"
assert provider._resolve_model_name("zen-gpt-codex") == "gpt-5.1-codex"
# Test case-insensitive
assert provider._resolve_model_name("ZEN-SONNET") == "claude-sonnet-4-5"
assert provider._resolve_model_name("Zen-Codex") == "gpt-5.1-codex"
# Test direct model names (should pass through unchanged)
assert provider._resolve_model_name("claude-sonnet-4-5") == "claude-sonnet-4-5"
assert provider._resolve_model_name("gpt-5.1-codex") == "gpt-5.1-codex"
# Test unknown models pass through
assert provider._resolve_model_name("unknown-model") == "unknown-model"
def test_list_models(self):
"""Test model listing with various options."""
provider = ZenProvider(api_key="test-key")
# Test basic model listing
models = provider.list_models()
assert isinstance(models, list)
assert len(models) > 0
# Should include our configured models
assert "claude-sonnet-4-5" in models
assert "gpt-5.1-codex" in models
# Should include aliases
assert "zen-sonnet" in models
assert "zen-codex" in models
def test_list_models_with_options(self):
"""Test model listing with different options."""
provider = ZenProvider(api_key="test-key")
# Test without aliases
models_no_aliases = provider.list_models(include_aliases=False)
assert "zen-sonnet" not in models_no_aliases
assert "claude-sonnet-4-5" in models_no_aliases
# Test lowercase
models_lower = provider.list_models(lowercase=True)
assert all(model == model.lower() for model in models_lower)
def test_registry_capabilities(self):
"""Test that registry capabilities are properly loaded."""
provider = ZenProvider(api_key="test-key")
# Test that we have a registry
assert provider._registry is not None
# Test getting all capabilities
capabilities = provider.get_all_model_capabilities()
assert isinstance(capabilities, dict)
assert len(capabilities) > 0
# Should include our configured models
assert "claude-sonnet-4-5" in capabilities
assert "gpt-5.1-codex" in capabilities
# Check capability structure
caps = capabilities["claude-sonnet-4-5"]
assert caps.provider == ProviderType.ZEN
assert caps.context_window == 200000
assert caps.intelligence_score == 17
def test_model_capabilities_lookup(self):
"""Test capability lookup for known and unknown models."""
provider = ZenProvider(api_key="test-key")
# Test known model
caps = provider._lookup_capabilities("claude-sonnet-4-5")
assert caps is not None
assert caps.provider == ProviderType.ZEN
# Test unknown model returns None (base class handles error)
caps = provider._lookup_capabilities("unknown-zen-model")
assert caps is None
def test_zen_registration(self):
"""Test Zen can be registered and retrieved."""
with patch.dict(os.environ, {"ZEN_API_KEY": "test-key"}):
# Clean up any existing registration
ModelProviderRegistry.unregister_provider(ProviderType.ZEN)
# Register the provider
ModelProviderRegistry.register_provider(ProviderType.ZEN, ZenProvider)
# Retrieve and verify
provider = ModelProviderRegistry.get_provider(ProviderType.ZEN)
assert provider is not None
assert isinstance(provider, ZenProvider)
class TestZenAutoMode:
"""Test auto mode functionality when only Zen is configured."""
def setup_method(self):
"""Store original state before each test."""
self.registry = ModelProviderRegistry()
self._original_providers = self.registry._providers.copy()
self._original_initialized = self.registry._initialized_providers.copy()
# Clear the registry state for this test
self.registry._providers.clear()
self.registry._initialized_providers.clear()
self._original_env = {}
for key in ["ZEN_API_KEY", "GEMINI_API_KEY", "OPENAI_API_KEY", "DEFAULT_MODEL"]:
self._original_env[key] = os.environ.get(key)
def teardown_method(self):
"""Restore original state after each test."""
self.registry._providers.clear()
self.registry._initialized_providers.clear()
self.registry._providers.update(self._original_providers)
self.registry._initialized_providers.update(self._original_initialized)
for key, value in self._original_env.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
@pytest.mark.no_mock_provider
def test_zen_only_auto_mode(self):
"""Test that auto mode works when only Zen is configured."""
os.environ.pop("GEMINI_API_KEY", None)
os.environ.pop("OPENAI_API_KEY", None)
os.environ["ZEN_API_KEY"] = "test-zen-key"
os.environ["DEFAULT_MODEL"] = "auto"
mock_registry = Mock()
model_names = [
"claude-sonnet-4-5",
"claude-haiku-4-5",
"gpt-5.1-codex",
"gemini-3-pro",
"glm-4.6",
]
mock_registry.list_models.return_value = model_names
# Mock resolve to return a ModelCapabilities-like object for each model
def mock_resolve(model_name):
if model_name in model_names:
mock_config = Mock()
mock_config.provider = ProviderType.ZEN
mock_config.aliases = [] # Empty list of aliases
mock_config.get_effective_capability_rank = Mock(return_value=50) # Add ranking method
return mock_config
return None
mock_registry.resolve.side_effect = mock_resolve
ModelProviderRegistry.register_provider(ProviderType.ZEN, ZenProvider)
provider = ModelProviderRegistry.get_provider(ProviderType.ZEN)
assert provider is not None, "Zen provider should be available with API key"
provider._registry = mock_registry
available_models = ModelProviderRegistry.get_available_models(respect_restrictions=True)
assert len(available_models) > 0, "Should find Zen models in auto mode"
assert all(provider_type == ProviderType.ZEN for provider_type in available_models.values())
for model in model_names:
assert model in available_models, f"Model {model} should be available"
class TestZenIntegration:
"""Integration tests for Zen provider with server components."""
def test_zen_provider_in_server_init(self):
"""Test that Zen provider is properly handled during server initialization."""
# This test verifies that the server can handle Zen provider configuration
# without actual server startup
with patch.dict(os.environ, {"ZEN_API_KEY": "test-integration-key"}):
# Import server module to trigger provider setup
from providers.registry import ModelProviderRegistry
# Verify Zen provider can be registered
ModelProviderRegistry.register_provider(ProviderType.ZEN, ZenProvider)
provider = ModelProviderRegistry.get_provider(ProviderType.ZEN)
assert provider is not None
assert isinstance(provider, ZenProvider)
def test_zen_config_loading(self):
"""Test that Zen configuration loads properly in integration context."""
with patch.dict(os.environ, {"ZEN_API_KEY": "test-config-key"}):
from providers.registries.zen import ZenModelRegistry
# Test registry loads configuration
registry = ZenModelRegistry()
models = registry.list_models()
aliases = registry.list_aliases()
assert len(models) > 0, "Should load models from zen_models.json"
assert len(aliases) > 0, "Should load aliases from zen_models.json"
# Verify specific models are loaded
assert "claude-sonnet-4-5" in models
assert "zen-sonnet" in aliases
def test_zen_provider_priority(self):
"""Test that Zen provider follows correct priority order."""
# Zen should be prioritized after native APIs but before OpenRouter
from providers.registry import ModelProviderRegistry
priority_order = ModelProviderRegistry.PROVIDER_PRIORITY_ORDER
zen_index = priority_order.index(ProviderType.ZEN)
openrouter_index = priority_order.index(ProviderType.OPENROUTER)
# Zen should come before OpenRouter in priority
assert zen_index < openrouter_index, "Zen should have higher priority than OpenRouter"
class TestZenAPIMocking:
"""Test API interactions with mocked OpenAI SDK."""
def test_chat_completion_mock(self):
"""Test chat completion with mocked API response."""
provider = ZenProvider(api_key="test-key")
# Mock the OpenAI client and response
mock_response = Mock()
mock_response.choices = [Mock()]
mock_response.choices[0].message.content = "Mocked response from Zen"
mock_response.usage = Mock()
mock_response.usage.prompt_tokens = 10
mock_response.usage.completion_tokens = 20
with patch.object(provider.client.chat.completions, "create", return_value=mock_response):
# Test the completion method - this will initialize the client
response = provider.complete(
model="claude-sonnet-4-5", messages=[{"role": "user", "content": "Hello"}], temperature=0.7
)
assert response.content == "Mocked response from Zen"
def test_streaming_completion_mock(self):
"""Test streaming completion with mocked API."""
provider = ZenProvider(api_key="test-key")
# Mock streaming response
mock_chunk1 = Mock()
mock_chunk1.choices = [Mock()]
mock_chunk1.choices[0].delta.content = "Hello"
mock_chunk1.choices[0].finish_reason = None
mock_chunk2 = Mock()
mock_chunk2.choices = [Mock()]
mock_chunk2.choices[0].delta.content = " world!"
mock_chunk2.choices[0].finish_reason = "stop"
mock_stream = [mock_chunk1, mock_chunk2]
# Access client to initialize it first
_ = provider.client
with patch.object(provider.client.chat.completions, "create", return_value=mock_stream):
# Test streaming completion
stream = provider.complete_stream(
model="gpt-5.1-codex",
messages=[{"role": "user", "content": "Say hello"}],
)
chunks = list(stream)
assert len(chunks) == 2
assert chunks[0].content == "Hello"
assert chunks[1].content == " world!"
def test_api_error_handling(self):
"""Test error handling for API failures."""
provider = ZenProvider(api_key="test-key")
# Mock API error
from openai import APIError
api_error = APIError("Mock API error", request=Mock(), body="error details")
with patch.object(provider._client.chat.completions, "create", side_effect=api_error):
with pytest.raises(APIError):
provider.complete(model="claude-sonnet-4-5", messages=[{"role": "user", "content": "Test"}])
def test_invalid_model_error(self):
"""Test error handling for invalid models."""
provider = ZenProvider(api_key="test-key")
with pytest.raises(ValueError, match="Unsupported model 'invalid-model' for provider zen"):
provider.get_capabilities("invalid-model")
def test_authentication_error(self):
"""Test handling of authentication errors."""
provider = ZenProvider(api_key="invalid-key")
# Mock authentication error
from openai import AuthenticationError
auth_error = AuthenticationError("Invalid API key", request=Mock(), body="auth failed")
with patch.object(provider._client.chat.completions, "create", side_effect=auth_error):
with pytest.raises(AuthenticationError):
provider.complete(model="claude-sonnet-4-5", messages=[{"role": "user", "content": "Test"}])

View File

@@ -102,6 +102,7 @@ class ListModelsTool(BaseTool):
ProviderType.OPENAI: {"name": "OpenAI", "env_key": "OPENAI_API_KEY"}, ProviderType.OPENAI: {"name": "OpenAI", "env_key": "OPENAI_API_KEY"},
ProviderType.AZURE: {"name": "Azure OpenAI", "env_key": "AZURE_OPENAI_API_KEY"}, ProviderType.AZURE: {"name": "Azure OpenAI", "env_key": "AZURE_OPENAI_API_KEY"},
ProviderType.XAI: {"name": "X.AI (Grok)", "env_key": "XAI_API_KEY"}, ProviderType.XAI: {"name": "X.AI (Grok)", "env_key": "XAI_API_KEY"},
ProviderType.ZEN: {"name": "OpenCode Zen", "env_key": "ZEN_API_KEY"},
ProviderType.DIAL: {"name": "AI DIAL", "env_key": "DIAL_API_KEY"}, ProviderType.DIAL: {"name": "AI DIAL", "env_key": "DIAL_API_KEY"},
} }

View File

@@ -7,22 +7,30 @@ for file access control.
from pathlib import Path from pathlib import Path
# Dangerous paths that should never be scanned # Dangerous system paths - block these AND all their subdirectories
# These would give overly broad access and pose security risks # These are system directories where user code should never reside
DANGEROUS_PATHS = { DANGEROUS_SYSTEM_PATHS = {
"/", "/",
"/etc", "/etc",
"/usr", "/usr",
"/bin", "/bin",
"/var", "/var",
"/root", "/root",
"/home",
"C:\\",
"C:\\Windows", "C:\\Windows",
"C:\\Program Files", "C:\\Program Files",
}
# User home container paths - block ONLY the exact path, not subdirectories
# Subdirectory access (e.g., /home/user/project) is controlled by is_home_directory_root()
# This allows users to work in their home subdirectories while blocking overly broad access
DANGEROUS_HOME_CONTAINERS = {
"/home",
"C:\\Users", "C:\\Users",
} }
# Combined set for backward compatibility
DANGEROUS_PATHS = DANGEROUS_SYSTEM_PATHS | DANGEROUS_HOME_CONTAINERS
# Directories to exclude from recursive file search # Directories to exclude from recursive file search
# These typically contain generated code, dependencies, or build artifacts # These typically contain generated code, dependencies, or build artifacts
EXCLUDED_DIRS = { EXCLUDED_DIRS = {
@@ -89,16 +97,67 @@ EXCLUDED_DIRS = {
def is_dangerous_path(path: Path) -> bool: def is_dangerous_path(path: Path) -> bool:
""" """
Check if a path is in the dangerous paths list. Check if a path is in or under a dangerous directory.
This function handles two categories of dangerous paths differently:
1. System paths (DANGEROUS_SYSTEM_PATHS): Block the path AND all subdirectories.
Example: /etc is dangerous, so /etc/passwd is also blocked.
2. Home containers (DANGEROUS_HOME_CONTAINERS): Block ONLY the exact path.
Example: /home is blocked, but /home/user/project is allowed.
Subdirectory access control is delegated to is_home_directory_root().
Args: Args:
path: Path to check path: Path to check
Returns: Returns:
True if the path is dangerous and should not be accessed True if the path is dangerous and should not be accessed
Security:
Fixes path traversal vulnerability (CWE-22) while preserving
user access to home subdirectories.
""" """
try: try:
resolved = path.resolve() resolved = path.resolve()
return str(resolved) in DANGEROUS_PATHS or resolved.parent == resolved
def _dangerous_variants(p: Path) -> set[Path]:
variants = {p}
# Only resolve paths that are absolute on the current platform.
# This avoids turning Windows-style strings into nonsense absolute paths on POSIX.
if p.is_absolute():
try:
variants.add(p.resolve())
except Exception:
pass
return variants
# Check 1: Root directory (filesystem root)
if resolved.parent == resolved:
return True
# Check 2: System paths - block exact match AND all subdirectories
for dangerous in DANGEROUS_SYSTEM_PATHS:
# Skip root "/" - already handled above
if dangerous == "/":
continue
for dangerous_path in _dangerous_variants(Path(dangerous)):
# is_relative_to() correctly handles both exact matches and subdirectories.
# Resolving the dangerous base path also handles platform symlinks
# (e.g., macOS /etc -> /private/etc, /var -> /private/var).
if resolved == dangerous_path or resolved.is_relative_to(dangerous_path):
return True
# Check 3: Home containers - block ONLY exact match
# Subdirectories like /home/user/project should pass through here
# and be handled by is_home_directory_root() in resolve_and_validate_path()
for container in DANGEROUS_HOME_CONTAINERS:
for container_path in _dangerous_variants(Path(container)):
if resolved == container_path:
return True
return False
except Exception: except Exception:
return True # If we can't resolve, consider it dangerous return True # If we can't resolve, consider it dangerous