Zen now allows you to define `roles` for an external CLI and delegate work to another CLI via the new `clink` tool (short for `CLI + Link`). Gemini, for instance, offers 1000 free requests a day - this means you can save on tokens and your weekly limits within Claude Code by delegating work to another entirely capable CLI agent! Define your own system prompts as `roles` and make another CLI do anything you'd like. Like the current tool you're connected to, the other CLI has complete access to your files and the current context. This also works incredibly well with Zen's `conversation continuity`.
253 lines
8.6 KiB
Python
253 lines
8.6 KiB
Python
"""Configuration registry for clink CLI integrations."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import shlex
|
|
from collections.abc import Iterable
|
|
from pathlib import Path
|
|
|
|
from clink.constants import (
|
|
CONFIG_DIR,
|
|
DEFAULT_TIMEOUT_SECONDS,
|
|
INTERNAL_DEFAULTS,
|
|
PROJECT_ROOT,
|
|
USER_CONFIG_DIR,
|
|
CLIInternalDefaults,
|
|
)
|
|
from clink.models import (
|
|
CLIClientConfig,
|
|
CLIRoleConfig,
|
|
ResolvedCLIClient,
|
|
ResolvedCLIRole,
|
|
)
|
|
from utils.env import get_env
|
|
from utils.file_utils import read_json_file
|
|
|
|
logger = logging.getLogger("clink.registry")
|
|
|
|
CONFIG_ENV_VAR = "CLI_CLIENTS_CONFIG_PATH"
|
|
|
|
|
|
class RegistryLoadError(RuntimeError):
|
|
"""Raised when configuration files are invalid or missing critical data."""
|
|
|
|
|
|
class ClinkRegistry:
|
|
"""Loads CLI client definitions and exposes them for schema generation/runtime use."""
|
|
|
|
def __init__(self) -> None:
|
|
self._clients: dict[str, ResolvedCLIClient] = {}
|
|
self._load()
|
|
|
|
def _load(self) -> None:
|
|
self._clients.clear()
|
|
for config_path in self._iter_config_files():
|
|
try:
|
|
data = read_json_file(str(config_path))
|
|
except json.JSONDecodeError as exc:
|
|
raise RegistryLoadError(f"Invalid JSON in {config_path}: {exc}") from exc
|
|
|
|
if not data:
|
|
logger.debug("Skipping empty configuration file: %s", config_path)
|
|
continue
|
|
|
|
config = CLIClientConfig.model_validate(data)
|
|
resolved = self._resolve_config(config, source_path=config_path)
|
|
key = resolved.name.lower()
|
|
if key in self._clients:
|
|
logger.info("Overriding CLI configuration for '%s' from %s", resolved.name, config_path)
|
|
else:
|
|
logger.debug("Loaded CLI configuration for '%s' from %s", resolved.name, config_path)
|
|
self._clients[key] = resolved
|
|
|
|
if not self._clients:
|
|
raise RegistryLoadError(
|
|
"No CLI clients configured. Ensure conf/cli_clients contains at least one definition or set "
|
|
f"{CONFIG_ENV_VAR}."
|
|
)
|
|
|
|
def reload(self) -> None:
|
|
"""Reload configurations from disk."""
|
|
self._load()
|
|
|
|
def list_clients(self) -> list[str]:
|
|
return sorted(client.name for client in self._clients.values())
|
|
|
|
def list_roles(self, cli_name: str) -> list[str]:
|
|
config = self.get_client(cli_name)
|
|
return sorted(config.roles.keys())
|
|
|
|
def get_client(self, cli_name: str) -> ResolvedCLIClient:
|
|
key = cli_name.lower()
|
|
if key not in self._clients:
|
|
available = ", ".join(self.list_clients())
|
|
raise KeyError(f"CLI '{cli_name}' is not configured. Available clients: {available}")
|
|
return self._clients[key]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _iter_config_files(self) -> Iterable[Path]:
|
|
search_paths: list[Path] = []
|
|
|
|
# 1. Built-in configs
|
|
search_paths.append(CONFIG_DIR)
|
|
|
|
# 2. CLI_CLIENTS_CONFIG_PATH environment override (file or directory)
|
|
env_path_raw = get_env(CONFIG_ENV_VAR)
|
|
if env_path_raw:
|
|
env_path = Path(env_path_raw).expanduser()
|
|
search_paths.append(env_path)
|
|
|
|
# 3. User overrides in ~/.zen/cli_clients
|
|
search_paths.append(USER_CONFIG_DIR)
|
|
|
|
seen: set[Path] = set()
|
|
|
|
for base in search_paths:
|
|
if not base:
|
|
continue
|
|
if base in seen:
|
|
continue
|
|
seen.add(base)
|
|
|
|
if base.is_file() and base.suffix.lower() == ".json":
|
|
yield base
|
|
continue
|
|
|
|
if base.is_dir():
|
|
for path in sorted(base.glob("*.json")):
|
|
if path.is_file():
|
|
yield path
|
|
else:
|
|
logger.debug("Configuration path does not exist: %s", base)
|
|
|
|
def _resolve_config(self, raw: CLIClientConfig, *, source_path: Path) -> ResolvedCLIClient:
|
|
if not raw.name:
|
|
raise RegistryLoadError(f"CLI configuration at {source_path} is missing a 'name' field")
|
|
|
|
normalized_name = raw.name.strip()
|
|
internal_defaults = INTERNAL_DEFAULTS.get(normalized_name.lower())
|
|
if internal_defaults is None:
|
|
raise RegistryLoadError(f"CLI '{raw.name}' is not supported by clink")
|
|
|
|
executable = self._resolve_executable(raw, internal_defaults, source_path)
|
|
|
|
internal_args = list(internal_defaults.additional_args) if internal_defaults else []
|
|
config_args = list(raw.additional_args)
|
|
|
|
timeout_seconds = raw.timeout_seconds or (
|
|
internal_defaults.timeout_seconds if internal_defaults else DEFAULT_TIMEOUT_SECONDS
|
|
)
|
|
|
|
parser_name = internal_defaults.parser
|
|
if not parser_name:
|
|
raise RegistryLoadError(
|
|
f"CLI '{raw.name}' must define a parser either in configuration or internal defaults"
|
|
)
|
|
|
|
env = self._merge_env(raw, internal_defaults)
|
|
working_dir = self._resolve_optional_path(raw.working_dir, source_path.parent)
|
|
roles = self._resolve_roles(raw, internal_defaults, source_path)
|
|
|
|
output_to_file = raw.output_to_file
|
|
|
|
return ResolvedCLIClient(
|
|
name=normalized_name,
|
|
executable=executable,
|
|
internal_args=internal_args,
|
|
config_args=config_args,
|
|
env=env,
|
|
timeout_seconds=int(timeout_seconds),
|
|
parser=parser_name,
|
|
roles=roles,
|
|
output_to_file=output_to_file,
|
|
working_dir=working_dir,
|
|
)
|
|
|
|
def _resolve_executable(
|
|
self,
|
|
raw: CLIClientConfig,
|
|
internal_defaults: CLIInternalDefaults | None,
|
|
source_path: Path,
|
|
) -> list[str]:
|
|
command = raw.command
|
|
if not command:
|
|
raise RegistryLoadError(f"CLI '{raw.name}' must specify a 'command' in configuration")
|
|
return shlex.split(command)
|
|
|
|
def _merge_env(
|
|
self,
|
|
raw: CLIClientConfig,
|
|
internal_defaults: CLIInternalDefaults | None,
|
|
) -> dict[str, str]:
|
|
merged: dict[str, str] = {}
|
|
if internal_defaults and internal_defaults.env:
|
|
merged.update(internal_defaults.env)
|
|
merged.update(raw.env)
|
|
return merged
|
|
|
|
def _resolve_roles(
|
|
self,
|
|
raw: CLIClientConfig,
|
|
internal_defaults: CLIInternalDefaults | None,
|
|
source_path: Path,
|
|
) -> dict[str, ResolvedCLIRole]:
|
|
roles: dict[str, CLIRoleConfig] = dict(raw.roles)
|
|
|
|
default_role_prompt = internal_defaults.default_role_prompt if internal_defaults else None
|
|
if "default" not in roles:
|
|
roles["default"] = CLIRoleConfig(prompt_path=default_role_prompt)
|
|
elif roles["default"].prompt_path is None and default_role_prompt:
|
|
roles["default"].prompt_path = default_role_prompt
|
|
|
|
resolved: dict[str, ResolvedCLIRole] = {}
|
|
for role_name, role_config in roles.items():
|
|
prompt_path_str = role_config.prompt_path or default_role_prompt
|
|
if not prompt_path_str:
|
|
raise RegistryLoadError(f"Role '{role_name}' for CLI '{raw.name}' must define a prompt_path")
|
|
prompt_path = self._resolve_prompt_path(prompt_path_str, source_path.parent)
|
|
resolved[role_name] = ResolvedCLIRole(
|
|
name=role_name,
|
|
prompt_path=prompt_path,
|
|
role_args=list(role_config.role_args),
|
|
description=role_config.description,
|
|
)
|
|
return resolved
|
|
|
|
def _resolve_prompt_path(self, prompt_path: str, base_dir: Path) -> Path:
|
|
resolved = self._resolve_path(prompt_path, base_dir)
|
|
if not resolved.exists():
|
|
raise RegistryLoadError(f"Prompt file not found: {resolved}")
|
|
return resolved
|
|
|
|
def _resolve_optional_path(self, candidate: str | None, base_dir: Path) -> Path | None:
|
|
if not candidate:
|
|
return None
|
|
return self._resolve_path(candidate, base_dir)
|
|
|
|
def _resolve_path(self, candidate: str, base_dir: Path) -> Path:
|
|
path = Path(candidate)
|
|
if path.is_absolute():
|
|
return path
|
|
|
|
candidate_path = (base_dir / path).resolve()
|
|
if candidate_path.exists():
|
|
return candidate_path
|
|
|
|
project_relative = (PROJECT_ROOT / path).resolve()
|
|
return project_relative
|
|
|
|
|
|
_REGISTRY: ClinkRegistry | None = None
|
|
|
|
|
|
def get_registry() -> ClinkRegistry:
|
|
global _REGISTRY
|
|
if _REGISTRY is None:
|
|
_REGISTRY = ClinkRegistry()
|
|
return _REGISTRY
|