# server.py • 15.7 kB
"""
Core MCP Server Implementation
This module provides the foundation for a universal MCP server testing and hosting platform.
It can host multiple MCP servers and provide testing capabilities.
"""
import asyncio
import inspect
import json
import logging
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class MCPServerInfo:
    """Information about a hosted MCP server.

    ``tools`` and ``config`` are optional: when omitted, or passed as
    ``None``, each instance receives its own fresh empty container.
    """

    id: str
    name: str
    description: str
    version: str
    host: str
    port: int
    status: str  # 'running', 'stopped', 'error'
    created_at: datetime
    # default_factory gives every instance its own container, so the
    # defaults are never shared between instances.
    tools: List[Dict[str, Any]] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=dict)
    last_health_check: Optional[datetime] = None

    def __post_init__(self):
        """Replace an explicit ``None`` for ``tools``/``config`` with an empty container."""
        # Dataclass fields always exist on the instance, so only the
        # None case needs normalising (no hasattr check required).
        if self.tools is None:
            self.tools = []
        if self.config is None:
            self.config = {}
@dataclass
class MCPTool:
    """A named tool together with its schemas and the callable that runs it."""

    # Identifier of the tool.
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Schema describing the arguments the tool accepts.
    input_schema: Dict[str, Any]
    # Schema describing the value the tool produces.
    output_schema: Dict[str, Any]
    # Callable invoked to execute the tool.
    handler: Callable
class MCPServerRegistry:
    """In-memory bookkeeping for hosted MCP servers and registered tools.

    Mutating operations are serialised through a single ``asyncio.Lock``;
    read-only lookups access the dictionaries directly.
    """

    def __init__(self):
        """Create empty server and tool tables plus the lock guarding writes."""
        self.servers: Dict[str, MCPServerInfo] = {}
        self.tools: Dict[str, MCPTool] = {}
        self._lock = asyncio.Lock()

    async def register_server(self, server_info: MCPServerInfo) -> str:
        """Store ``server_info`` under a freshly generated UUID and return that ID.

        The generated ID is also written to ``server_info.id``, replacing
        whatever value it held before.
        """
        async with self._lock:
            server_id = str(uuid.uuid4())
            server_info.id = server_id
            self.servers[server_id] = server_info
            logger.info(f"Registered MCP server: {server_info.name} (ID: {server_id})")
            return server_id

    async def unregister_server(self, server_id: str) -> bool:
        """Remove the server with ``server_id``; return whether it existed."""
        async with self._lock:
            if server_id not in self.servers:
                return False
            server_name = self.servers.pop(server_id).name
            logger.info(f"Unregistered MCP server: {server_name} (ID: {server_id})")
            return True

    async def get_server(self, server_id: str) -> Optional[MCPServerInfo]:
        """Return the server stored under ``server_id``, or ``None`` if unknown."""
        return self.servers.get(server_id)

    async def list_servers(self) -> List[MCPServerInfo]:
        """Return a new list holding every registered server."""
        return [*self.servers.values()]

    async def update_server_status(self, server_id: str, status: str) -> bool:
        """Set a server's status and stamp ``last_health_check`` with the current time.

        Returns ``False`` when ``server_id`` is not registered.
        """
        async with self._lock:
            try:
                record = self.servers[server_id]
            except KeyError:
                return False
            record.status = status
            record.last_health_check = datetime.now()
            return True

    async def register_tool(self, tool: MCPTool) -> None:
        """Store ``tool`` under its name, replacing any tool with the same name."""
        async with self._lock:
            self.tools[tool.name] = tool
            logger.info(f"Registered tool: {tool.name}")

    async def get_tool(self, tool_name: str) -> Optional[MCPTool]:
        """Return the tool registered as ``tool_name``, or ``None`` if unknown."""
        return self.tools.get(tool_name)

    async def list_tools(self) -> List[MCPTool]:
        """Return a new list holding every registered tool."""
        return [*self.tools.values()]
class MCPServer:
    """Main MCP server that can host and test other MCP servers.

    Accepts TCP connections and answers JSON-RPC 2.0 style requests for the
    ``initialize``, ``tools/list`` and ``tools/call`` methods.

    No message framing is implemented: every chunk returned by a single
    socket read is parsed as one complete JSON document, and responses are
    written back without a delimiter.
    """

    # Upper bound for one socket read and therefore for one request.
    MAX_REQUEST_BYTES = 65536

    def __init__(self, host: str = "0.0.0.0", port: int = 8000, debug: bool = False):
        """Store the listen address and create an empty registry.

        Args:
            host: Interface to bind to; the default listens on all interfaces.
            port: TCP port to listen on.
            debug: Stored on the instance; not read by any method of this class.
        """
        self.host = host
        self.port = port
        self.debug = debug
        self.registry = MCPServerRegistry()
        self.server: Optional[asyncio.AbstractServer] = None
        self.running = False
        # Built-in tools for server management
        self._setup_builtin_tools()

    def _setup_builtin_tools(self):
        """Populate ``self.builtin_tools`` with the server-management tool specs."""
        self.builtin_tools = {
            "list_servers": {
                "name": "list_servers",
                "description": "List all hosted MCP servers",
                "input_schema": {},
                "output_schema": {
                    "type": "object",
                    "properties": {
                        "servers": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "id": {"type": "string"},
                                    "name": {"type": "string"},
                                    "status": {"type": "string"},
                                    "host": {"type": "string"},
                                    "port": {"type": "integer"}
                                }
                            }
                        }
                    }
                }
            },
            "register_server": {
                "name": "register_server",
                "description": "Register a new MCP server",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "description": {"type": "string"},
                        "host": {"type": "string"},
                        "port": {"type": "integer"},
                        "config": {"type": "object"}
                    },
                    "required": ["name", "host", "port"]
                },
                "output_schema": {
                    "type": "object",
                    "properties": {
                        "server_id": {"type": "string"},
                        "success": {"type": "boolean"}
                    }
                }
            },
            "test_server": {
                "name": "test_server",
                "description": "Test an MCP server connection",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "server_id": {"type": "string"}
                    },
                    "required": ["server_id"]
                },
                "output_schema": {
                    "type": "object",
                    "properties": {
                        "success": {"type": "boolean"},
                        "status": {"type": "string"},
                        "tools": {
                            "type": "array",
                            "items": {"type": "string"}
                        }
                    }
                }
            }
        }

    @staticmethod
    def _error_response(code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC error envelope with the given code and message."""
        return {
            "jsonrpc": "2.0",
            "error": {
                "code": code,
                "message": message
            }
        }

    @staticmethod
    def _server_to_dict(server: Any) -> Dict[str, Any]:
        """Convert a server dataclass into a dict that ``json.dumps`` accepts.

        ``datetime`` field values are rendered as ISO-8601 strings; every
        other value is passed through as produced by ``asdict``.
        """
        return {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in asdict(server).items()
        }

    async def start(self):
        """Bind the listening socket and serve until stopped or cancelled.

        Raises:
            Exception: Any error raised while binding or serving is logged
                and re-raised.
        """
        try:
            # Start the server
            self.server = await asyncio.start_server(
                self._handle_connection,
                self.host,
                self.port
            )
            self.running = True
            logger.info(f"MCP Server started on {self.host}:{self.port}")
            # Keep the server running
            async with self.server:
                await self.server.serve_forever()
        except Exception as e:
            logger.error(f"Failed to start MCP server: {e}")
            raise
        finally:
            # Serving has ended (stop, cancellation or failure), so the
            # flag must not keep reporting a running server.
            self.running = False

    async def stop(self):
        """Close the listening server, if one was started, and wait for it."""
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            self.running = False
            logger.info("MCP Server stopped")

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Serve one client connection until it closes or an error occurs.

        Each read is parsed as a single JSON request. Malformed input gets a
        parse-error reply and the connection stays open; any other failure
        is reported to the client (best effort) and the connection is closed.
        """
        try:
            while True:
                # Read the request
                data = await reader.read(self.MAX_REQUEST_BYTES)
                if not data:
                    break
                # Parse the request. JSONDecodeError and UnicodeDecodeError
                # are both ValueError subclasses, so bad bytes and bad JSON
                # are reported the same way.
                try:
                    request = json.loads(data.decode())
                except ValueError:
                    await self._send_error(writer, "Invalid JSON", code=-32700)
                    continue
                # Handle the request
                response = await self._process_request(request)
                # Send the response
                await self._send_response(writer, response)
        except Exception as e:
            logger.error(f"Error handling connection: {e}")
            try:
                await self._send_error(writer, str(e))
            except Exception:
                # Best effort only: the peer may already be gone. Catching
                # Exception (not everything) lets cancellation propagate.
                logger.debug("Could not deliver error to client", exc_info=True)
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # A connection that was reset is already closed.
                logger.debug("Connection closed uncleanly", exc_info=True)

    async def _process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one decoded request and return the response envelope.

        The request's top-level ``id`` is echoed in the response. A request
        that is not a JSON object yields -32600; ``params`` that is present
        but not an object yields -32602; an unknown method yields -32601.
        """
        if not isinstance(request, dict):
            response = self._error_response(-32600, "Invalid Request")
            response["id"] = None
            return response

        method = request.get("method")
        params = request.get("params")
        if params is None:
            params = {}

        if not isinstance(params, dict):
            response = self._error_response(-32602, "Invalid params")
        elif method == "tools/list":
            response = await self._handle_list_tools()
        elif method == "tools/call":
            response = await self._handle_call_tool(params)
        elif method == "initialize":
            response = await self._handle_initialize(params)
        else:
            response = self._error_response(-32601, "Method not found")

        if "id" in request:
            response["id"] = request["id"]
        else:
            # Keeps an id a handler may already have set (initialize reads
            # one from params) and otherwise reports null.
            response.setdefault("id", None)
        return response

    async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return the protocol version, capabilities and server identity.

        The ``id`` is taken from ``params`` here; ``_process_request``
        replaces it with the request's own ``id`` when one is present.
        """
        return {
            "jsonrpc": "2.0",
            "id": params.get("id"),
            "result": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "MCP Testing Harness",
                    "version": "1.0.0"
                }
            }
        }

    async def _handle_list_tools(self) -> Dict[str, Any]:
        """Return the built-in tools followed by the registered tools."""
        # Add built-in tools
        tools = [
            {
                "name": tool_info["name"],
                "description": tool_info["description"],
                "inputSchema": tool_info["input_schema"]
            }
            for tool_info in self.builtin_tools.values()
        ]
        # Add registered tools
        tools.extend(
            {
                "name": tool.name,
                "description": tool.description,
                "inputSchema": tool.input_schema
            }
            for tool in await self.registry.list_tools()
        )
        return {
            "jsonrpc": "2.0",
            "result": {
                "tools": tools
            }
        }

    async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run the tool named in ``params`` and wrap its result or failure.

        Built-in tools take precedence over registered ones. A registered
        handler may be a plain function or return an awaitable; an exception
        raised by it is reported as -32603.
        """
        tool_name = params.get("name")
        arguments = params.get("arguments")
        if arguments is None:
            arguments = {}

        # Checked first so that an unhashable name can never reach the
        # dictionary lookups below.
        if not isinstance(tool_name, str):
            return self._error_response(-32602, "Tool name is required")

        # Check if it's a built-in tool
        if tool_name in self.builtin_tools:
            return await self._call_builtin_tool(tool_name, arguments)

        # Check if it's a registered tool
        tool = await self.registry.get_tool(tool_name)
        if not tool:
            return self._error_response(-32601, f"Tool '{tool_name}' not found")

        try:
            result = tool.handler(arguments)
            # Handlers are typed as plain Callable, so only await when the
            # call actually produced an awaitable.
            if inspect.isawaitable(result):
                result = await result
        except Exception as e:
            return self._error_response(-32603, f"Tool execution failed: {str(e)}")
        return {
            "jsonrpc": "2.0",
            "result": {
                "content": result
            }
        }

    async def _call_builtin_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute one of the built-in management tools.

        Arguments are checked against the ``required`` list of the tool's
        input schema; a missing or non-object argument set yields -32602
        instead of raising.
        """
        if not isinstance(arguments, dict):
            return self._error_response(-32602, "Tool arguments must be an object")

        schema = self.builtin_tools.get(tool_name, {}).get("input_schema", {})
        missing = [key for key in schema.get("required", []) if key not in arguments]
        if missing:
            return self._error_response(
                -32602, f"Missing required argument(s): {', '.join(missing)}"
            )

        if tool_name == "list_servers":
            servers = await self.registry.list_servers()
            return {
                "jsonrpc": "2.0",
                "result": {
                    "content": {
                        # asdict() alone leaves datetime objects in place,
                        # which json.dumps cannot encode.
                        "servers": [self._server_to_dict(server) for server in servers]
                    }
                }
            }

        if tool_name == "register_server":
            server_info = MCPServerInfo(
                id="",  # Will be set by registry
                name=arguments["name"],
                description=arguments.get("description", ""),
                version="1.0.0",
                host=arguments["host"],
                port=arguments["port"],
                status="stopped",
                created_at=datetime.now(),
                tools=[],
                config=arguments.get("config", {})
            )
            server_id = await self.registry.register_server(server_info)
            return {
                "jsonrpc": "2.0",
                "result": {
                    "content": {
                        "server_id": server_id,
                        "success": True
                    }
                }
            }

        if tool_name == "test_server":
            server_id = arguments["server_id"]
            server = await self.registry.get_server(server_id)
            if not server:
                return self._error_response(-32602, f"Server '{server_id}' not found")
            # Simulate testing the server
            await self.registry.update_server_status(server_id, "running")
            return {
                "jsonrpc": "2.0",
                "result": {
                    "content": {
                        "success": True,
                        "status": "running",
                        "tools": ["test_tool_1", "test_tool_2"]
                    }
                }
            }

        return self._error_response(
            -32601, f"Built-in tool '{tool_name}' not implemented"
        )

    async def _send_response(self, writer: asyncio.StreamWriter, response: Dict[str, Any]):
        """Serialise ``response`` as JSON, write it and wait for the flush."""
        data = json.dumps(response).encode()
        writer.write(data)
        await writer.drain()

    async def _send_error(self, writer: asyncio.StreamWriter, message: str, code: int = -32603):
        """Send an error response that is not tied to a request id.

        Args:
            writer: Stream to write the response to.
            message: Human-readable error text.
            code: JSON-RPC error code; defaults to -32603 (internal error).
        """
        error_response = self._error_response(code, message)
        # These errors are raised before or outside request dispatch, so
        # there is no request id to echo.
        error_response["id"] = None
        await self._send_response(writer, error_response)