Skip to main content
Glama
agent_discovery.py (3.45 kB)
# agent_discovery.py
import asyncio
import logging
from typing import Any, Dict, List

import aiohttp
from fastmcp import FastMCP
from pydantic import BaseModel

# Diagnostics go through `logging` rather than print(): print() writes to
# stdout, and when this module runs inside an MCP server using the stdio
# transport, stdout carries the protocol stream. Unconfigured loggers emit
# WARNING and above on stderr, so the messages remain visible.
logger = logging.getLogger(__name__)


# Record describing one ACP agent as returned by the `/agents` endpoint.
# Documented with comments (not a class docstring) so the generated pydantic
# schema is left exactly as it was.
class ACPAgent(BaseModel):
    name: str
    description: str
    # NOTE(review): relies on pydantic giving each instance its own copy of
    # this default — confirm for the pinned pydantic version.
    metadata: Dict[str, Any] = {}


class AgentDiscoveryTool:
    """Query an ACP server for its agents and cache the results by name."""

    def __init__(self, acp_base_url: str = "http://localhost:8000"):
        """Store the ACP server base URL and start with an empty agent cache.

        Args:
            acp_base_url: Root URL of the ACP server; `/agents` is appended
                to it when discovering agents.
        """
        self.acp_base_url = acp_base_url
        # Agents seen so far, keyed by agent name. Entries are only ever
        # added or overwritten here, never evicted.
        self.discovered_agents: Dict[str, ACPAgent] = {}

    async def discover_agents(self) -> List[ACPAgent]:
        """Discover all available ACP agents.

        Issues a GET to `<acp_base_url>/agents`, parses the `"agents"` list
        of the JSON reply into `ACPAgent` objects and records them in
        `self.discovered_agents`.

        Returns:
            The agents from this call, or an empty list when the server
            answers with a non-200 status or any error occurs (the failure
            is logged, not raised).
        """
        # Strip trailing slashes so a base URL such as "http://host:8000/"
        # does not produce a "//agents" path.
        endpoint = f"{self.acp_base_url.rstrip('/')}/agents"

        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(endpoint) as response:
                    if response.status != 200:
                        logger.warning(
                            "Failed to discover agents: %s", response.status
                        )
                        return []

                    data = await response.json()
                    agents = [
                        ACPAgent(**agent) for agent in data.get("agents", [])
                    ]
            except Exception as e:
                # Deliberate best-effort boundary: network, decoding and
                # validation failures all degrade to "no agents found".
                logger.error("Error discovering agents: %s", e)
                return []

        # Update discovered agents cache (a later duplicate name wins).
        for agent in agents:
            self.discovered_agents[agent.name] = agent
        return agents

    def get_mcp_resource_uri(self, agent_name: str) -> str:
        """Generate MCP resource URI for an ACP agent.

        Args:
            agent_name: Name of the agent; inserted into the URI verbatim.

        Returns:
            A string of the form `acp://agents/<agent_name>`.
        """
        return f"acp://agents/{agent_name}"

    async def get_agent_capabilities(self, agent_name: str) -> Dict[str, Any]:
        """Get detailed capabilities of a specific agent.

        Only the local cache is consulted, so the agent must have been
        returned by an earlier `discover_agents()` call.

        Args:
            agent_name: Name of the agent to look up.

        Returns:
            A dict with the agent's name, description and metadata plus
            fixed capability flags, or an empty dict when the agent is not
            in the cache.
        """
        # This could be extended to call a capabilities endpoint.
        # For now, return basic info from discovery.
        agent = self.discovered_agents.get(agent_name)
        if not agent:
            return {}

        return {
            "name": agent.name,
            "description": agent.description,
            "metadata": agent.metadata,
            # The flags below are constants, not values reported by the agent.
            "supports_streaming": True,  # ACP supports streaming
            "supports_multimodal": True,  # ACP supports multi-modal
            "interaction_modes": ["sync", "async", "stream"],
        }


# Integration with FastMCP
def register_discovery_tools(mcp: FastMCP, discovery: AgentDiscoveryTool) -> None:
    """Register the agent-discovery tools on a FastMCP server.

    Args:
        mcp: Server instance whose `tool()` decorator registers the tools.
        discovery: Discovery helper the tools delegate to.
    """
    # The docstrings of the two tool functions are kept verbatim: FastMCP
    # exposes a tool's docstring to clients as its description.

    @mcp.tool()
    async def discover_acp_agents() -> str:
        """Discover all available ACP agents and register them as resources"""
        agents = await discovery.discover_agents()

        result = {
            "discovered_count": len(agents),
            "agents": [],
        }

        for agent in agents:
            agent_info = {
                "name": agent.name,
                "description": agent.description,
                "resource_uri": discovery.get_mcp_resource_uri(agent.name),
                "capabilities": await discovery.get_agent_capabilities(agent.name),
            }
            result["agents"].append(agent_info)

        # NOTE(review): str() yields a Python repr, not JSON; kept as-is
        # because changing the format would alter the tool's output.
        return str(result)

    @mcp.tool()
    async def get_agent_info(agent_name: str) -> str:
        """Get detailed information about a specific ACP agent"""
        capabilities = await discovery.get_agent_capabilities(agent_name)
        if capabilities:
            return str(capabilities)
        return f"Agent '{agent_name}' not found"

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GongRzhe/ACP-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.