Skip to main content
Glama

MCP Learning Project

by BerdTan
server.pyโ€ข15.7 kB
""" Core MCP Server Implementation This module provides the foundation for a universal MCP server testing and hosting platform. It can host multiple MCP servers and provide testing capabilities. """ import asyncio import json import logging import uuid from typing import Dict, List, Optional, Any, Callable from dataclasses import dataclass, asdict from datetime import datetime from pathlib import Path logger = logging.getLogger(__name__) @dataclass class MCPServerInfo: """Information about a hosted MCP server.""" id: str name: str description: str version: str host: str port: int status: str # 'running', 'stopped', 'error' created_at: datetime tools: List[Dict[str, Any]] config: Dict[str, Any] last_health_check: Optional[datetime] = None def __post_init__(self): if not hasattr(self, 'tools') or self.tools is None: self.tools = [] if not hasattr(self, 'config') or self.config is None: self.config = {} @dataclass class MCPTool: """Represents an MCP tool.""" name: str description: str input_schema: Dict[str, Any] output_schema: Dict[str, Any] handler: Callable class MCPServerRegistry: """Manages multiple MCP servers.""" def __init__(self): self.servers: Dict[str, MCPServerInfo] = {} self.tools: Dict[str, MCPTool] = {} self._lock = asyncio.Lock() async def register_server(self, server_info: MCPServerInfo) -> str: """Register a new MCP server.""" async with self._lock: server_id = str(uuid.uuid4()) server_info.id = server_id self.servers[server_id] = server_info logger.info(f"Registered MCP server: {server_info.name} (ID: {server_id})") return server_id async def unregister_server(self, server_id: str) -> bool: """Unregister an MCP server.""" async with self._lock: if server_id in self.servers: server_name = self.servers[server_id].name del self.servers[server_id] logger.info(f"Unregistered MCP server: {server_name} (ID: {server_id})") return True return False async def get_server(self, server_id: str) -> Optional[MCPServerInfo]: """Get server information by ID.""" return self.servers.get(server_id) async def list_servers(self) -> List[MCPServerInfo]: """List all registered servers.""" return list(self.servers.values()) async def update_server_status(self, server_id: str, status: str) -> bool: """Update server status.""" async with self._lock: if server_id in self.servers: self.servers[server_id].status = status self.servers[server_id].last_health_check = datetime.now() return True return False async def register_tool(self, tool: MCPTool) -> None: """Register a new tool.""" async with self._lock: self.tools[tool.name] = tool logger.info(f"Registered tool: {tool.name}") async def get_tool(self, tool_name: str) -> Optional[MCPTool]: """Get tool by name.""" return self.tools.get(tool_name) async def list_tools(self) -> List[MCPTool]: """List all registered tools.""" return list(self.tools.values()) class MCPServer: """Main MCP server that can host and test other MCP servers.""" def __init__(self, host: str = "0.0.0.0", port: int = 8000, debug: bool = False): self.host = host self.port = port self.debug = debug self.registry = MCPServerRegistry() self.server = None self.running = False # Built-in tools for server management self._setup_builtin_tools() def _setup_builtin_tools(self): """Setup built-in tools for server management.""" self.builtin_tools = { "list_servers": { "name": "list_servers", "description": "List all hosted MCP servers", "input_schema": {}, "output_schema": { "type": "object", "properties": { "servers": { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "string"}, "name": {"type": "string"}, "status": {"type": "string"}, "host": {"type": "string"}, "port": {"type": "integer"} } } } } } }, "register_server": { "name": "register_server", "description": "Register a new MCP server", "input_schema": { "type": "object", "properties": { "name": {"type": "string"}, "description": {"type": "string"}, "host": {"type": "string"}, "port": {"type": "integer"}, "config": {"type": "object"} }, "required": ["name", "host", "port"] }, "output_schema": { "type": "object", "properties": { "server_id": {"type": "string"}, "success": {"type": "boolean"} } } }, "test_server": { "name": "test_server", "description": "Test an MCP server connection", "input_schema": { "type": "object", "properties": { "server_id": {"type": "string"} }, "required": ["server_id"] }, "output_schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "status": {"type": "string"}, "tools": { "type": "array", "items": {"type": "string"} } } } } } async def start(self): """Start the MCP server.""" try: # Start the server self.server = await asyncio.start_server( self._handle_connection, self.host, self.port ) self.running = True logger.info(f"MCP Server started on {self.host}:{self.port}") # Keep the server running async with self.server: await self.server.serve_forever() except Exception as e: logger.error(f"Failed to start MCP server: {e}") raise async def stop(self): """Stop the MCP server.""" if self.server: self.server.close() await self.server.wait_closed() self.running = False logger.info("MCP Server stopped") async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): """Handle incoming MCP connections.""" try: while True: # Read the request data = await reader.read(1024) if not data: break # Parse the request try: request = json.loads(data.decode()) except json.JSONDecodeError: await self._send_error(writer, "Invalid JSON") continue # Handle the request response = await self._process_request(request) # Send the response await self._send_response(writer, response) except Exception as e: logger.error(f"Error handling connection: {e}") try: await self._send_error(writer, str(e)) except: pass finally: writer.close() await writer.wait_closed() async def _process_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """Process an MCP request.""" method = request.get("method") params = request.get("params", {}) if method == "tools/list": return await self._handle_list_tools() elif method == "tools/call": return await self._handle_call_tool(params) elif method == "initialize": return await self._handle_initialize(params) else: return {"error": {"code": -32601, "message": "Method not found"}} async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle initialization request.""" return { "jsonrpc": "2.0", "id": params.get("id"), "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "serverInfo": { "name": "MCP Testing Harness", "version": "1.0.0" } } } async def _handle_list_tools(self) -> Dict[str, Any]: """Handle tools listing request.""" tools = [] # Add built-in tools for tool_name, tool_info in self.builtin_tools.items(): tools.append({ "name": tool_info["name"], "description": tool_info["description"], "inputSchema": tool_info["input_schema"] }) # Add registered tools registered_tools = await self.registry.list_tools() for tool in registered_tools: tools.append({ "name": tool.name, "description": tool.description, "inputSchema": tool.input_schema }) return { "jsonrpc": "2.0", "result": { "tools": tools } } async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle tool call request.""" tool_name = params.get("name") arguments = params.get("arguments", {}) # Check if it's a built-in tool if tool_name in self.builtin_tools: return await self._call_builtin_tool(tool_name, arguments) # Check if it's a registered tool if tool_name is None: return { "jsonrpc": "2.0", "error": { "code": -32602, "message": "Tool name is required" } } tool = await self.registry.get_tool(tool_name) if tool: try: result = await tool.handler(arguments) return { "jsonrpc": "2.0", "result": { "content": result } } except Exception as e: return { "jsonrpc": "2.0", "error": { "code": -32603, "message": f"Tool execution failed: {str(e)}" } } return { "jsonrpc": "2.0", "error": { "code": -32601, "message": f"Tool '{tool_name}' not found" } } async def _call_builtin_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Call a built-in tool.""" if tool_name == "list_servers": servers = await self.registry.list_servers() return { "jsonrpc": "2.0", "result": { "content": { "servers": [asdict(server) for server in servers] } } } elif tool_name == "register_server": server_info = MCPServerInfo( id="", # Will be set by registry name=arguments["name"], description=arguments.get("description", ""), version="1.0.0", host=arguments["host"], port=arguments["port"], status="stopped", created_at=datetime.now(), tools=[], config=arguments.get("config", {}) ) server_id = await self.registry.register_server(server_info) return { "jsonrpc": "2.0", "result": { "content": { "server_id": server_id, "success": True } } } elif tool_name == "test_server": server_id = arguments["server_id"] server = await self.registry.get_server(server_id) if server: # Simulate testing the server await self.registry.update_server_status(server_id, "running") return { "jsonrpc": "2.0", "result": { "content": { "success": True, "status": "running", "tools": ["test_tool_1", "test_tool_2"] } } } else: return { "jsonrpc": "2.0", "error": { "code": -32602, "message": f"Server '{server_id}' not found" } } return { "jsonrpc": "2.0", "error": { "code": -32601, "message": f"Built-in tool '{tool_name}' not implemented" } } async def _send_response(self, writer: asyncio.StreamWriter, response: Dict[str, Any]): """Send a response to the client.""" data = json.dumps(response).encode() writer.write(data) await writer.drain() async def _send_error(self, writer: asyncio.StreamWriter, message: str): """Send an error response to the client.""" error_response = { "jsonrpc": "2.0", "error": { "code": -32603, "message": message } } await self._send_response(writer, error_response)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BerdTan/mcpharness'

If you have feedback or need assistance with the MCP directory API, please join our Discord server