Skip to main content
Glama

MCP Router

mcp_router.py6.84 kB
import json import asyncio import httpx from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field from mcp.server.fastmcp import FastMCP # Initialize the discovery service try: from discovery_service import DiscoveryService discovery_service = DiscoveryService() except ImportError: # Create a mock discovery service for testing class MockDiscoveryService: def search_server(self, query, top_k=3): return [{ "server_name": "example_service", "server_description": "An example service for demonstration", "server_endpoint": "http://example.com/mcp", "tools": "[]", "score": 0.95 }] def add_server(self, server_name, server_description, server_endpoint, tools): return {"status": "success", "message": f"Server '{server_name}' registered."} def get_server_endpoint(self, server_name): return "http://example.com/mcp" discovery_service = MockDiscoveryService() # Try to import SSE components for services that require it try: from mcp import ClientSession from mcp.client.sse import sse_client from contextlib import AsyncExitStack HAVE_SSE_SUPPORT = True except ImportError: HAVE_SSE_SUPPORT = False print("MCP SSE components not available. SSE-based services will not work.") # Define data models for our tools class SearchServerRequest(BaseModel): query: str top_k: Optional[int] = Field(default=3, description="Number of top results to return") class AddServerRequest(BaseModel): server_name: str server_description: str server_endpoint: str tools: List[Dict[str, Any]] class ExecToolRequest(BaseModel): target_server_name: str target_tool_name: str parameters: Dict[str, Any] # Initialize the MCP server mcp = FastMCP("mcp-router", port=9000) @mcp.tool() async def search_mcp_server(query: str, top_k: Optional[int] = 3) -> str: """ Search for registered MCP servers based on a query. Args: query: Search query top_k: Number of top results to return (default: 3) """ try: # Search for servers results = discovery_service.search_server(query, top_k) # Parse tools JSON strings back to objects for result in results: result["tools"] = json.loads(result["tools"]) return json.dumps(results, ensure_ascii=False) except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) @mcp.tool() async def add_mcp_server( server_name: str, server_description: str, server_endpoint: str, tools: List[Dict[str, Any]] ) -> str: """ Register a new MCP server with the discovery service. Args: server_name: Unique name of the server server_description: Description of the server server_endpoint: Endpoint URL of the server tools: List of tools provided by the server """ try: # Serialize tools list to JSON string tools_json = json.dumps(tools, ensure_ascii=False) # Add server to discovery service result = discovery_service.add_server( server_name, server_description, server_endpoint, tools_json ) return json.dumps(result, ensure_ascii=False) except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) @mcp.tool() async def exec_mcp_tool( target_server_name: str, target_tool_name: str, parameters: Dict[str, Any] ) -> str: """ Execute a tool on a target MCP server. Args: target_server_name: Name of the target server target_tool_name: Name of the tool to execute parameters: Parameters for the tool """ try: # Get target server endpoint target_endpoint = discovery_service.get_server_endpoint(target_server_name) # Check if this is an SSE service (like AMap) if "sse" in target_endpoint.lower() and HAVE_SSE_SUPPORT: # Use SSE connection for services that require it return await _execute_sse_tool(target_endpoint, target_tool_name, parameters) else: # Use HTTP POST for standard MCP services return await _execute_http_tool(target_endpoint, target_tool_name, parameters) except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) async def _execute_sse_tool(endpoint: str, tool_name: str, parameters: Dict[str, Any]) -> str: """ Execute a tool on a target MCP server using SSE connection. Args: endpoint: The SSE endpoint URL tool_name: Name of the tool to execute parameters: Parameters for the tool """ exit_stack = AsyncExitStack() try: # Create SSE client sse_cm = sse_client(endpoint) streams = await exit_stack.enter_async_context(sse_cm) # Create session session_cm = ClientSession(streams[0], streams[1]) session = await exit_stack.enter_async_context(session_cm) # Initialize session await session.initialize() # Execute tool result = await session.call_tool(tool_name, parameters) # Convert result to dict if it's a CallToolResult object if hasattr(result, '_asdict'): result = result._asdict() elif hasattr(result, '__dict__'): result = result.__dict__ # Return result as JSON return json.dumps(result, ensure_ascii=False, default=str) finally: # Clean up await exit_stack.aclose() async def _execute_http_tool(endpoint: str, tool_name: str, parameters: Dict[str, Any]) -> str: """ Execute a tool on a target MCP server using HTTP POST. Args: endpoint: The HTTP endpoint URL tool_name: Name of the tool to execute parameters: Parameters for the tool """ # Construct JSON-RPC request json_rpc_request = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": tool_name, "arguments": parameters }, "id": 1 } # Send request to target server async with httpx.AsyncClient() as client: response = await client.post( endpoint, json=json_rpc_request, headers={"Content-Type": "application/json"} ) # Return the response from the target server response_data = response.json() return json.dumps(response_data, ensure_ascii=False) if __name__ == "__main__": # Run the server mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Maverick-LjXuan/mcp-router'

If you have feedback or need assistance with the MCP directory API, please join our Discord server