# base.py • 5.55 kB
"""Base adapter interface for CLI orchestrators."""
import os
import shutil
from abc import ABC, abstractmethod
from typing import Any
class CLIAdapter(ABC):
"""Base adapter for CLI orchestrators."""
def __init__(self, name: str, config: dict[str, Any]):
self.name = name
self.config = config
@abstractmethod
async def execute(self, task: str, **kwargs: Any) -> tuple[str, str, int]:
"""
Execute a task using the CLI.
Args:
task: Task description/query
**kwargs: Additional CLI-specific arguments
Returns:
tuple: (stdout, stderr, return_code)
"""
pass
@abstractmethod
def validate(self) -> bool:
"""Validate the CLI is installed and accessible."""
pass
@abstractmethod
def format_task(self, task: str, **kwargs: Any) -> list[str]:
"""
Format task into CLI command arguments.
Args:
task: Task description
**kwargs: Additional formatting options
Returns:
list: Command arguments
"""
pass
def get_command(self) -> str | list[str]:
"""Get base command for this adapter."""
return self.config.get("command", self.name)
def get_args(self) -> list[str]:
"""Get default arguments for this adapter."""
return self.config.get("args", [])
def get_env(self) -> dict[str, str]:
"""Get environment variables for this adapter."""
return self.config.get("env", {})
def get_timeout(self) -> int:
"""Get timeout in seconds."""
return self.config.get("timeout", 300)
async def execute_streaming(
self,
task: str,
progress_callback: Any = None,
timeout: int | None = None,
**kwargs: Any
) -> tuple[str, str, int]:
"""
Execute task with real-time output streaming.
Args:
task: Task description/query
progress_callback: Optional async callback for progress updates
timeout: Override timeout in seconds
**kwargs: Additional CLI-specific arguments
Returns:
tuple: (stdout, stderr, return_code)
"""
import asyncio
# Get command and args
cmd_args = self.format_task(task, **kwargs)
resolved_cmd = self.resolve_command(cmd_args[0])
if isinstance(resolved_cmd, str):
full_cmd = [resolved_cmd] + cmd_args[1:]
else:
full_cmd = resolved_cmd + cmd_args[1:]
# Merge environment
env = os.environ.copy()
env.update(self.get_env())
# Start process with pipes for streaming
process = await asyncio.create_subprocess_exec(
*full_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
stdout_lines = []
stderr_lines = []
async def read_stream(stream, line_buffer, prefix=""):
"""Read stream line-by-line and call progress callback."""
while True:
line = await stream.readline()
if not line:
break
decoded = line.decode().strip()
if decoded:
line_buffer.append(decoded)
if progress_callback:
try:
await progress_callback(f"{prefix}{decoded}")
except Exception:
pass # Don't fail on callback errors
try:
# Read both streams concurrently with timeout
effective_timeout = timeout or self.get_timeout()
await asyncio.wait_for(
asyncio.gather(
read_stream(process.stdout, stdout_lines),
read_stream(process.stderr, stderr_lines, "[stderr] "),
),
timeout=effective_timeout
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise TimeoutError(f"{self.name} timed out after {effective_timeout}s")
# Wait for process to complete
await process.wait()
return (
"\n".join(stdout_lines),
"\n".join(stderr_lines),
process.returncode
)
@staticmethod
def resolve_command(cmd: str | list[str]) -> str | list[str]:
"""
Resolve command to full path on Windows.
On Windows, asyncio.create_subprocess_exec() doesn't reliably search PATH,
so we need to resolve commands to their full paths using shutil.which().
Args:
cmd: Command string or list of command parts
Returns:
Resolved command (full path on Windows, original on Unix)
"""
if os.name != "nt":
# On Unix systems, PATH search works fine
return cmd
# On Windows, resolve the executable path
if isinstance(cmd, list):
if not cmd:
return cmd
# Resolve first element (the executable)
resolved = shutil.which(cmd[0])
if resolved:
return [resolved] + cmd[1:]
return cmd
else:
# Single string command
resolved = shutil.which(cmd)
return resolved if resolved else cmd