Files
my-pal-mcp-server/clink/registry.py
Fahad 9ffca53ce5 feat! Claude Code as a CLI agent now supported. Mix and match: spawn claude code from within claude code, or claude code from within codex.
Stay in Codex to plan, review, and fix complicated bugs, then ask it to spawn Claude Code and implement the plan.

This uses your current subscription instead of API tokens.
2025-10-08 11:14:22 +04:00

256 lines
8.7 KiB
Python

"""Configuration registry for clink CLI integrations."""
from __future__ import annotations
import json
import logging
import shlex
from collections.abc import Iterable
from pathlib import Path
from clink.constants import (
CONFIG_DIR,
DEFAULT_TIMEOUT_SECONDS,
INTERNAL_DEFAULTS,
PROJECT_ROOT,
USER_CONFIG_DIR,
CLIInternalDefaults,
)
from clink.models import (
CLIClientConfig,
CLIRoleConfig,
ResolvedCLIClient,
ResolvedCLIRole,
)
from utils.env import get_env
from utils.file_utils import read_json_file
# Module-level logger; records are emitted under the "clink.registry" name.
logger = logging.getLogger("clink.registry")
# Name of the environment variable that may point at an extra configuration
# file or directory (it is read in ClinkRegistry._iter_config_files).
CONFIG_ENV_VAR = "CLI_CLIENTS_CONFIG_PATH"
class RegistryLoadError(RuntimeError):
    """Signals that a CLI configuration file is invalid or lacks required data."""
class ClinkRegistry:
    """Loads CLI client definitions and exposes them for schema generation/runtime use.

    Definitions are read from ``*.json`` files found, in this order, in the
    built-in ``CONFIG_DIR``, at the path named by the ``CONFIG_ENV_VAR``
    environment variable (a file or a directory) and in ``USER_CONFIG_DIR``.
    Clients are keyed by their lower-cased name, so a definition loaded later
    replaces an earlier one that carries the same name.
    """

    def __init__(self) -> None:
        """Create the registry and load every configuration immediately.

        Raises:
            RegistryLoadError: If a configuration is invalid or no client is configured.
        """
        self._clients: dict[str, ResolvedCLIClient] = {}
        self._load()

    def _load(self) -> None:
        """Read all configuration files and install the resulting client mapping.

        The mapping is assembled in a local dict and only assigned to the
        registry once every file has been processed, so a failing load (for
        example during ``reload``) leaves the previously loaded clients intact.
        Errors raised by ``CLIClientConfig.model_validate`` propagate unchanged.

        Raises:
            RegistryLoadError: On invalid JSON, on an unresolvable definition,
                or when no client ends up configured.
        """
        loaded: dict[str, ResolvedCLIClient] = {}

        for config_path in self._iter_config_files():
            try:
                data = read_json_file(str(config_path))
            except json.JSONDecodeError as exc:
                raise RegistryLoadError(f"Invalid JSON in {config_path}: {exc}") from exc

            if not data:
                logger.debug("Skipping empty configuration file: %s", config_path)
                continue

            config = CLIClientConfig.model_validate(data)
            resolved = self._resolve_config(config, source_path=config_path)

            key = resolved.name.lower()
            if key in loaded:
                logger.info("Overriding CLI configuration for '%s' from %s", resolved.name, config_path)
            else:
                logger.debug("Loaded CLI configuration for '%s' from %s", resolved.name, config_path)
            loaded[key] = resolved

        if not loaded:
            raise RegistryLoadError(
                "No CLI clients configured. Ensure conf/cli_clients contains at least one definition or set "
                f"{CONFIG_ENV_VAR}."
            )

        # Swap in the new mapping only after a fully successful load.
        self._clients = loaded

    def reload(self) -> None:
        """Reload configurations from disk."""
        self._load()

    def list_clients(self) -> list[str]:
        """Return the configured client names, sorted."""
        return sorted(client.name for client in self._clients.values())

    def list_roles(self, cli_name: str) -> list[str]:
        """Return the sorted role names defined for ``cli_name``.

        Raises:
            KeyError: If ``cli_name`` is not configured.
        """
        config = self.get_client(cli_name)
        return sorted(config.roles.keys())

    def get_client(self, cli_name: str) -> ResolvedCLIClient:
        """Return the resolved client registered under ``cli_name``.

        The lookup ignores case and surrounding whitespace, mirroring how names
        are normalized when configurations are loaded.

        Raises:
            KeyError: If no such client is configured; the message lists the available ones.
        """
        key = cli_name.strip().lower()
        if key not in self._clients:
            available = ", ".join(self.list_clients())
            raise KeyError(f"CLI '{cli_name}' is not configured. Available clients: {available}")
        return self._clients[key]

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _iter_config_files(self) -> Iterable[Path]:
        """Yield the JSON configuration files to load, in override order.

        Each search location may be a single ``.json`` file or a directory whose
        ``*.json`` files are yielded in sorted order. A location listed more
        than once is visited only the first time.
        """
        search_paths: list[Path] = []

        # 1. Built-in configs
        search_paths.append(CONFIG_DIR)

        # 2. CLI_CLIENTS_CONFIG_PATH environment override (file or directory)
        env_path_raw = get_env(CONFIG_ENV_VAR)
        if env_path_raw:
            search_paths.append(Path(env_path_raw).expanduser())

        # 3. User overrides (USER_CONFIG_DIR)
        search_paths.append(USER_CONFIG_DIR)

        seen: set[Path] = set()
        for base in search_paths:
            if not base or base in seen:
                continue
            seen.add(base)

            if base.is_file() and base.suffix.lower() == ".json":
                yield base
                continue

            if base.is_dir():
                for path in sorted(base.glob("*.json")):
                    if path.is_file():
                        yield path
            elif base.exists():
                # The path exists but is neither a directory nor a .json file.
                logger.debug("Ignoring non-JSON configuration path: %s", base)
            else:
                logger.debug("Configuration path does not exist: %s", base)

    def _resolve_config(self, raw: CLIClientConfig, *, source_path: Path) -> ResolvedCLIClient:
        """Combine a validated configuration with the internal defaults for its CLI.

        Args:
            raw: The configuration parsed from ``source_path``.
            source_path: File the configuration came from; relative paths are
                resolved against its parent directory first.

        Raises:
            RegistryLoadError: If the name is missing, the CLI has no entry in
                ``INTERNAL_DEFAULTS``, no parser is defined, or a nested
                command/role cannot be resolved.
        """
        if not raw.name:
            raise RegistryLoadError(f"CLI configuration at {source_path} is missing a 'name' field")

        normalized_name = raw.name.strip()
        internal_defaults = INTERNAL_DEFAULTS.get(normalized_name.lower())
        if internal_defaults is None:
            raise RegistryLoadError(f"CLI '{raw.name}' is not supported by clink")

        executable = self._resolve_executable(raw, internal_defaults, source_path)

        # A configured timeout wins over the internal one; the global default is
        # the last resort so int() below never receives None.
        timeout_seconds = raw.timeout_seconds or internal_defaults.timeout_seconds
        if timeout_seconds is None:
            timeout_seconds = DEFAULT_TIMEOUT_SECONDS

        parser_name = internal_defaults.parser
        if not parser_name:
            raise RegistryLoadError(
                f"CLI '{raw.name}' must define a parser either in configuration or internal defaults"
            )

        return ResolvedCLIClient(
            name=normalized_name,
            executable=executable,
            internal_args=list(internal_defaults.additional_args),
            config_args=list(raw.additional_args),
            env=self._merge_env(raw, internal_defaults),
            timeout_seconds=int(timeout_seconds),
            parser=parser_name,
            runner=internal_defaults.runner,
            roles=self._resolve_roles(raw, internal_defaults, source_path),
            output_to_file=raw.output_to_file,
            working_dir=self._resolve_optional_path(raw.working_dir, source_path.parent),
        )

    def _resolve_executable(
        self,
        raw: CLIClientConfig,
        internal_defaults: CLIInternalDefaults | None,
        source_path: Path,
    ) -> list[str]:
        """Split the configured command line into an argument list.

        ``internal_defaults`` and ``source_path`` are accepted but not consulted.

        Raises:
            RegistryLoadError: If the command is missing, empty or only whitespace.
        """
        command = raw.command
        # A whitespace-only command splits to an empty list, which is as unusable
        # as a missing command, so both cases are rejected together.
        argv = shlex.split(command) if command else []
        if not argv:
            raise RegistryLoadError(f"CLI '{raw.name}' must specify a 'command' in configuration")
        return argv

    def _merge_env(
        self,
        raw: CLIClientConfig,
        internal_defaults: CLIInternalDefaults | None,
    ) -> dict[str, str]:
        """Return the internal default environment overlaid with the configured one."""
        merged: dict[str, str] = {}
        if internal_defaults and internal_defaults.env:
            merged.update(internal_defaults.env)
        # Values from the configuration file take precedence over the defaults.
        merged.update(raw.env)
        return merged

    def _resolve_roles(
        self,
        raw: CLIClientConfig,
        internal_defaults: CLIInternalDefaults | None,
        source_path: Path,
    ) -> dict[str, ResolvedCLIRole]:
        """Resolve every configured role, guaranteeing that a ``default`` role exists.

        A role without its own ``prompt_path`` uses the internal default role
        prompt. The role objects held by ``raw`` are read but never modified.

        Raises:
            RegistryLoadError: If a role has no prompt path at all or its prompt file is missing.
        """
        # Shallow copy so that adding "default" does not alter raw.roles.
        roles: dict[str, CLIRoleConfig] = dict(raw.roles)

        default_role_prompt = internal_defaults.default_role_prompt if internal_defaults else None
        if "default" not in roles:
            roles["default"] = CLIRoleConfig(prompt_path=default_role_prompt)

        resolved: dict[str, ResolvedCLIRole] = {}
        for role_name, role_config in roles.items():
            # Per-role fallback; this also covers a "default" role without a prompt.
            prompt_path_str = role_config.prompt_path or default_role_prompt
            if not prompt_path_str:
                raise RegistryLoadError(f"Role '{role_name}' for CLI '{raw.name}' must define a prompt_path")
            resolved[role_name] = ResolvedCLIRole(
                name=role_name,
                prompt_path=self._resolve_prompt_path(prompt_path_str, source_path.parent),
                role_args=list(role_config.role_args),
                description=role_config.description,
            )
        return resolved

    def _resolve_prompt_path(self, prompt_path: str, base_dir: Path) -> Path:
        """Resolve a prompt path and require that it exists.

        Raises:
            RegistryLoadError: If the resolved path does not exist.
        """
        resolved = self._resolve_path(prompt_path, base_dir)
        if not resolved.exists():
            raise RegistryLoadError(f"Prompt file not found: {resolved}")
        return resolved

    def _resolve_optional_path(self, candidate: str | None, base_dir: Path) -> Path | None:
        """Resolve ``candidate`` like ``_resolve_path``; return ``None`` when it is empty or ``None``."""
        if not candidate:
            return None
        return self._resolve_path(candidate, base_dir)

    def _resolve_path(self, candidate: str, base_dir: Path) -> Path:
        """Turn ``candidate`` into a path, without requiring that the result exists.

        Absolute paths are returned as given. A relative path is resolved
        against ``base_dir`` when that location exists, otherwise against
        ``PROJECT_ROOT``.
        """
        path = Path(candidate)
        if path.is_absolute():
            return path

        candidate_path = (base_dir / path).resolve()
        if candidate_path.exists():
            return candidate_path

        return (PROJECT_ROOT / path).resolve()
# Shared registry instance; stays None until get_registry() is first called.
_REGISTRY: ClinkRegistry | None = None


def get_registry() -> ClinkRegistry:
    """Return the shared ``ClinkRegistry``, constructing it on the first call."""
    global _REGISTRY
    registry = _REGISTRY
    if registry is None:
        registry = _REGISTRY = ClinkRegistry()
    return registry