Mesh Agent MCP Server

Official
by heurist-network
Verified
MIT License
9
  • Apple
  • Linux
""" Mesh Tool MCP Server - Connects to mesh API endpoints and provides tools for tool execution. """ import logging import os from typing import Any, Dict, List, Optional import aiohttp import anyio import click import colorlog import mcp.types as types import uvicorn from dotenv import load_dotenv from mcp.server.lowlevel import Server from mcp.server.sse import SseServerTransport from mcp.server.stdio import stdio_server from starlette.applications import Starlette from starlette.routing import Mount, Route # Load environment variables load_dotenv() # ===== Configuration ===== class Config: """Server configuration settings.""" # API endpoints and authentication HEURIST_API_KEY = os.environ.get("HEURIST_API_KEY") HEURIST_API_ENDPOINT = os.getenv( "MESH_API_ENDPOINT", "https://sequencer-v2.heurist.xyz" ) HEURIST_METADATA_ENDPOINT = os.getenv( "MESH_METADATA_ENDPOINT", "https://mesh.heurist.ai/mesh_agents_metadata.json" ) # Default supported agents DEFAULT_AGENTS = [ "CoinGeckoTokenInfoAgent", "DexScreenerTokenInfoAgent", "ElfaTwitterIntelligenceAgent", "ExaSearchAgent", "FirecrawlSearchAgent", "GoplusAnalysisAgent", ] # Logging LOG_LEVEL = logging.INFO LOG_FORMAT = "%(log_color)s%(levelname)s%(reset)s: %(message)s" LOGGER_NAME = "mesh-mcp-tools" @classmethod def setup_logger(cls): """Configure and return a logger with colored output.""" logger = colorlog.getLogger(cls.LOGGER_NAME) handler = colorlog.StreamHandler() handler.setFormatter(colorlog.ColoredFormatter(cls.LOG_FORMAT)) logger.handlers = [] logger.addHandler(handler) logger.setLevel(cls.LOG_LEVEL) return logger # Configure logger logger = Config.setup_logger() # ===== Custom Exceptions ===== class MeshApiError(Exception): """Raised when there's an error with the Mesh API.""" pass class ToolExecutionError(Exception): """Raised when there's an error executing a tool.""" pass # ===== API Client ===== async def call_mesh_api( path: str, method: str = "GET", json: Dict[str, Any] = None ) -> Dict[str, Any]: """Helper function to call the mesh API endpoint. Args: path: API path to call method: HTTP method to use json: Optional JSON payload Returns: API response as dictionary Raises: MeshApiError: If there's an error calling the API """ async with aiohttp.ClientSession() as session: url = f"{Config.HEURIST_API_ENDPOINT}/{path}" try: headers = {} if Config.HEURIST_API_KEY: headers["X-HEURIST-API-KEY"] = Config.HEURIST_API_KEY async with session.request( method, url, json=json, headers=headers ) as response: if response.status != 200: error_text = await response.text() raise MeshApiError(f"Mesh API error: {error_text}") return await response.json() except aiohttp.ClientError as e: logger.error(f"Error calling mesh API: {e}") raise MeshApiError(f"Failed to connect to mesh API: {str(e)}") from e # ===== Tool MCP Server ===== class MeshToolServer: """Encapsulates the MCP server for mesh agent tools.""" def __init__(self, supported_agents: Optional[List[str]] = Config.DEFAULT_AGENTS): """Initialize the server. Args: supported_agents: List of agent IDs to support, or None for all agents """ self.tool_registry: Dict[str, Dict[str, Any]] = {} self.supported_agents = supported_agents self.server = None async def fetch_agent_metadata(self) -> Dict[str, Dict[str, Any]]: """Fetch agent metadata from the API. Returns: Dictionary mapping agent IDs to their metadata Raises: MeshApiError: If there's an error fetching metadata """ logger.info(f"Fetching agent metadata from {Config.HEURIST_METADATA_ENDPOINT}") try: async with aiohttp.ClientSession() as session: async with session.get(Config.HEURIST_METADATA_ENDPOINT) as response: if response.status != 200: logger.error( f"Failed to fetch agent metadata: HTTP {response.status}" ) return {} data = await response.json() return data.get("agents", {}) except Exception as e: logger.error(f"Error fetching agent metadata: {e}") raise MeshApiError(f"Failed to fetch agent metadata: {str(e)}") from e async def process_tool_metadata(self) -> Dict[str, Dict[str, Any]]: """Process agent metadata and extract tool information. Returns: Dictionary mapping tool IDs to tool information """ agents_metadata = await self.fetch_agent_metadata() tool_registry = {} # Log filtering status if self.supported_agents is not None: logger.info( f"Filtering tools using supported agent list ({len(self.supported_agents)} agents specified)" ) else: logger.info("Loading tools from all available agents (no filter applied)") for agent_id, agent_data in agents_metadata.items(): # Skip agents not in our supported list (if a list is specified) if ( self.supported_agents is not None and agent_id not in self.supported_agents ): continue # Process tools for this agent for tool in agent_data.get("tools", []): if tool.get("type") == "function": function_data = tool.get("function", {}) tool_name = function_data.get("name") if not tool_name: continue # Create a unique tool ID tool_id = f"{agent_id.lower()}_{tool_name}" # Get parameters or create default schema parameters = function_data.get("parameters", {}) if not parameters: parameters = { "type": "object", "properties": {}, "required": [], } # Store tool info tool_registry[tool_id] = { "agent_id": agent_id, "tool_name": tool_name, "description": function_data.get("description", ""), "parameters": parameters, } # Log which agents contributed tools agents_with_tools = set(info["agent_id"] for info in tool_registry.values()) logger.info(f"Loaded tools from agents: {', '.join(sorted(agents_with_tools))}") logger.info(f"Successfully loaded {len(tool_registry)} tools") return tool_registry async def execute_tool( self, agent_id: str, tool_name: str, tool_arguments: Dict[str, Any] ) -> Dict[str, Any]: """Execute a tool on a mesh agent. Args: agent_id: ID of the agent to execute the tool on tool_name: Name of the tool to execute tool_arguments: Arguments to pass to the tool Returns: Tool execution result Raises: ToolExecutionError: If there's an error executing the tool """ request_data = { "agent_id": agent_id, "input": {"tool": tool_name, "tool_arguments": tool_arguments}, } # Add API key if available if Config.HEURIST_API_KEY: request_data["api_key"] = Config.HEURIST_API_KEY try: result = await call_mesh_api( "mesh_request", method="POST", json=request_data ) return result.get("data", result) # Prefer the 'data' field if it exists except MeshApiError as e: # Re-raise API errors with clearer context raise ToolExecutionError(str(e)) from e except Exception as e: logger.error(f"Error calling {agent_id} tool {tool_name}: {e}") raise ToolExecutionError( f"Failed to call {agent_id} tool {tool_name}: {str(e)}" ) from e async def initialize(self) -> Server: """Initialize by loading tools from metadata. Returns: The configured MCP server instance Raises: ValueError: If no tools could be loaded from metadata """ self.tool_registry = await self.process_tool_metadata() if not self.tool_registry: logger.warning( "No tools loaded from metadata. Check the metadata endpoint." ) self.server = self._create_server() return self.server def _create_server(self) -> Server: """Create and configure the MCP server with all tools. Returns: Configured MCP server instance """ app = Server("mesh-agent-tools-mcp-server") @app.list_tools() async def list_tools() -> List[types.Tool]: """List all available tools.""" return [ types.Tool( name=tool_id, description=tool_info["description"], inputSchema=tool_info["parameters"], ) for tool_id, tool_info in self.tool_registry.items() ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> List[types.TextContent]: """Call the specified tool with the given arguments.""" try: if name not in self.tool_registry: raise ValueError(f"Unknown tool: {name}") tool_info = self.tool_registry[name] result = await self.execute_tool( agent_id=tool_info["agent_id"], tool_name=tool_info["tool_name"], tool_arguments=arguments, ) # Convert result to TextContent return [types.TextContent(type="text", text=str(result))] except Exception as e: logger.error(f"Error calling tool {name}: {e}") raise ValueError(f"Failed to call tool {name}: {str(e)}") from e return app async def run_stdio(self): """Run the server using stdio transport.""" if not self.server: await self.initialize() logger.info("Starting stdio server") async with stdio_server() as streams: await self.server.run( streams[0], streams[1], self.server.create_initialization_options() ) def run_sse(self, port: int, base_path: str = ""): """Run the server using SSE transport. Args: port: Port to listen on base_path: Optional base path for URL construction """ if not self.server: anyio.run(self.initialize) # Use the base_path for messages endpoint messages_path = "/messages/" messages_endpoint = ( f"{base_path}{messages_path}" if base_path else messages_path ) sse = SseServerTransport(messages_endpoint) async def handle_sse(request): async with sse.connect_sse( request.scope, request.receive, request._send ) as streams: await self.server.run( streams[0], streams[1], self.server.create_initialization_options() ) starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], ) logger.info(f"Starting SSE server on port {port}") uvicorn.run(starlette_app, host="0.0.0.0", port=port) # ===== CLI Entry Point ===== @click.command() @click.option("--port", default=8000, help="Port to listen on for SSE") @click.option( "--transport", type=click.Choice(["stdio", "sse"]), default="stdio", help="Transport type", ) @click.option( "--base-path", default="", help="Base path for URL construction (e.g. /mcp)", is_flag=False, flag_value="", required=False, ) @click.option( "--all-agents", is_flag=True, help="Load all available agents instead of the default list", ) def main(port: int, transport: str, base_path: str, all_agents: bool) -> int: """Run the Mesh Agent Tools MCP Server. Connects to mesh API endpoints and provides tools for tool execution. """ # Create server instance with appropriate agent filtering supported_agents = None if all_agents else Config.DEFAULT_AGENTS server = MeshToolServer(supported_agents=supported_agents) # Run with appropriate transport if transport == "sse": server.run_sse(port, base_path) else: anyio.run(server.run_stdio) return 0 if __name__ == "__main__": main() # pylint: disable=no-value-for-parameter