"""FastMCP bridge that exposes Claude-style agent definitions to any MCP client.
The bridge reads `.claude/agents/*.md` files (or another directory, if
configured) and registers each agent as a FastMCP tool plus a mirrored
`agent://` resource. When a tool is invoked the bridge assembles the agent
instructions, spawns the requested CLI runner (Codex, Claude, Gemini, etc.),
and streams structured logs back to the caller for easy debugging.
Quick start (from the repo root):
```bash
uv sync # install project dependencies once
uv run fastmcp run polyagent_mcp.py:mcp # stdio transport by default
```
Add the same command to your LLM CLI of choice:
* **Codex CLI** – `codex mcp add agents -- uv run fastmcp run polyagent_mcp.py:mcp`
* **Gemini CLI** – `gemini mcp add agents -- uv run fastmcp run polyagent_mcp.py:mcp`
Both CLIs understand stdio transports and will stream the bridge's structured
logs, manifest resources, and tool outputs.
### Configuration
`BridgeSettings` is a `BaseSettings` model, so environment variables are parsed
and type-coerced automatically:
* `POLYAGENT_MCP_DIR` – directory with Markdown agent files (default: `.claude/agents`).
* `POLYAGENT_MCP_CONFIG` – optional YAML/JSON file that extends runner/agent config.
* `POLYAGENT_MCP_WORKSPACE` – working directory used when launching runners.
* `POLYAGENT_MCP_TIMEOUT` – default timeout (seconds) for runner execution.
* `POLYAGENT_MCP_LOG_LEVEL` – Python log level (also forwarded to MCP clients).
* `POLYAGENT_MCP_DEBUG` – enable FastMCP debug mode when set to truthy value.
* `POLYAGENT_MCP_SERVER_NAME` / `POLYAGENT_MCP_INSTRUCTIONS` – MCP metadata overrides.
* `POLYAGENT_MCP_TRANSPORT`, `POLYAGENT_MCP_HOST`, `POLYAGENT_MCP_PORT` – only needed
when running this module directly instead of via `fastmcp run`.
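For example, a typical setup keeps the defaults and only overrides the agents
directory and the execution timeout before launching the bridge:

```bash
export POLYAGENT_MCP_DIR="$HOME/my-project/.claude/agents"
export POLYAGENT_MCP_TIMEOUT=1800
uv run fastmcp run polyagent_mcp.py:mcp
```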
Diagnostics are exposed through built-in resources:
* `resource://agents/manifest` – discovered agents with runner availability.
* `resource://agents/config` – effective config plus the optional override file.
* `agent://<name>` – raw Markdown instructions for each agent.
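A quick in-process smoke test of these resources might look like the sketch
below (it assumes the FastMCP 2.x `Client` API; adapt it to your client of
choice):

```python
import asyncio

from fastmcp import Client

from polyagent_mcp.server import create_mcp_server


async def main() -> None:
    # In-memory transport: pass the server instance directly to the client.
    async with Client(create_mcp_server()) as client:
        manifest = await client.read_resource("resource://agents/manifest")
        print(manifest)


asyncio.run(main())
```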
The rest of the module keeps to the KISS principle: minimal state, convenient
logging, and a factory function (`create_mcp_server`) that the FastMCP CLI can
import without extra glue code.
"""
from __future__ import annotations
import asyncio
import json
import os
import shutil
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Literal, cast
import fastmcp
import yaml
from fastmcp import Context, FastMCP
from fastmcp.utilities.logging import configure_logging, get_logger
from pydantic import AliasChoices, BaseModel, Field, ValidationError, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
DEFAULT_SERVER_NAME = "agents-mcp"
DEFAULT_INSTRUCTIONS = (
"You are connected to Agents MCP, which exposes the project's specialist agents "
"as standalone MCP tools. This server mirrors the workflow described in CLAUDE.md/AGENTS.md "
"so clients without native agent support (Codex CLI, Gemini CLI, etc.) can delegate work to "
"dedicated sub-agents while keeping the main conversation concise.\n\n"
"Discovery:\n"
"- Call `list_tools` or read `resource://agents/manifest` to see the available agents and the CLI runners they can use.\n"
"- Read `agent://<name>` to review an agent's full instructions before delegating.\n\n"
"Execution:\n"
"- Invoke the tool named after the agent (e.g. `commit-agent`) and provide at least a `task` string describing the desired outcome.\n"
"- Optional arguments: `context` for extra details, `runner` to force a specific CLI (codex/claude/gemini/custom), `model` for runner-specific model aliases, and `timeout_seconds` to extend long operations.\n"
"- When no runner is supplied the server picks the first available option in its priority list.\n\n"
"Why delegate:\n"
"- Agents operate with focused prompts, reducing the primary conversation's token footprint and improving adherence to specialized procedures.\n"
"- Use them whenever a self-contained specialist can produce higher quality output, perform automated checks, or run local tooling on your behalf.\n"
"- Inspect MCP log messages to monitor progress; stdout/stderr from each run is streamed back for debugging."
)
DEFAULT_TIMEOUT_SECONDS = 900
DEFAULT_AGENT_GLOBS = ("*.md", "*.markdown")
DEFAULT_RUNNER_PRIORITY = ("codex", "claude", "gemini")
ENV_PREFIX = "POLYAGENT_MCP_"
logger = get_logger(__name__)
# ---------------------------------------------------------------------------
# Output Format System
# ---------------------------------------------------------------------------
class OutputFormat(str, Enum):
"""Supported output formats for agent runners."""
CODEX_JSONL = "codex_jsonl"
GEMINI_JSON = "gemini_json"
RAW = "raw"
# Codex JSONL Models
class CodexTokenUsage(BaseModel):
"""Token usage statistics from a Codex turn."""
input_tokens: int
cached_input_tokens: int = 0
output_tokens: int
class CodexItem(BaseModel):
"""An item in a Codex thread (message, reasoning, command, etc.)."""
id: str
type: str # Codex uses "type" not "item_type" at the item level
text: str | None = None
command: str | None = None
aggregated_output: str | None = None
exit_code: int | None = None
status: str | None = None
class CodexEvent(BaseModel):
"""A single event in Codex JSONL output."""
type: str
thread_id: str | None = None
item: CodexItem | None = None
usage: CodexTokenUsage | None = None
error: str | None = None
# Gemini JSON Models
class GeminiApiStats(BaseModel):
"""API request statistics from Gemini."""
total_requests: int = Field(default=0, alias="totalRequests")
total_errors: int = Field(default=0, alias="totalErrors")
total_latency_ms: int = Field(default=0, alias="totalLatencyMs")
model_config = {"populate_by_name": True}
class GeminiTokenStats(BaseModel):
"""Token usage statistics from Gemini."""
prompt: int = 0
candidates: int = 0
total: int = 0
cached: int = 0
thoughts: int = 0
tool: int = 0
class GeminiModelStats(BaseModel):
"""Per-model statistics from Gemini."""
api: GeminiApiStats
tokens: GeminiTokenStats
class GeminiToolDecisions(BaseModel):
"""Tool approval decisions from Gemini."""
accept: int = 0
reject: int = 0
modify: int = 0
auto_accept: int = 0
class GeminiToolStats(BaseModel):
"""Per-tool statistics from Gemini."""
count: int = 0
success: int = 0
fail: int = 0
duration_ms: int = Field(default=0, alias="durationMs")
decisions: GeminiToolDecisions
model_config = {"populate_by_name": True}
class GeminiStats(BaseModel):
"""Complete statistics from Gemini CLI output."""
models: dict[str, GeminiModelStats] = Field(default_factory=dict)
tools: dict[str, Any] = Field(default_factory=dict) # Partial, includes byName dict
files: dict[str, int] = Field(default_factory=dict)
class GeminiOutput(BaseModel):
"""Complete Gemini CLI JSON output."""
response: str
stats: GeminiStats | None = None
# Parser Functions
def parse_codex_jsonl(stdout: str) -> str:
"""Extract assistant messages from codex JSONL output.
Codex produces JSON Lines where each line is an event. We only care about
'item.completed' events where item_type is 'assistant_message', and we
extract the text field from those items.
Args:
stdout: Raw stdout from codex CLI with --json flag
Returns:
Combined text from all assistant messages
Raises:
ValueError: If no valid assistant messages found or output is malformed
"""
if not stdout.strip():
raise ValueError("Codex output is empty")
messages: list[str] = []
for line_num, line in enumerate(stdout.splitlines(), start=1):
line = line.strip()
if not line:
continue
try:
event = CodexEvent.model_validate_json(line)
except ValidationError as exc:
# Provide detailed context for debugging
errors_detail = "; ".join(
f"{'.'.join(str(loc) for loc in err['loc'])}: {err['msg']}"
for err in exc.errors()[:3] # Show first 3 errors
)
raise ValueError(
f"Codex validation failed on line {line_num}. "
f"Errors: {errors_detail}. "
f"Raw JSON: {line[:500]}"
) from exc
except json.JSONDecodeError as exc:
raise ValueError(
f"Invalid JSON on line {line_num} at position {exc.pos}: {exc.msg}. "
f"Raw content: {line[:500]}"
) from exc
# Only process completed items with assistant/agent messages
# Note: Codex uses both "assistant_message" and "agent_message"
if (
event.type == "item.completed"
and event.item
and event.item.type in ("assistant_message", "agent_message")
and event.item.text
):
messages.append(event.item.text.strip())
if not messages:
raise ValueError(
f"No assistant messages found in codex JSONL output. "
f"Processed {line_num} lines. "
f"First line sample: {stdout.splitlines()[0][:200] if stdout.splitlines() else 'N/A'}"
)
return "\n\n".join(messages)
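# Example of the JSONL stream consumed above (field names follow the CodexEvent
# and CodexItem models; the concrete values are illustrative only):
#
#     {"type": "thread.started", "thread_id": "t_123"}
#     {"type": "item.completed", "item": {"id": "i_1", "type": "agent_message", "text": "All checks pass."}}
#
# For that stream, parse_codex_jsonl returns "All checks pass.".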
def parse_gemini_json(stdout: str) -> str:
"""Extract response field from gemini JSON output.
Gemini with --output-format json produces a single JSON object with a
'response' field containing the actual agent output. Other fields like
'stats' are ignored to minimize context.
Args:
stdout: Raw stdout from gemini CLI with --output-format json
Returns:
Content of the 'response' field
Raises:
ValueError: If JSON is malformed or response field is missing/empty
"""
if not stdout.strip():
raise ValueError("Gemini output is empty")
try:
output = GeminiOutput.model_validate_json(stdout)
except ValidationError as exc:
# Provide detailed context for debugging
errors_detail = "; ".join(
f"{'.'.join(str(loc) for loc in err['loc'])}: {err['msg']}"
for err in exc.errors()[:3] # Show first 3 errors
)
raise ValueError(
f"Gemini validation failed. "
f"Errors: {errors_detail}. "
f"Raw JSON: {stdout[:500]}"
) from exc
except json.JSONDecodeError as exc:
raise ValueError(
f"Invalid JSON in gemini output at position {exc.pos}: {exc.msg}. "
f"Raw content: {stdout[:500]}"
) from exc
    if not output.response.strip():
        raise ValueError(
            "Gemini 'response' field is empty. "
            f"Raw output (truncated): {stdout[:200]}"
        )
return output.response.strip()
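# Example of the JSON document consumed above (fields follow the GeminiOutput
# model; values are illustrative only):
#
#     {"response": "Summary of the change ...", "stats": {"models": {}, "tools": {}, "files": {}}}
#
# parse_gemini_json returns the trimmed `response` text and ignores `stats`.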
def parse_raw_output(stdout: str, stderr: str) -> str:
"""Fallback parser for runners without structured output.
Returns stdout if non-empty, otherwise stderr. This is used for runners
that don't support JSON output formats (e.g., claude CLI, custom tools).
Args:
stdout: Raw stdout from runner
stderr: Raw stderr from runner
Returns:
stdout if non-empty, otherwise stderr
Raises:
ValueError: If both stdout and stderr are empty
"""
if stdout.strip():
return stdout.strip()
if stderr.strip():
return stderr.strip()
raise ValueError("Runner produced no output on stdout or stderr")
# ---------------------------------------------------------------------------
# Configuration & State
# ---------------------------------------------------------------------------
DEFAULT_RUNNERS: dict[str, dict[str, Any]] = {
"codex": {
"command": ["codex", "exec", "--full-auto", "--json"],
"output_format": "codex_jsonl",
"model_flag": "--model",
"model_map": {"sonnet": "gpt-5-codex"},
},
"claude": {
"command": ["claude", "--print"],
"output_format": "raw",
"model_flag": "--model",
"model_map": {"sonnet": "sonnet"},
},
"gemini": {
"command": ["gemini", "--output-format", "json"],
"output_format": "gemini_json",
"model_flag": "-m",
"model_map": {"sonnet": "gemini-2.5-pro"},
},
}
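# With these defaults an agent front-matter hint of `model: sonnet` resolves to
# gpt-5-codex under codex, stays sonnet under claude, and becomes gemini-2.5-pro
# under gemini (see RunnerConfig.map_model below); unknown aliases pass through
# unchanged.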
def _default_agents_dir() -> Path:
"""Find agents directory by walking up from CWD to filesystem root.
Searches for .claude/agents/ or agents/ in the current directory and all
parent directories, similar to how git finds .git/. This allows the tool
to work when invoked from project subdirectories.
"""
current = Path.cwd()
for parent in [current, *current.parents]:
for candidate in (parent / ".claude/agents", parent / "agents"):
if candidate.exists() and candidate.is_dir():
return candidate
raise RuntimeError(
"Unable to locate agents directory. Searched for '.claude/agents' or 'agents' "
f"in {current} and all parent directories. Set POLYAGENT_MCP_DIR explicitly "
"or create one of these directories with agent definitions."
)
class BridgeSettings(BaseSettings):
"""Typed configuration with automatic environment loading."""
model_config = SettingsConfigDict(case_sensitive=False)
agents_dir: Path = Field(
default_factory=_default_agents_dir,
validation_alias=AliasChoices("agents_dir", f"{ENV_PREFIX}DIR"),
)
config_path: Path | None = Field(
default=None,
validation_alias=AliasChoices("config_path", f"{ENV_PREFIX}CONFIG"),
)
workspace: Path = Field(
default_factory=Path.cwd,
validation_alias=AliasChoices("workspace", f"{ENV_PREFIX}WORKSPACE"),
)
timeout: int = Field(
default=DEFAULT_TIMEOUT_SECONDS,
validation_alias=AliasChoices("timeout", f"{ENV_PREFIX}TIMEOUT"),
)
log_level: str = Field(
default="INFO",
validation_alias=AliasChoices("log_level", f"{ENV_PREFIX}LOG_LEVEL"),
)
debug: bool = Field(
default=False,
validation_alias=AliasChoices("debug", f"{ENV_PREFIX}DEBUG"),
)
server_name: str = Field(
default=DEFAULT_SERVER_NAME,
validation_alias=AliasChoices("server_name", f"{ENV_PREFIX}SERVER_NAME"),
)
server_instructions: str = Field(
default=DEFAULT_INSTRUCTIONS,
validation_alias=AliasChoices("server_instructions", f"{ENV_PREFIX}INSTRUCTIONS"),
)
@field_validator("agents_dir")
@classmethod
def _validate_agents_dir(cls, value: Path) -> Path:
if not value.exists():
raise ValueError(f"Agents directory not found: {value}")
return value
@field_validator("log_level")
@classmethod
def _normalise_level(cls, value: str) -> str:
return value.upper()
@dataclass(frozen=True)
class AgentDefinition:
name: str
description: str
instructions: str
model_hint: str | None
metadata: dict[str, Any]
path: Path
@dataclass(frozen=True)
class AgentOverride:
runner: str | None = None
model: str | None = None
@dataclass
class RunnerConfig:
command: tuple[str, ...]
output_format: OutputFormat
env: dict[str, str] = field(default_factory=dict)
model_flag: str | None = None
model_map: dict[str, str | None] = field(default_factory=dict)
prompt_prefix: str | None = None
prompt_suffix: str | None = None
def available(self) -> bool:
return bool(self.command) and shutil.which(self.command[0]) is not None
def build_command(self, prompt: str, model_alias: str | None) -> list[str]:
args = list(self.command)
mapped = self.map_model(model_alias)
if self.model_flag and mapped:
args.extend([self.model_flag, mapped])
args.append(prompt)
return args
def map_model(self, alias: str | None) -> str | None:
if alias is None:
return None
if not self.model_map:
return alias
return self.model_map.get(alias, alias)
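# Usage sketch for the codex entry from DEFAULT_RUNNERS above (values copied
# from that table, nothing new assumed):
#
#     cfg = RunnerConfig(
#         command=("codex", "exec", "--full-auto", "--json"),
#         output_format=OutputFormat.CODEX_JSONL,
#         model_flag="--model",
#         model_map={"sonnet": "gpt-5-codex"},
#     )
#     cfg.build_command("Review the staged diff", "sonnet")
#     # -> ["codex", "exec", "--full-auto", "--json", "--model", "gpt-5-codex", "Review the staged diff"]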
@dataclass(frozen=True)
class BridgeConfig:
default_runner: str | None
runner_priority: tuple[str, ...]
runners: dict[str, RunnerConfig]
overrides: dict[str, AgentOverride]
@property
def available_runners(self) -> dict[str, RunnerConfig]:
return {name: cfg for name, cfg in self.runners.items() if cfg.available()}
@dataclass
class BridgeState:
settings: BridgeSettings
config: BridgeConfig
agents: list[AgentDefinition]
@property
def manifest(self) -> dict[str, Any]:
return {
"agents": [
{
"name": agent.name,
"description": agent.description,
"model": agent.model_hint,
"path": str(agent.path),
"metadata": agent.metadata,
"override": _override_to_dict(self.config.overrides.get(agent.name)),
}
for agent in self.agents
],
"runners": {
name: {
"command": list(cfg.command),
"output_format": cfg.output_format.value,
"available": cfg.available(),
}
for name, cfg in self.config.runners.items()
},
"runner_priority": list(self.config.runner_priority),
"default_runner": self.config.default_runner,
"workspace": str(self.settings.workspace),
"timeout": self.settings.timeout,
}
# ---------------------------------------------------------------------------
# Bootstrap
# ---------------------------------------------------------------------------
def create_mcp_server() -> FastMCP:
"""Factory function used by fastmcp CLI and script entry point."""
settings = BridgeSettings()
effective_level = "DEBUG" if settings.debug else settings.log_level
configure_logging(cast(Any, effective_level))
logger.info(
"Initialising agents bridge",
extra=settings.model_dump(mode="json"),
)
mcp = FastMCP(name=settings.server_name, instructions=settings.server_instructions)
if settings.debug:
fastmcp.settings.debug = True
config = _load_bridge_config(settings.config_path)
agents = _discover_agents(settings.agents_dir)
state = BridgeState(settings=settings, config=config, agents=agents)
_register_manifest_resources(mcp, state)
_register_agent_resources(mcp, state)
_register_agent_tools(mcp, state)
logger.info(
"Loaded %s agents and %s runners",
len(agents),
len(config.available_runners),
extra={
"agents": [agent.name for agent in agents],
"available_runners": list(config.available_runners),
},
)
return mcp
# ---------------------------------------------------------------------------
# Registration helpers
# ---------------------------------------------------------------------------
def _register_manifest_resources(mcp: FastMCP, state: BridgeState) -> None:
@mcp.resource(
"resource://agents/manifest",
description="List of registered agents and runner availability",
mime_type="application/json",
)
def _manifest() -> dict[str, Any]:
return state.manifest
@mcp.resource(
"resource://agents/config",
description="Raw bridge configuration",
mime_type="application/json",
)
def _config_resource() -> dict[str, Any]:
config_path = state.settings.config_path
data = state.manifest.copy()
if config_path and config_path.exists():
data["config_source"] = str(config_path)
data["config_payload"] = _load_config_file(config_path)
else:
data["config_source"] = None
return data
def _register_agent_resources(mcp: FastMCP, state: BridgeState) -> None:
for agent in state.agents:
_register_single_agent_resource(mcp, agent)
def _register_single_agent_resource(mcp: FastMCP, agent: AgentDefinition) -> None:
uri = f"agent://{agent.name}"
@mcp.resource(
uri,
description=agent.description,
mime_type="text/markdown",
)
def _read_agent() -> str:
return agent.instructions
def _register_agent_tools(mcp: FastMCP, state: BridgeState) -> None:
for agent in state.agents:
_register_single_agent_tool(mcp, state, agent)
def _register_single_agent_tool(mcp: FastMCP, state: BridgeState, agent: AgentDefinition) -> None:
@mcp.tool(name=agent.name, description=agent.description)
async def _run_agent( # type: ignore[no-redef]
task: str,
ctx: Context,
context: str | None = None,
runner: str | None = None,
model: str | None = None,
timeout_seconds: int | None = None,
) -> dict[str, Any]:
return await _execute_agent(
state=state,
agent=agent,
task=task,
context_block=context,
requested_runner=runner,
requested_model=model,
timeout_override=timeout_seconds,
ctx=ctx,
)
# ---------------------------------------------------------------------------
# Agent Execution
# ---------------------------------------------------------------------------
async def _execute_agent(
*,
state: BridgeState,
agent: AgentDefinition,
task: str,
context_block: str | None,
requested_runner: str | None,
requested_model: str | None,
timeout_override: int | None,
ctx: Context,
) -> dict[str, Any]:
"""Execute an agent task using the configured runner.
Returns a minimal response optimized for context management:
- output: The agent's response text
- success: Whether the execution succeeded
- error: Error message if failed (only present on failure)
"""
if not task or not task.strip():
raise ValueError("Agent tool requires a non-empty `task` argument")
runner_name, runner_cfg = _select_runner(state, agent, requested_runner)
model_alias = _select_model(state, agent, requested_model)
prompt = _compose_prompt(agent, runner_cfg, task, context_block)
command = runner_cfg.build_command(prompt, model_alias)
env = os.environ.copy()
env.update(runner_cfg.env)
timeout = timeout_override or state.settings.timeout
payload = {
"agent": agent.name,
"runner": runner_name,
"model": model_alias,
"timeout": timeout,
"workspace": str(state.settings.workspace),
"request_id": str(ctx.request_id),
}
await ctx.info(f"Running agent '{agent.name}'", extra=payload)
logger.debug("Launching runner", extra=payload)
result = await _spawn_runner(
runner=runner_name,
command=command,
env=env,
cwd=state.settings.workspace,
timeout=timeout,
)
# Parse output based on runner format
try:
output = _parse_runner_output(runner_cfg, result.stdout, result.stderr)
success = True
error = None
debug_context = None
except ValueError as exc:
output = None
success = False
error = str(exc)
# Provide comprehensive debug context with FULL output for debugging
debug_context = {
"runner": runner_name,
"format": runner_cfg.output_format.value,
"exit_code": result.exit_code,
"stdout": result.stdout, # Full stdout - needed for debugging
"stderr": result.stderr, # Full stderr - needed for debugging
}
await _log_result(ctx, agent.name, runner_name, result, success, error)
# Return minimal response for context optimization
response: dict[str, Any] = {
"success": success,
}
if success:
response["output"] = output
else:
response["error"] = error
# Include comprehensive debug context for failures
if debug_context:
response["debug"] = debug_context
return response
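# Shape of the dict returned above: on success {"success": True, "output": <parsed text>};
# on failure {"success": False, "error": <message>, "debug": <runner/format/exit_code/stdout/stderr>}.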
def _parse_runner_output(runner_cfg: RunnerConfig, stdout: str, stderr: str) -> str:
"""Parse runner output based on configured format.
Args:
runner_cfg: Runner configuration with output_format
stdout: Raw stdout from runner
stderr: Raw stderr from runner
Returns:
Parsed agent output text
Raises:
ValueError: If output cannot be parsed according to expected format
"""
if runner_cfg.output_format == OutputFormat.CODEX_JSONL:
return parse_codex_jsonl(stdout)
elif runner_cfg.output_format == OutputFormat.GEMINI_JSON:
return parse_gemini_json(stdout)
elif runner_cfg.output_format == OutputFormat.RAW:
return parse_raw_output(stdout, stderr)
else:
raise ValueError(f"Unsupported output format: {runner_cfg.output_format}")
def _select_runner(
state: BridgeState,
agent: AgentDefinition,
requested: str | None,
) -> tuple[str, RunnerConfig]:
candidates: list[str | None] = [requested]
override = state.config.overrides.get(agent.name)
if override and override.runner:
candidates.append(override.runner)
meta_runner = agent.metadata.get("bridge_runner") or agent.metadata.get("runner")
candidates.append(meta_runner if isinstance(meta_runner, str) else None)
candidates.append(state.config.default_runner)
candidates.extend(state.config.runner_priority)
candidates.extend(state.config.available_runners)
seen: set[str] = set()
for candidate in filter(None, candidates):
if candidate in seen:
continue
seen.add(candidate)
runner_cfg = state.config.runners.get(candidate)
if runner_cfg and runner_cfg.available():
return candidate, runner_cfg
raise RuntimeError(
"No runnable CLI found for agent. Install codex/claude/gemini or configure a custom runner."
)
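# Candidate order above: explicit `runner` argument, per-agent override from the
# config file, `runner`/`bridge_runner` front-matter keys, the configured
# default_runner, the runner_priority list, then any runner whose executable is
# on PATH; the first installed candidate wins.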
def _select_model(
state: BridgeState,
agent: AgentDefinition,
requested: str | None,
) -> str | None:
override = state.config.overrides.get(agent.name)
if override and override.model:
return override.model
if requested:
return requested
return agent.model_hint
def _compose_prompt(
agent: AgentDefinition,
runner_cfg: RunnerConfig,
task: str,
context_block: str | None,
) -> str:
sections: list[str] = []
if runner_cfg.prompt_prefix:
sections.append(runner_cfg.prompt_prefix.strip())
sections.append(agent.instructions.strip())
sections.append("\n## Task\n" + task.strip())
if context_block and context_block.strip():
sections.append("\n## Additional Context\n" + context_block.strip())
if runner_cfg.prompt_suffix:
sections.append(runner_cfg.prompt_suffix.strip())
return "\n\n".join(part for part in sections if part)
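# The composed prompt is plain Markdown: optional prompt_prefix, the agent
# instructions, a "## Task" section, an optional "## Additional Context"
# section, and an optional prompt_suffix, joined by blank lines.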
# ---------------------------------------------------------------------------
# Subprocess Management
# ---------------------------------------------------------------------------
@dataclass
class SubprocessResult:
runner: str
command: list[str]
exit_code: int | None
timed_out: bool
stdout: str
stderr: str
spawn_error: str | None
async def _spawn_runner(
*,
runner: str,
command: list[str],
env: dict[str, str],
cwd: Path,
timeout: int,
) -> SubprocessResult:
try:
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(cwd),
env=env,
)
except FileNotFoundError as exc:
logger.error("Runner %s executable not found", runner)
return SubprocessResult(
runner=runner,
command=command,
exit_code=None,
timed_out=False,
stdout="",
stderr=str(exc),
spawn_error=str(exc),
)
except Exception as exc: # noqa: BLE001
logger.exception("Failed to spawn runner %s", runner)
return SubprocessResult(
runner=runner,
command=command,
exit_code=None,
timed_out=False,
stdout="",
stderr=str(exc),
spawn_error=str(exc),
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=timeout)
timed_out = False
    except (TimeoutError, asyncio.TimeoutError):  # asyncio.TimeoutError aliases TimeoutError only since Python 3.11
process.kill()
await process.wait()
stdout_bytes, stderr_bytes = b"", b""
timed_out = True
stdout = stdout_bytes.decode("utf-8", errors="replace")
stderr = stderr_bytes.decode("utf-8", errors="replace")
exit_code = None if timed_out else process.returncode
return SubprocessResult(
runner=runner,
command=command,
exit_code=exit_code,
timed_out=timed_out,
stdout=stdout,
stderr=stderr,
spawn_error=None,
)
async def _log_result(
ctx: Context,
agent_name: str,
runner_name: str,
result: SubprocessResult,
success: bool,
error: str | None,
) -> None:
"""Log execution result to MCP context."""
payload = {
"agent": agent_name,
"runner": runner_name,
"exit_code": result.exit_code,
"timed_out": result.timed_out,
"success": success,
}
if result.spawn_error:
payload["spawn_error"] = result.spawn_error
await ctx.error(f"Runner '{runner_name}' failed to start", extra=payload)
logger.error("Runner %s failed to start: %s", runner_name, result.spawn_error)
return
if result.timed_out:
await ctx.error(f"Runner '{runner_name}' exceeded timeout", extra=payload)
logger.error("Runner %s exceeded timeout", runner_name)
elif not success:
payload["error"] = error
await ctx.warning(f"Runner '{runner_name}' output parsing failed", extra=payload)
logger.warning("Runner %s output parsing failed: %s", runner_name, error)
elif result.exit_code not in (None, 0):
await ctx.warning(
f"Runner '{runner_name}' exited with code {result.exit_code}",
extra=payload,
)
logger.warning("Runner %s exited with %s", runner_name, result.exit_code)
else:
await ctx.info(f"Runner '{runner_name}' completed successfully", extra=payload)
logger.debug("Runner %s completed", runner_name)
# ---------------------------------------------------------------------------
# Discovery & configuration
# ---------------------------------------------------------------------------
def _discover_agents(agents_dir: Path) -> list[AgentDefinition]:
definitions: list[AgentDefinition] = []
for pattern in DEFAULT_AGENT_GLOBS:
for path in sorted(agents_dir.glob(pattern)):
if path.is_file():
definitions.append(_parse_agent_file(path))
if not definitions:
raise RuntimeError(f"No agent definitions discovered in {agents_dir}")
return definitions
def _parse_agent_file(path: Path) -> AgentDefinition:
text = path.read_text(encoding="utf-8")
metadata, body = _extract_front_matter(text, path)
name = str(metadata.get("name") or path.stem)
description = str(metadata.get("description") or f"Agent {name}")
model_hint = metadata.get("model")
instructions = body.strip()
return AgentDefinition(
name=name,
description=description,
instructions=instructions,
model_hint=str(model_hint) if model_hint else None,
metadata=metadata,
path=path,
)
def _extract_front_matter(text: str, path: Path | None = None) -> tuple[dict[str, Any], str]:
"""Extract YAML front matter from agent markdown files.
Expects front matter in the format:
---
key: value
---
Uses a hybrid approach:
1. First tries standard YAML parsing (supports full YAML spec)
2. Falls back to simple key-value regex parsing for malformed YAML
This handles cases where Claude Code generates files with unquoted
multi-line values or other YAML syntax issues.
Args:
text: The full text content of the agent file
path: Optional path to the agent file (for better error messages)
Returns:
Tuple of (metadata dict, body text)
"""
if not text.startswith("---"):
return {}, text
lines = text.splitlines()
metadata_lines: list[str] = []
closing_index: int | None = None
for idx, line in enumerate(lines[1:], start=1):
if line.strip() == "---":
closing_index = idx
break
metadata_lines.append(line)
if closing_index is None:
return {}, text
metadata_text = "\n".join(metadata_lines)
file_info = f" in {path}" if path else ""
# Try standard YAML parsing first
try:
metadata = yaml.safe_load(metadata_text) or {}
if not isinstance(metadata, dict):
logger.warning(
"Front matter is not a dictionary%s, ignoring it. Content: %s",
file_info,
metadata_text[:100],
)
metadata = {}
except yaml.YAMLError as exc:
# Fall back to simple regex parsing for malformed YAML
logger.info(
"YAML parsing failed%s, trying simple key-value extraction. Error: %s",
file_info,
str(exc),
)
metadata = _parse_simple_front_matter(metadata_text, path)
body = "\n".join(lines[closing_index + 1 :])
return metadata, body
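# Example agent file handled above (keys match those read by _parse_agent_file;
# the agent itself is illustrative):
#
#     ---
#     name: commit-agent
#     description: Writes conventional commit messages for staged changes
#     model: sonnet
#     ---
#
#     You are a commit specialist. Inspect the staged diff and draft a commit
#     message following the project's conventions.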
def _parse_simple_front_matter(text: str, path: Path | None = None) -> dict[str, Any]:
"""Parse front matter using simple key-value regex extraction.
This is a fallback for when YAML parsing fails due to syntax issues.
It handles simple cases like:
key: value
key: value with spaces
key: >
multi-line value
Only extracts the first value for each key (ignores continuation lines
and complex YAML features). This is intentionally simple and robust.
Args:
text: The front matter text (between --- markers)
path: Optional path for logging
Returns:
Dictionary of extracted key-value pairs
"""
import re
metadata: dict[str, Any] = {}
file_info = f" in {path}" if path else ""
# Match lines like "key: value" or "key:value"
# Captures key and everything after the colon as value
pattern = re.compile(r"^([a-zA-Z_][a-zA-Z0-9_-]*)\s*:\s*(.*)$")
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
match = pattern.match(line)
if match:
key, value = match.groups()
# Clean up common issues in values
value = value.strip()
# Skip YAML multi-line indicators (>, |) - we only get first line anyway
if value in (">", "|", ">-", "|-"):
# For multi-line values, we'd need to continue reading,
# but for simplicity just skip them (use filename defaults)
continue
# Remove surrounding quotes if present
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
elif value.startswith("'") and value.endswith("'"):
value = value[1:-1]
# Unescape common escape sequences
value = value.replace("\\n", "\n").replace("\\t", "\t").replace('\\"', '"')
# Only store first occurrence of each key
if key not in metadata:
metadata[key] = value
if metadata:
logger.info("Extracted %d fields using simple parser%s: %s", len(metadata), file_info, list(metadata.keys()))
else:
logger.warning("Simple parser found no fields%s", file_info)
return metadata
def _load_bridge_config(config_path: Path | None) -> BridgeConfig:
base = {
"runner_priority": list(DEFAULT_RUNNER_PRIORITY),
"runners": DEFAULT_RUNNERS,
"agents": {},
}
if config_path:
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
override = _load_config_file(config_path)
base = _deep_merge_dicts(base, override)
default_runner = base.get("default_runner")
priority_raw = base.get("runner_priority") or DEFAULT_RUNNER_PRIORITY
runner_priority = tuple(str(item) for item in priority_raw)
runners_data = base.get("runners") or {}
if not isinstance(runners_data, dict):
runners_data = {}
runners = {name: _build_runner_config(name, data) for name, data in runners_data.items()}
overrides_data = base.get("agents") or {}
if not isinstance(overrides_data, dict):
overrides_data = {}
overrides = {
name: AgentOverride(
runner=str(data.get("runner")) if data.get("runner") else None,
model=str(data.get("model")) if data.get("model") else None,
)
for name, data in overrides_data.items()
if isinstance(data, dict)
}
return BridgeConfig(
default_runner=str(default_runner) if default_runner else None,
runner_priority=runner_priority,
runners=runners,
overrides=overrides,
)
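# Example override file accepted here (YAML; the keys mirror the structure
# built above, concrete commands and names are illustrative):
#
#     default_runner: codex
#     runner_priority: [codex, gemini]
#     runners:
#       custom:
#         command: ["my-agent-cli", "--batch"]
#         output_format: raw
#     agents:
#       commit-agent:
#         runner: claude
#         model: sonnet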
def _build_runner_config(name: str, data: dict[str, Any]) -> RunnerConfig:
if not isinstance(data, dict):
raise ValueError(f"Runner '{name}' configuration must be a mapping")
command = data.get("command")
if not isinstance(command, (list, tuple)) or not command:
raise ValueError(f"Runner '{name}' must define a non-empty list `command`")
output_format_str = data.get("output_format", "raw")
try:
output_format = OutputFormat(output_format_str)
except ValueError:
raise ValueError(
f"Runner '{name}' has invalid output_format '{output_format_str}'. "
f"Must be one of: {', '.join(f.value for f in OutputFormat)}"
) from None
env_raw = data.get("env") or {}
if not isinstance(env_raw, dict):
raise ValueError(f"Runner '{name}' env must be a mapping")
model_map_raw = data.get("model_map") or {}
if not isinstance(model_map_raw, dict):
raise ValueError(f"Runner '{name}' model_map must be a mapping")
def _maybe_str(value: Any) -> str | None:
return None if value is None else str(value)
return RunnerConfig(
command=tuple(str(part) for part in command),
output_format=output_format,
env={str(k): str(v) for k, v in env_raw.items()},
model_flag=_maybe_str(data.get("model_flag")),
model_map={str(k): _maybe_str(v) for k, v in model_map_raw.items()},
prompt_prefix=_maybe_str(data.get("prompt_prefix")),
prompt_suffix=_maybe_str(data.get("prompt_suffix")),
)
def _load_config_file(config_path: Path) -> dict[str, Any]:
suffix = config_path.suffix.lower()
text = config_path.read_text(encoding="utf-8")
if suffix in {".yaml", ".yml", ""}:
data = yaml.safe_load(text) or {}
elif suffix == ".json":
data = json.loads(text)
else:
raise ValueError(f"Unsupported config format: {config_path}")
if not isinstance(data, dict):
raise ValueError("Config file must contain a mapping at the top level")
return data
def _deep_merge_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
result = dict(base)
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = _deep_merge_dicts(result[key], value)
else:
result[key] = value
return result
# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------
def _override_to_dict(override: AgentOverride | None) -> dict[str, Any] | None:
if not override:
return None
return {"runner": override.runner, "model": override.model}
# Module-level mcp instance - DO NOT create it here as it would execute on import!
# The __init__.py lazy loader or cli.py will create it when needed. The variable
# is populated lazily by this module's __getattr__ below so that
# `from polyagent_mcp.server import mcp` keeps working for the fastmcp CLI.
_mcp_instance: FastMCP | None = None
def __getattr__(name: str):
"""Lazy module attribute access for mcp instance.
This allows importing create_mcp_server without triggering mcp creation,
while still providing `from polyagent_mcp.server import mcp` for fastmcp CLI.
"""
if name == "mcp":
global _mcp_instance
if _mcp_instance is None:
_mcp_instance = create_mcp_server()
return _mcp_instance
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
if __name__ == "__main__":
transport_str = os.getenv(f"{ENV_PREFIX}TRANSPORT", "stdio")
transport = cast(Literal["stdio", "http", "sse", "streamable-http"], transport_str)
kwargs: dict[str, Any] = {}
host = os.getenv(f"{ENV_PREFIX}HOST")
port = os.getenv(f"{ENV_PREFIX}PORT")
if host:
kwargs["host"] = host
if port:
kwargs["port"] = int(port)
    # A module-level __getattr__ is only consulted for attribute access from other
    # modules, so build the server explicitly when running as a script.
    create_mcp_server().run(transport=transport, **kwargs)