Skip to main content
Glama

Polyagent MCP

by execveat
server.py41.8 kB
"""FastMCP bridge that exposes Claude-style agent definitions to any MCP client. The bridge reads `.claude/agents/*.md` files (or another directory, if configured) and registers each agent as a FastMCP tool plus a mirrored `agent://` resource. When a tool is invoked the bridge assembles the agent instructions, spawns the requested CLI runner (Codex, Claude, Gemini, etc.), and streams structured logs back to the caller for easy debugging. Quick start (from the repo root): ```bash uv sync # install project dependencies once uv run fastmcp run polyagent_mcp.py:mcp # stdio transport by default ``` Add the same command to your LLM CLI of choice: * **Codex CLI** – `codex mcp add agents -- uv run fastmcp run polyagent_mcp.py:mcp` * **Gemini CLI** – `gemini mcp add agents -- uv run fastmcp run polyagent_mcp.py:mcp` Both CLIs understand stdio transports and will stream the bridge's structured logs, manifest resources, and tool outputs. ### Configuration `BridgeSettings` is a `BaseSettings` model, so environment variables are parsed and type-coerced automatically: * `POLYAGENT_MCP_DIR` – directory with Markdown agent files (default: `.claude/agents`). * `POLYAGENT_MCP_CONFIG` – optional YAML/JSON file that extends runner/agent config. * `POLYAGENT_MCP_WORKSPACE` – working directory used when launching runners. * `POLYAGENT_MCP_TIMEOUT` – default timeout (seconds) for runner execution. * `POLYAGENT_MCP_LOG_LEVEL` – Python log level (also forwarded to MCP clients). * `POLYAGENT_MCP_DEBUG` – enable FastMCP debug mode when set to truthy value. * `POLYAGENT_MCP_SERVER_NAME` / `POLYAGENT_MCP_INSTRUCTIONS` – MCP metadata overrides. * `POLYAGENT_MCP_TRANSPORT`, `POLYAGENT_MCP_HOST`, `POLYAGENT_MCP_PORT` – only needed when running this module directly instead of via `fastmcp run`. Expose diagnostics through the built-in resources: * `resource://agents/manifest` – discovered agents with runner availability. * `resource://agents/config` – effective config plus the optional override file. * `agent://<name>` – raw Markdown instructions for each agent. The rest of the module keeps to the KISS principle: minimal state, convenient logging, and a factory function (`create_mcp_server`) that the FastMCP CLI can import without extra glue code. """ from __future__ import annotations import asyncio import json import os import shutil from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Literal, cast import fastmcp import yaml from fastmcp import Context, FastMCP from fastmcp.utilities.logging import configure_logging, get_logger from pydantic import AliasChoices, BaseModel, Field, ValidationError, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict DEFAULT_SERVER_NAME = "agents-mcp" DEFAULT_INSTRUCTIONS = ( "You are connected to Agents MCP, which exposes the project's specialist agents " "as standalone MCP tools. This server mirrors the workflow described in CLAUDE.md/AGENTS.md " "so clients without native agent support (Codex CLI, Gemini CLI, etc.) can delegate work to " "dedicated sub-agents while keeping the main conversation concise.\n\n" "Discovery:\n" "- Call `list_tools` or read `resource://agents/manifest` to see the available agents and the CLI runners they can use.\n" "- Read `agent://<name>` to review an agent's full instructions before delegating.\n\n" "Execution:\n" "- Invoke the tool named after the agent (e.g. `commit-agent`) and provide at least a `task` string describing the desired outcome.\n" "- Optional arguments: `context` for extra details, `runner` to force a specific CLI (codex/claude/gemini/custom), `model` for runner-specific model aliases, and `timeout_seconds` to extend long operations.\n" "- When no runner is supplied the server picks the first available option in its priority list.\n\n" "Why delegate:\n" "- Agents operate with focused prompts, reducing the primary conversation's token footprint and improving adherence to specialized procedures.\n" "- Use them whenever a self-contained specialist can produce higher quality output, perform automated checks, or run local tooling on your behalf.\n" "- Inspect MCP log messages to monitor progress; stdout/stderr from each run is streamed back for debugging." ) DEFAULT_TIMEOUT_SECONDS = 900 DEFAULT_AGENT_GLOBS = ("*.md", "*.markdown") DEFAULT_RUNNER_PRIORITY = ("codex", "claude", "gemini") ENV_PREFIX = "POLYAGENT_MCP_" logger = get_logger(__name__) # --------------------------------------------------------------------------- # Output Format System # --------------------------------------------------------------------------- class OutputFormat(str, Enum): """Supported output formats for agent runners.""" CODEX_JSONL = "codex_jsonl" GEMINI_JSON = "gemini_json" RAW = "raw" # Codex JSONL Models class CodexTokenUsage(BaseModel): """Token usage statistics from a Codex turn.""" input_tokens: int cached_input_tokens: int = 0 output_tokens: int class CodexItem(BaseModel): """An item in a Codex thread (message, reasoning, command, etc.).""" id: str type: str # Codex uses "type" not "item_type" at the item level text: str | None = None command: str | None = None aggregated_output: str | None = None exit_code: int | None = None status: str | None = None class CodexEvent(BaseModel): """A single event in Codex JSONL output.""" type: str thread_id: str | None = None item: CodexItem | None = None usage: CodexTokenUsage | None = None error: str | None = None # Gemini JSON Models class GeminiApiStats(BaseModel): """API request statistics from Gemini.""" total_requests: int = Field(default=0, alias="totalRequests") total_errors: int = Field(default=0, alias="totalErrors") total_latency_ms: int = Field(default=0, alias="totalLatencyMs") model_config = {"populate_by_name": True} class GeminiTokenStats(BaseModel): """Token usage statistics from Gemini.""" prompt: int = 0 candidates: int = 0 total: int = 0 cached: int = 0 thoughts: int = 0 tool: int = 0 class GeminiModelStats(BaseModel): """Per-model statistics from Gemini.""" api: GeminiApiStats tokens: GeminiTokenStats class GeminiToolDecisions(BaseModel): """Tool approval decisions from Gemini.""" accept: int = 0 reject: int = 0 modify: int = 0 auto_accept: int = 0 class GeminiToolStats(BaseModel): """Per-tool statistics from Gemini.""" count: int = 0 success: int = 0 fail: int = 0 duration_ms: int = Field(default=0, alias="durationMs") decisions: GeminiToolDecisions model_config = {"populate_by_name": True} class GeminiStats(BaseModel): """Complete statistics from Gemini CLI output.""" models: dict[str, GeminiModelStats] = Field(default_factory=dict) tools: dict[str, Any] = Field(default_factory=dict) # Partial, includes byName dict files: dict[str, int] = Field(default_factory=dict) class GeminiOutput(BaseModel): """Complete Gemini CLI JSON output.""" response: str stats: GeminiStats | None = None # Parser Functions def parse_codex_jsonl(stdout: str) -> str: """Extract assistant messages from codex JSONL output. Codex produces JSON Lines where each line is an event. We only care about 'item.completed' events where item_type is 'assistant_message', and we extract the text field from those items. Args: stdout: Raw stdout from codex CLI with --json flag Returns: Combined text from all assistant messages Raises: ValueError: If no valid assistant messages found or output is malformed """ if not stdout.strip(): raise ValueError("Codex output is empty") messages: list[str] = [] for line_num, line in enumerate(stdout.splitlines(), start=1): line = line.strip() if not line: continue try: event = CodexEvent.model_validate_json(line) except ValidationError as exc: # Provide detailed context for debugging errors_detail = "; ".join( f"{'.'.join(str(loc) for loc in err['loc'])}: {err['msg']}" for err in exc.errors()[:3] # Show first 3 errors ) raise ValueError( f"Codex validation failed on line {line_num}. " f"Errors: {errors_detail}. " f"Raw JSON: {line[:500]}" ) from exc except json.JSONDecodeError as exc: raise ValueError( f"Invalid JSON on line {line_num} at position {exc.pos}: {exc.msg}. " f"Raw content: {line[:500]}" ) from exc # Only process completed items with assistant/agent messages # Note: Codex uses both "assistant_message" and "agent_message" if ( event.type == "item.completed" and event.item and event.item.type in ("assistant_message", "agent_message") and event.item.text ): messages.append(event.item.text.strip()) if not messages: raise ValueError( f"No assistant messages found in codex JSONL output. " f"Processed {line_num} lines. " f"First line sample: {stdout.splitlines()[0][:200] if stdout.splitlines() else 'N/A'}" ) return "\n\n".join(messages) def parse_gemini_json(stdout: str) -> str: """Extract response field from gemini JSON output. Gemini with --output-format json produces a single JSON object with a 'response' field containing the actual agent output. Other fields like 'stats' are ignored to minimize context. Args: stdout: Raw stdout from gemini CLI with --output-format json Returns: Content of the 'response' field Raises: ValueError: If JSON is malformed or response field is missing/empty """ if not stdout.strip(): raise ValueError("Gemini output is empty") try: output = GeminiOutput.model_validate_json(stdout) except ValidationError as exc: # Provide detailed context for debugging errors_detail = "; ".join( f"{'.'.join(str(loc) for loc in err['loc'])}: {err['msg']}" for err in exc.errors()[:3] # Show first 3 errors ) raise ValueError( f"Gemini validation failed. " f"Errors: {errors_detail}. " f"Raw JSON: {stdout[:500]}" ) from exc except json.JSONDecodeError as exc: raise ValueError( f"Invalid JSON in gemini output at position {exc.pos}: {exc.msg}. " f"Raw content: {stdout[:500]}" ) from exc if not output.response.strip(): raise ValueError( f"Gemini 'response' field is empty. " f"Full output keys: {', '.join(stdout[:200])}" ) return output.response.strip() def parse_raw_output(stdout: str, stderr: str) -> str: """Fallback parser for runners without structured output. Returns stdout if non-empty, otherwise stderr. This is used for runners that don't support JSON output formats (e.g., claude CLI, custom tools). Args: stdout: Raw stdout from runner stderr: Raw stderr from runner Returns: stdout if non-empty, otherwise stderr Raises: ValueError: If both stdout and stderr are empty """ if stdout.strip(): return stdout.strip() if stderr.strip(): return stderr.strip() raise ValueError("Runner produced no output on stdout or stderr") # --------------------------------------------------------------------------- # Configuration & State # --------------------------------------------------------------------------- DEFAULT_RUNNERS: dict[str, dict[str, Any]] = { "codex": { "command": ["codex", "exec", "--full-auto", "--json"], "output_format": "codex_jsonl", "model_flag": "--model", "model_map": {"sonnet": "gpt-5-codex"}, }, "claude": { "command": ["claude", "--print"], "output_format": "raw", "model_flag": "--model", "model_map": {"sonnet": "sonnet"}, }, "gemini": { "command": ["gemini", "--output-format", "json"], "output_format": "gemini_json", "model_flag": "-m", "model_map": {"sonnet": "gemini-2.5-pro"}, }, } def _default_agents_dir() -> Path: """Find agents directory by walking up from CWD to filesystem root. Searches for .claude/agents/ or agents/ in the current directory and all parent directories, similar to how git finds .git/. This allows the tool to work when invoked from project subdirectories. """ current = Path.cwd() for parent in [current, *current.parents]: for candidate in (parent / ".claude/agents", parent / "agents"): if candidate.exists() and candidate.is_dir(): return candidate raise RuntimeError( "Unable to locate agents directory. Searched for '.claude/agents' or 'agents' " f"in {current} and all parent directories. Set POLYAGENT_MCP_DIR explicitly " "or create one of these directories with agent definitions." ) class BridgeSettings(BaseSettings): """Typed configuration with automatic environment loading.""" model_config = SettingsConfigDict(case_sensitive=False) agents_dir: Path = Field( default_factory=_default_agents_dir, validation_alias=AliasChoices("agents_dir", f"{ENV_PREFIX}DIR"), ) config_path: Path | None = Field( default=None, validation_alias=AliasChoices("config_path", f"{ENV_PREFIX}CONFIG"), ) workspace: Path = Field( default_factory=Path.cwd, validation_alias=AliasChoices("workspace", f"{ENV_PREFIX}WORKSPACE"), ) timeout: int = Field( default=DEFAULT_TIMEOUT_SECONDS, validation_alias=AliasChoices("timeout", f"{ENV_PREFIX}TIMEOUT"), ) log_level: str = Field( default="INFO", validation_alias=AliasChoices("log_level", f"{ENV_PREFIX}LOG_LEVEL"), ) debug: bool = Field( default=False, validation_alias=AliasChoices("debug", f"{ENV_PREFIX}DEBUG"), ) server_name: str = Field( default=DEFAULT_SERVER_NAME, validation_alias=AliasChoices("server_name", f"{ENV_PREFIX}SERVER_NAME"), ) server_instructions: str = Field( default=DEFAULT_INSTRUCTIONS, validation_alias=AliasChoices("server_instructions", f"{ENV_PREFIX}INSTRUCTIONS"), ) @field_validator("agents_dir") @classmethod def _validate_agents_dir(cls, value: Path) -> Path: if not value.exists(): raise ValueError(f"Agents directory not found: {value}") return value @field_validator("log_level") @classmethod def _normalise_level(cls, value: str) -> str: return value.upper() @dataclass(frozen=True) class AgentDefinition: name: str description: str instructions: str model_hint: str | None metadata: dict[str, Any] path: Path @dataclass(frozen=True) class AgentOverride: runner: str | None = None model: str | None = None @dataclass class RunnerConfig: command: tuple[str, ...] output_format: OutputFormat env: dict[str, str] = field(default_factory=dict) model_flag: str | None = None model_map: dict[str, str | None] = field(default_factory=dict) prompt_prefix: str | None = None prompt_suffix: str | None = None def available(self) -> bool: return bool(self.command) and shutil.which(self.command[0]) is not None def build_command(self, prompt: str, model_alias: str | None) -> list[str]: args = list(self.command) mapped = self.map_model(model_alias) if self.model_flag and mapped: args.extend([self.model_flag, mapped]) args.append(prompt) return args def map_model(self, alias: str | None) -> str | None: if alias is None: return None if not self.model_map: return alias return self.model_map.get(alias, alias) @dataclass(frozen=True) class BridgeConfig: default_runner: str | None runner_priority: tuple[str, ...] runners: dict[str, RunnerConfig] overrides: dict[str, AgentOverride] @property def available_runners(self) -> dict[str, RunnerConfig]: return {name: cfg for name, cfg in self.runners.items() if cfg.available()} @dataclass class BridgeState: settings: BridgeSettings config: BridgeConfig agents: list[AgentDefinition] @property def manifest(self) -> dict[str, Any]: return { "agents": [ { "name": agent.name, "description": agent.description, "model": agent.model_hint, "path": str(agent.path), "metadata": agent.metadata, "override": _override_to_dict(self.config.overrides.get(agent.name)), } for agent in self.agents ], "runners": { name: { "command": list(cfg.command), "output_format": cfg.output_format.value, "available": cfg.available(), } for name, cfg in self.config.runners.items() }, "runner_priority": list(self.config.runner_priority), "default_runner": self.config.default_runner, "workspace": str(self.settings.workspace), "timeout": self.settings.timeout, } # --------------------------------------------------------------------------- # Bootstrap # --------------------------------------------------------------------------- def create_mcp_server() -> FastMCP: """Factory function used by fastmcp CLI and script entry point.""" settings = BridgeSettings() effective_level = "DEBUG" if settings.debug else settings.log_level configure_logging(cast(Any, effective_level)) logger.info( "Initialising agents bridge", extra=settings.model_dump(mode="json"), ) mcp = FastMCP(name=settings.server_name, instructions=settings.server_instructions) if settings.debug: fastmcp.settings.debug = True config = _load_bridge_config(settings.config_path) agents = _discover_agents(settings.agents_dir) state = BridgeState(settings=settings, config=config, agents=agents) _register_manifest_resources(mcp, state) _register_agent_resources(mcp, state) _register_agent_tools(mcp, state) logger.info( "Loaded %s agents and %s runners", len(agents), len(config.available_runners), extra={ "agents": [agent.name for agent in agents], "available_runners": list(config.available_runners), }, ) return mcp # --------------------------------------------------------------------------- # Registration helpers # --------------------------------------------------------------------------- def _register_manifest_resources(mcp: FastMCP, state: BridgeState) -> None: @mcp.resource( "resource://agents/manifest", description="List of registered agents and runner availability", mime_type="application/json", ) def _manifest() -> dict[str, Any]: return state.manifest @mcp.resource( "resource://agents/config", description="Raw bridge configuration", mime_type="application/json", ) def _config_resource() -> dict[str, Any]: config_path = state.settings.config_path data = state.manifest.copy() if config_path and config_path.exists(): data["config_source"] = str(config_path) data["config_payload"] = _load_config_file(config_path) else: data["config_source"] = None return data def _register_agent_resources(mcp: FastMCP, state: BridgeState) -> None: for agent in state.agents: _register_single_agent_resource(mcp, agent) def _register_single_agent_resource(mcp: FastMCP, agent: AgentDefinition) -> None: uri = f"agent://{agent.name}" @mcp.resource( uri, description=agent.description, mime_type="text/markdown", ) def _read_agent() -> str: return agent.instructions def _register_agent_tools(mcp: FastMCP, state: BridgeState) -> None: for agent in state.agents: _register_single_agent_tool(mcp, state, agent) def _register_single_agent_tool(mcp: FastMCP, state: BridgeState, agent: AgentDefinition) -> None: @mcp.tool(name=agent.name, description=agent.description) async def _run_agent( # type: ignore[no-redef] task: str, ctx: Context, context: str | None = None, runner: str | None = None, model: str | None = None, timeout_seconds: int | None = None, ) -> dict[str, Any]: return await _execute_agent( state=state, agent=agent, task=task, context_block=context, requested_runner=runner, requested_model=model, timeout_override=timeout_seconds, ctx=ctx, ) # --------------------------------------------------------------------------- # Agent Execution # --------------------------------------------------------------------------- async def _execute_agent( *, state: BridgeState, agent: AgentDefinition, task: str, context_block: str | None, requested_runner: str | None, requested_model: str | None, timeout_override: int | None, ctx: Context, ) -> dict[str, Any]: """Execute an agent task using the configured runner. Returns a minimal response optimized for context management: - output: The agent's response text - success: Whether the execution succeeded - error: Error message if failed (only present on failure) """ if not task or not task.strip(): raise ValueError("Agent tool requires a non-empty `task` argument") runner_name, runner_cfg = _select_runner(state, agent, requested_runner) model_alias = _select_model(state, agent, requested_model) prompt = _compose_prompt(agent, runner_cfg, task, context_block) command = runner_cfg.build_command(prompt, model_alias) env = os.environ.copy() env.update(runner_cfg.env) timeout = timeout_override or state.settings.timeout payload = { "agent": agent.name, "runner": runner_name, "model": model_alias, "timeout": timeout, "workspace": str(state.settings.workspace), "request_id": str(ctx.request_id), } await ctx.info(f"Running agent '{agent.name}'", extra=payload) logger.debug("Launching runner", extra=payload) result = await _spawn_runner( runner=runner_name, command=command, env=env, cwd=state.settings.workspace, timeout=timeout, ) # Parse output based on runner format try: output = _parse_runner_output(runner_cfg, result.stdout, result.stderr) success = True error = None debug_context = None except ValueError as exc: output = None success = False error = str(exc) # Provide comprehensive debug context with FULL output for debugging debug_context = { "runner": runner_name, "format": runner_cfg.output_format.value, "exit_code": result.exit_code, "stdout": result.stdout, # Full stdout - needed for debugging "stderr": result.stderr, # Full stderr - needed for debugging } await _log_result(ctx, agent.name, runner_name, result, success, error) # Return minimal response for context optimization response: dict[str, Any] = { "success": success, } if success: response["output"] = output else: response["error"] = error # Include comprehensive debug context for failures if debug_context: response["debug"] = debug_context return response def _parse_runner_output(runner_cfg: RunnerConfig, stdout: str, stderr: str) -> str: """Parse runner output based on configured format. Args: runner_cfg: Runner configuration with output_format stdout: Raw stdout from runner stderr: Raw stderr from runner Returns: Parsed agent output text Raises: ValueError: If output cannot be parsed according to expected format """ if runner_cfg.output_format == OutputFormat.CODEX_JSONL: return parse_codex_jsonl(stdout) elif runner_cfg.output_format == OutputFormat.GEMINI_JSON: return parse_gemini_json(stdout) elif runner_cfg.output_format == OutputFormat.RAW: return parse_raw_output(stdout, stderr) else: raise ValueError(f"Unsupported output format: {runner_cfg.output_format}") def _select_runner( state: BridgeState, agent: AgentDefinition, requested: str | None, ) -> tuple[str, RunnerConfig]: candidates: list[str | None] = [requested] override = state.config.overrides.get(agent.name) if override and override.runner: candidates.append(override.runner) meta_runner = agent.metadata.get("bridge_runner") or agent.metadata.get("runner") candidates.append(meta_runner if isinstance(meta_runner, str) else None) candidates.append(state.config.default_runner) candidates.extend(state.config.runner_priority) candidates.extend(state.config.available_runners) seen: set[str] = set() for candidate in filter(None, candidates): if candidate in seen: continue seen.add(candidate) runner_cfg = state.config.runners.get(candidate) if runner_cfg and runner_cfg.available(): return candidate, runner_cfg raise RuntimeError( "No runnable CLI found for agent. Install codex/claude/gemini or configure a custom runner." ) def _select_model( state: BridgeState, agent: AgentDefinition, requested: str | None, ) -> str | None: override = state.config.overrides.get(agent.name) if override and override.model: return override.model if requested: return requested return agent.model_hint def _compose_prompt( agent: AgentDefinition, runner_cfg: RunnerConfig, task: str, context_block: str | None, ) -> str: sections: list[str] = [] if runner_cfg.prompt_prefix: sections.append(runner_cfg.prompt_prefix.strip()) sections.append(agent.instructions.strip()) sections.append("\n## Task\n" + task.strip()) if context_block and context_block.strip(): sections.append("\n## Additional Context\n" + context_block.strip()) if runner_cfg.prompt_suffix: sections.append(runner_cfg.prompt_suffix.strip()) return "\n\n".join(part for part in sections if part) # --------------------------------------------------------------------------- # Subprocess Management # --------------------------------------------------------------------------- @dataclass class SubprocessResult: runner: str command: list[str] exit_code: int | None timed_out: bool stdout: str stderr: str spawn_error: str | None async def _spawn_runner( *, runner: str, command: list[str], env: dict[str, str], cwd: Path, timeout: int, ) -> SubprocessResult: try: process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(cwd), env=env, ) except FileNotFoundError as exc: logger.error("Runner %s executable not found", runner) return SubprocessResult( runner=runner, command=command, exit_code=None, timed_out=False, stdout="", stderr=str(exc), spawn_error=str(exc), ) except Exception as exc: # noqa: BLE001 logger.exception("Failed to spawn runner %s", runner) return SubprocessResult( runner=runner, command=command, exit_code=None, timed_out=False, stdout="", stderr=str(exc), spawn_error=str(exc), ) try: stdout_bytes, stderr_bytes = await asyncio.wait_for(process.communicate(), timeout=timeout) timed_out = False except TimeoutError: process.kill() await process.wait() stdout_bytes, stderr_bytes = b"", b"" timed_out = True stdout = stdout_bytes.decode("utf-8", errors="replace") stderr = stderr_bytes.decode("utf-8", errors="replace") exit_code = None if timed_out else process.returncode return SubprocessResult( runner=runner, command=command, exit_code=exit_code, timed_out=timed_out, stdout=stdout, stderr=stderr, spawn_error=None, ) async def _log_result( ctx: Context, agent_name: str, runner_name: str, result: SubprocessResult, success: bool, error: str | None, ) -> None: """Log execution result to MCP context.""" payload = { "agent": agent_name, "runner": runner_name, "exit_code": result.exit_code, "timed_out": result.timed_out, "success": success, } if result.spawn_error: payload["spawn_error"] = result.spawn_error await ctx.error(f"Runner '{runner_name}' failed to start", extra=payload) logger.error("Runner %s failed to start: %s", runner_name, result.spawn_error) return if result.timed_out: await ctx.error(f"Runner '{runner_name}' exceeded timeout", extra=payload) logger.error("Runner %s exceeded timeout", runner_name) elif not success: payload["error"] = error await ctx.warning(f"Runner '{runner_name}' output parsing failed", extra=payload) logger.warning("Runner %s output parsing failed: %s", runner_name, error) elif result.exit_code not in (None, 0): await ctx.warning( f"Runner '{runner_name}' exited with code {result.exit_code}", extra=payload, ) logger.warning("Runner %s exited with %s", runner_name, result.exit_code) else: await ctx.info(f"Runner '{runner_name}' completed successfully", extra=payload) logger.debug("Runner %s completed", runner_name) # --------------------------------------------------------------------------- # Discovery & configuration # --------------------------------------------------------------------------- def _discover_agents(agents_dir: Path) -> list[AgentDefinition]: definitions: list[AgentDefinition] = [] for pattern in DEFAULT_AGENT_GLOBS: for path in sorted(agents_dir.glob(pattern)): if path.is_file(): definitions.append(_parse_agent_file(path)) if not definitions: raise RuntimeError(f"No agent definitions discovered in {agents_dir}") return definitions def _parse_agent_file(path: Path) -> AgentDefinition: text = path.read_text(encoding="utf-8") metadata, body = _extract_front_matter(text, path) name = str(metadata.get("name") or path.stem) description = str(metadata.get("description") or f"Agent {name}") model_hint = metadata.get("model") instructions = body.strip() return AgentDefinition( name=name, description=description, instructions=instructions, model_hint=str(model_hint) if model_hint else None, metadata=metadata, path=path, ) def _extract_front_matter(text: str, path: Path | None = None) -> tuple[dict[str, Any], str]: """Extract YAML front matter from agent markdown files. Expects front matter in the format: --- key: value --- Uses a hybrid approach: 1. First tries standard YAML parsing (supports full YAML spec) 2. Falls back to simple key-value regex parsing for malformed YAML This handles cases where Claude Code generates files with unquoted multi-line values or other YAML syntax issues. Args: text: The full text content of the agent file path: Optional path to the agent file (for better error messages) Returns: Tuple of (metadata dict, body text) """ if not text.startswith("---"): return {}, text lines = text.splitlines() metadata_lines: list[str] = [] closing_index: int | None = None for idx, line in enumerate(lines[1:], start=1): if line.strip() == "---": closing_index = idx break metadata_lines.append(line) if closing_index is None: return {}, text metadata_text = "\n".join(metadata_lines) file_info = f" in {path}" if path else "" # Try standard YAML parsing first try: metadata = yaml.safe_load(metadata_text) or {} if not isinstance(metadata, dict): logger.warning( "Front matter is not a dictionary%s, ignoring it. Content: %s", file_info, metadata_text[:100], ) metadata = {} except yaml.YAMLError as exc: # Fall back to simple regex parsing for malformed YAML logger.info( "YAML parsing failed%s, trying simple key-value extraction. Error: %s", file_info, str(exc), ) metadata = _parse_simple_front_matter(metadata_text, path) body = "\n".join(lines[closing_index + 1 :]) return metadata, body def _parse_simple_front_matter(text: str, path: Path | None = None) -> dict[str, Any]: """Parse front matter using simple key-value regex extraction. This is a fallback for when YAML parsing fails due to syntax issues. It handles simple cases like: key: value key: value with spaces key: > multi-line value Only extracts the first value for each key (ignores continuation lines and complex YAML features). This is intentionally simple and robust. Args: text: The front matter text (between --- markers) path: Optional path for logging Returns: Dictionary of extracted key-value pairs """ import re metadata: dict[str, Any] = {} file_info = f" in {path}" if path else "" # Match lines like "key: value" or "key:value" # Captures key and everything after the colon as value pattern = re.compile(r"^([a-zA-Z_][a-zA-Z0-9_-]*)\s*:\s*(.*)$") for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue match = pattern.match(line) if match: key, value = match.groups() # Clean up common issues in values value = value.strip() # Skip YAML multi-line indicators (>, |) - we only get first line anyway if value in (">", "|", ">-", "|-"): # For multi-line values, we'd need to continue reading, # but for simplicity just skip them (use filename defaults) continue # Remove surrounding quotes if present if value.startswith('"') and value.endswith('"'): value = value[1:-1] elif value.startswith("'") and value.endswith("'"): value = value[1:-1] # Unescape common escape sequences value = value.replace("\\n", "\n").replace("\\t", "\t").replace('\\"', '"') # Only store first occurrence of each key if key not in metadata: metadata[key] = value if metadata: logger.info("Extracted %d fields using simple parser%s: %s", len(metadata), file_info, list(metadata.keys())) else: logger.warning("Simple parser found no fields%s", file_info) return metadata def _load_bridge_config(config_path: Path | None) -> BridgeConfig: base = { "runner_priority": list(DEFAULT_RUNNER_PRIORITY), "runners": DEFAULT_RUNNERS, "agents": {}, } if config_path: if not config_path.exists(): raise FileNotFoundError(f"Config file not found: {config_path}") override = _load_config_file(config_path) base = _deep_merge_dicts(base, override) default_runner = base.get("default_runner") priority_raw = base.get("runner_priority") or DEFAULT_RUNNER_PRIORITY runner_priority = tuple(str(item) for item in priority_raw) runners_data = base.get("runners") or {} if not isinstance(runners_data, dict): runners_data = {} runners = {name: _build_runner_config(name, data) for name, data in runners_data.items()} overrides_data = base.get("agents") or {} if not isinstance(overrides_data, dict): overrides_data = {} overrides = { name: AgentOverride( runner=str(data.get("runner")) if data.get("runner") else None, model=str(data.get("model")) if data.get("model") else None, ) for name, data in overrides_data.items() if isinstance(data, dict) } return BridgeConfig( default_runner=str(default_runner) if default_runner else None, runner_priority=runner_priority, runners=runners, overrides=overrides, ) def _build_runner_config(name: str, data: dict[str, Any]) -> RunnerConfig: if not isinstance(data, dict): raise ValueError(f"Runner '{name}' configuration must be a mapping") command = data.get("command") if not isinstance(command, (list, tuple)) or not command: raise ValueError(f"Runner '{name}' must define a non-empty list `command`") output_format_str = data.get("output_format", "raw") try: output_format = OutputFormat(output_format_str) except ValueError: raise ValueError( f"Runner '{name}' has invalid output_format '{output_format_str}'. " f"Must be one of: {', '.join(f.value for f in OutputFormat)}" ) from None env_raw = data.get("env") or {} if not isinstance(env_raw, dict): raise ValueError(f"Runner '{name}' env must be a mapping") model_map_raw = data.get("model_map") or {} if not isinstance(model_map_raw, dict): raise ValueError(f"Runner '{name}' model_map must be a mapping") def _maybe_str(value: Any) -> str | None: return None if value is None else str(value) return RunnerConfig( command=tuple(str(part) for part in command), output_format=output_format, env={str(k): str(v) for k, v in env_raw.items()}, model_flag=_maybe_str(data.get("model_flag")), model_map={str(k): _maybe_str(v) for k, v in model_map_raw.items()}, prompt_prefix=_maybe_str(data.get("prompt_prefix")), prompt_suffix=_maybe_str(data.get("prompt_suffix")), ) def _load_config_file(config_path: Path) -> dict[str, Any]: suffix = config_path.suffix.lower() text = config_path.read_text(encoding="utf-8") if suffix in {".yaml", ".yml", ""}: data = yaml.safe_load(text) or {} elif suffix == ".json": data = json.loads(text) else: raise ValueError(f"Unsupported config format: {config_path}") if not isinstance(data, dict): raise ValueError("Config file must contain a mapping at the top level") return data def _deep_merge_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: result = dict(base) for key, value in override.items(): if key in result and isinstance(result[key], dict) and isinstance(value, dict): result[key] = _deep_merge_dicts(result[key], value) else: result[key] = value return result # --------------------------------------------------------------------------- # Utilities # --------------------------------------------------------------------------- def _override_to_dict(override: AgentOverride | None) -> dict[str, Any] | None: if not override: return None return {"runner": override.runner, "model": override.model} # Module-level mcp instance - DO NOT create it here as it would execute on import! # The __init__.py lazy loader or cli.py will create it when needed. # This variable is set by __init__.py's __getattr__ for backward compatibility. _mcp_instance: FastMCP | None = None def __getattr__(name: str): """Lazy module attribute access for mcp instance. This allows importing create_mcp_server without triggering mcp creation, while still providing `from polyagent_mcp.server import mcp` for fastmcp CLI. """ if name == "mcp": global _mcp_instance if _mcp_instance is None: _mcp_instance = create_mcp_server() return _mcp_instance raise AttributeError(f"module {__name__!r} has no attribute {name!r}") if __name__ == "__main__": transport_str = os.getenv(f"{ENV_PREFIX}TRANSPORT", "stdio") transport = cast(Literal["stdio", "http", "sse", "streamable-http"], transport_str) kwargs: dict[str, Any] = {} host = os.getenv(f"{ENV_PREFIX}HOST") port = os.getenv(f"{ENV_PREFIX}PORT") if host: kwargs["host"] = host if port: kwargs["port"] = int(port) mcp.run(transport=transport, **kwargs)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/execveat/polyagent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server