214 lines
7.3 KiB
Python
214 lines
7.3 KiB
Python
"""Execute configured CLI agents for the clink tool and parse output."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import shlex
|
|
import tempfile
|
|
import time
|
|
from collections.abc import Sequence
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from clink.constants import DEFAULT_STREAM_LIMIT
|
|
from clink.models import ResolvedCLIClient, ResolvedCLIRole
|
|
from clink.parsers import BaseParser, ParsedCLIResponse, ParserError, get_parser
|
|
|
|
logger = logging.getLogger("clink.agent")
|
|
|
|
|
|
@dataclass
|
|
class AgentOutput:
|
|
"""Container returned by CLI agents after successful execution."""
|
|
|
|
parsed: ParsedCLIResponse
|
|
sanitized_command: list[str]
|
|
returncode: int
|
|
stdout: str
|
|
stderr: str
|
|
duration_seconds: float
|
|
parser_name: str
|
|
output_file_content: str | None = None
|
|
|
|
|
|
class CLIAgentError(RuntimeError):
|
|
"""Raised when a CLI agent fails (non-zero exit, timeout, parse errors)."""
|
|
|
|
def __init__(self, message: str, *, returncode: int | None = None, stdout: str = "", stderr: str = "") -> None:
|
|
super().__init__(message)
|
|
self.returncode = returncode
|
|
self.stdout = stdout
|
|
self.stderr = stderr
|
|
|
|
|
|
class BaseCLIAgent:
|
|
"""Execute a configured CLI command and parse its output."""
|
|
|
|
def __init__(self, client: ResolvedCLIClient):
|
|
self.client = client
|
|
self._parser: BaseParser = get_parser(client.parser)
|
|
self._logger = logging.getLogger(f"clink.runner.{client.name}")
|
|
|
|
async def run(
|
|
self,
|
|
*,
|
|
role: ResolvedCLIRole,
|
|
prompt: str,
|
|
files: Sequence[str],
|
|
images: Sequence[str],
|
|
) -> AgentOutput:
|
|
# Files and images are already embedded into the prompt by the tool; they are
|
|
# accepted here only to keep parity with SimpleTool callers.
|
|
_ = (files, images)
|
|
# The runner simply executes the configured CLI command for the selected role.
|
|
command = self._build_command(role=role)
|
|
env = self._build_environment()
|
|
sanitized_command = list(command)
|
|
|
|
cwd = str(self.client.working_dir) if self.client.working_dir else None
|
|
limit = DEFAULT_STREAM_LIMIT
|
|
|
|
stdout_text = ""
|
|
stderr_text = ""
|
|
output_file_content: str | None = None
|
|
start_time = time.monotonic()
|
|
|
|
output_file_path: Path | None = None
|
|
command_with_output_flag = list(command)
|
|
|
|
if self.client.output_to_file:
|
|
fd, tmp_path = tempfile.mkstemp(prefix="clink-", suffix=".json")
|
|
os.close(fd)
|
|
output_file_path = Path(tmp_path)
|
|
flag_template = self.client.output_to_file.flag_template
|
|
try:
|
|
rendered_flag = flag_template.format(path=str(output_file_path))
|
|
except KeyError as exc: # pragma: no cover - defensive
|
|
raise CLIAgentError(f"Invalid output flag template '{flag_template}': missing placeholder {exc}")
|
|
command_with_output_flag.extend(shlex.split(rendered_flag))
|
|
sanitized_command = list(command_with_output_flag)
|
|
|
|
self._logger.debug("Executing CLI command: %s", " ".join(sanitized_command))
|
|
if cwd:
|
|
self._logger.debug("Working directory: %s", cwd)
|
|
|
|
try:
|
|
process = await asyncio.create_subprocess_exec(
|
|
*command_with_output_flag,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
cwd=cwd,
|
|
limit=limit,
|
|
env=env,
|
|
)
|
|
except FileNotFoundError as exc:
|
|
raise CLIAgentError(f"Executable not found for CLI '{self.client.name}': {exc}") from exc
|
|
|
|
try:
|
|
stdout_bytes, stderr_bytes = await asyncio.wait_for(
|
|
process.communicate(prompt.encode("utf-8")),
|
|
timeout=self.client.timeout_seconds,
|
|
)
|
|
except asyncio.TimeoutError as exc:
|
|
process.kill()
|
|
await process.communicate()
|
|
raise CLIAgentError(
|
|
f"CLI '{self.client.name}' timed out after {self.client.timeout_seconds} seconds",
|
|
returncode=None,
|
|
) from exc
|
|
|
|
duration = time.monotonic() - start_time
|
|
return_code = process.returncode
|
|
stdout_text = stdout_bytes.decode("utf-8", errors="replace")
|
|
stderr_text = stderr_bytes.decode("utf-8", errors="replace")
|
|
|
|
if output_file_path and output_file_path.exists():
|
|
output_file_content = output_file_path.read_text(encoding="utf-8", errors="replace")
|
|
if self.client.output_to_file and self.client.output_to_file.cleanup:
|
|
try:
|
|
output_file_path.unlink()
|
|
except OSError: # pragma: no cover - best effort cleanup
|
|
pass
|
|
|
|
if output_file_content and not stdout_text.strip():
|
|
stdout_text = output_file_content
|
|
|
|
if return_code != 0:
|
|
recovered = self._recover_from_error(
|
|
returncode=return_code,
|
|
stdout=stdout_text,
|
|
stderr=stderr_text,
|
|
sanitized_command=sanitized_command,
|
|
duration_seconds=duration,
|
|
output_file_content=output_file_content,
|
|
)
|
|
if recovered is not None:
|
|
return recovered
|
|
|
|
if return_code != 0:
|
|
raise CLIAgentError(
|
|
f"CLI '{self.client.name}' exited with status {return_code}",
|
|
returncode=return_code,
|
|
stdout=stdout_text,
|
|
stderr=stderr_text,
|
|
)
|
|
|
|
try:
|
|
parsed = self._parser.parse(stdout_text, stderr_text)
|
|
except ParserError as exc:
|
|
raise CLIAgentError(
|
|
f"Failed to parse output from CLI '{self.client.name}': {exc}",
|
|
returncode=return_code,
|
|
stdout=stdout_text,
|
|
stderr=stderr_text,
|
|
) from exc
|
|
|
|
return AgentOutput(
|
|
parsed=parsed,
|
|
sanitized_command=sanitized_command,
|
|
returncode=return_code,
|
|
stdout=stdout_text,
|
|
stderr=stderr_text,
|
|
duration_seconds=duration,
|
|
parser_name=self._parser.name,
|
|
output_file_content=output_file_content,
|
|
)
|
|
|
|
def _build_command(self, *, role: ResolvedCLIRole) -> list[str]:
|
|
base = list(self.client.executable)
|
|
base.extend(self.client.internal_args)
|
|
base.extend(self.client.config_args)
|
|
base.extend(role.role_args)
|
|
|
|
return base
|
|
|
|
def _build_environment(self) -> dict[str, str]:
|
|
env = os.environ.copy()
|
|
env.update(self.client.env)
|
|
return env
|
|
|
|
# ------------------------------------------------------------------
|
|
# Error recovery hooks
|
|
# ------------------------------------------------------------------
|
|
|
|
def _recover_from_error(
|
|
self,
|
|
*,
|
|
returncode: int,
|
|
stdout: str,
|
|
stderr: str,
|
|
sanitized_command: list[str],
|
|
duration_seconds: float,
|
|
output_file_content: str | None,
|
|
) -> AgentOutput | None:
|
|
"""Hook for subclasses to convert CLI errors into successful outputs.
|
|
|
|
Return an AgentOutput to treat the failure as success, or None to signal
|
|
that normal error handling should proceed.
|
|
"""
|
|
|
|
return None
|