Skip to main content
Glama
server.py11.6 kB
#!/usr/bin/env python3 """ A2A MCP Server - MCP server for A2A integration Provides tools for A2A agent registry and communication Tools: - a2a_server_registry: Add or remove A2A server URLs - list_agents: List available agents with their agent cards - call_agent: Call an agent with a prompt """ import asyncio import logging import json import os import sys import uuid from typing import Dict, List, Optional, Union, Literal, Any # Set up logging to stderr with more detailed format logging.basicConfig( level=logging.DEBUG, # Set to DEBUG for more detailed logs format="%(asctime)s [%(levelname)s] %(name)s:%(lineno)d - %(message)s", stream=sys.stderr ) logger = logging.getLogger(__name__) # MCP imports from mcp.server.fastmcp import FastMCP # A2A imports from a2a_min import A2aMinClient from a2a_min.base.client.card_resolver import A2ACardResolver from a2a_min.base.types import AgentCard, Message, TextPart, TaskSendParams # Create a class to store persistent state class ServerState: def __init__(self): self.registry = {} # name -> url self.cache = {} # name -> AgentCard # Create a single instance to be used throughout the app state = ServerState() async def fetch_agent_card(url: str) -> Optional[AgentCard]: """Fetch agent card from an A2A server using A2ACardResolver.""" try: logger.debug(f"Fetching agent card from {url}") # Use the A2ACardResolver from a2a_min to get the agent card card_resolver = A2ACardResolver(url) try: card = card_resolver.get_agent_card() logger.debug(f"Received agent card: {card}") return card except Exception as e: logger.error(f"Failed to fetch agent card: {str(e)}") return None except Exception as e: logger.exception(f"Error fetching agent card from {url}: {e}") return None async def update_agent_cache() -> None: """Update the cache of agent cards.""" logger.info("Updating agent cache...") new_cache = {} for name, url in state.registry.items(): card = await fetch_agent_card(url) if card: new_cache[name] = card logger.info(f"Added agent {name} to cache: {card.name}") # Update the state cache state.cache = new_cache logger.info(f"Agent cache updated with {len(state.cache)} agents") # Create the FastMCP server mcp = FastMCP("A2A MCP Server") # Register tools @mcp.tool() async def a2a_server_registry(action: Literal["add", "remove"], name: str, url: Optional[str] = None) -> Dict[str, Any]: """ Add or remove an A2A server URL. Args: action: Either "add" or "remove" name: Name of the server url: URL of the server (required for "add" action) Returns: Dict with status and message """ try: logger.debug(f"a2a_server_registry called with action={action}, name={name}, url={url}") logger.debug(f"Registry before: {state.registry}") if action == "add": if not url: logger.warning("URL is required for add action") return {"status": "error", "message": "URL is required for add action"} # Add to registry state.registry[name] = url logger.info(f"Added A2A server: {name} -> {url}") logger.debug(f"Registry after: {state.registry}") # Update the cache asynchronously asyncio.create_task(update_agent_cache()) return { "status": "success", "message": f"Added A2A server: {name}", "registry": state.registry } elif action == "remove": if name in state.registry: # Remove from registry del state.registry[name] # Remove from cache if present if name in state.cache: del state.cache[name] logger.info(f"Removed A2A server: {name}") logger.debug(f"Registry after: {state.registry}") return { "status": "success", "message": f"Removed A2A server: {name}", "registry": state.registry } else: logger.warning(f"Server {name} not found in registry") return { "status": "error", "message": f"Server {name} not found in registry" } logger.error(f"Invalid action: {action}") return {"status": "error", "message": "Invalid action. Use 'add' or 'remove'"} except Exception as e: logger.exception(f"Error in a2a_server_registry: {str(e)}") return {"status": "error", "message": f"Error: {str(e)}"} @mcp.tool() async def list_agents() -> Dict[str, Any]: """ List available agents with their agent cards. Returns: Dict with agents and their cards """ try: logger.debug("list_agents called") logger.debug(f"Current registry: {state.registry}") # Use the A2ACardResolver to fetch agent cards agents = {} for name, url in state.registry.items(): try: logger.debug(f"Fetching card for {name} from {url}") card = await fetch_agent_card(url) if card: # Store the card data in a format suitable for JSON response agents[name] = card.model_dump() # Update the cache with the fetched card state.cache[name] = card else: logger.error(f"Failed to fetch agent card for {name}") except Exception as e: logger.exception(f"Error fetching card for {name}: {e}") logger.info(f"Listing {len(agents)} agents") return { "agent_count": len(agents), "agents": agents } except Exception as e: logger.exception(f"Error in list_agents: {str(e)}") return {"status": "error", "message": f"Error: {str(e)}"} @mcp.tool() async def call_agent(agent_name: str, prompt: str) -> Dict[str, Any]: """ Call an agent with a prompt. Args: agent_name: Name of the agent to call prompt: Prompt to send to the agent Returns: Dict with response from the agent """ try: logger.debug(f"call_agent called with agent_name={agent_name}, prompt={prompt[:50]}...") logger.debug(f"Current registry: {state.registry}") # Verify the agent exists if agent_name not in state.registry: logger.warning(f"Agent '{agent_name}' not found in registry") return { "status": "error", "message": f"Agent '{agent_name}' not found in registry" } # Get the URL for the agent url = state.registry[agent_name] logger.debug(f"Using URL: {url}") try: # Get or create agent card if needed if agent_name not in state.cache: logger.debug(f"Agent card not in cache, fetching for {agent_name}") card = await fetch_agent_card(url) if card: state.cache[agent_name] = card else: logger.error(f"Failed to fetch agent card for {agent_name}") return { "status": "error", "message": f"Failed to fetch agent card for {agent_name}" } # Create a client using A2aMinClient try: # Connect to the A2A server logger.debug(f"Connecting to A2A server at {url}") client = A2aMinClient.connect(url) # Prepare message and send logger.debug(f"Sending message to agent {agent_name}: {prompt[:50]}...") task_id = str(uuid.uuid4()) session_id = str(uuid.uuid4()) # Send the message and get response task = await client.send_message( message=prompt, task_id=task_id, session_id=session_id ) # Process and return the response logger.debug(f"Received task response: {task}") response_data = { "status": "success", "task_id": task.id, "state": task.status.state if hasattr(task, 'status') else "unknown", "artifacts": [] } # Extract artifacts artifacts = task.artifacts if hasattr(task, 'artifacts') else [] if artifacts: logger.debug(f"Received {len(artifacts)} artifacts") for artifact in artifacts: artifact_data = { "name": artifact.name, "content": [] } for part in artifact.parts: part_type = part.type if part_type == "text": artifact_data["content"].append({ "type": "text", "text": part.text }) elif part_type == "file": file_data = part.file artifact_data["content"].append({ "type": "file", "name": file_data.name, "mime_type": file_data.mimeType }) elif part_type == "data": artifact_data["content"].append({ "type": "data", "data": part.data }) response_data["artifacts"].append(artifact_data) logger.info(f"Successfully called agent {agent_name}") return response_data except Exception as e: logger.exception(f"Error using A2aMinClient: {e}") return { "status": "error", "message": f"Error calling agent with A2aMinClient: {str(e)}" } except Exception as e: logger.exception(f"Error calling agent {agent_name}: {e}") return { "status": "error", "message": f"Error calling agent: {str(e)}" } except Exception as e: logger.exception(f"Error in call_agent: {str(e)}") return {"status": "error", "message": f"Error: {str(e)}"} def main(): """Run the MCP server.""" return mcp def main_cli(): """CLI entry point for running the server.""" mcp.run() if __name__ == "__main__": main_cli()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/regismesquita/MCP_A2A'

If you have feedback or need assistance with the MCP directory API, please join our Discord server