# server.py (10 kB)
"""
Base MCP server implementation with standard capabilities.
"""
import asyncio
import signal
from typing import Any, Callable, Dict, List, Optional, Sequence
import mcp.server.stdio
import mcp.types as types
from mcp.server import Server
from mcp.server.models import InitializationOptions
from .config_loader import Settings
from .logger import get_logger
class MCPServer:
    """
    Base MCP server implementation with extensible tool, resource, and prompt handlers.

    This class provides the foundation for building MCP servers with standard
    capabilities including tools, resources, and prompts. It handles server
    lifecycle, graceful shutdown, and provides hooks for extensibility.

    Handlers are registered by name through ``register_tool``,
    ``register_resource`` and ``register_prompt``. A registered name is only
    listed or dispatched when it also appears in the matching
    ``settings.<kind>.enabled`` collection.
    """

    def __init__(self, settings: Settings):
        """
        Initialize the MCP server.

        Installs SIGINT/SIGTERM handlers (when possible) and registers the MCP
        protocol handlers on the underlying ``Server`` instance.

        Args:
            settings: Server configuration settings
        """
        self.settings = settings
        self.logger = get_logger(__name__, server=settings.server.name)

        # Create MCP server instance
        self.server = Server(settings.server.name)

        # Shutdown flag; run() waits on it and stops serving once it is set.
        self._shutdown_event = asyncio.Event()
        self._running = False
        # Loop executing run(); lets the signal handler wake the loop safely.
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # Registry for handlers
        self._tool_handlers: Dict[str, Callable] = {}
        self._resource_handlers: Dict[str, Callable] = {}
        self._prompt_handlers: Dict[str, Callable] = {}

        # Schemas are kept per registered name instead of only on the handler
        # object: bound methods reject attribute assignment, and one callable
        # may be registered under several names.
        self._tool_schemas: Dict[str, Any] = {}
        self._resource_schemas: Dict[str, Any] = {}
        self._prompt_schemas: Dict[str, Any] = {}

        # Setup signal handlers for graceful shutdown
        self._setup_signal_handlers()

        # Register MCP handlers
        self._register_handlers()

        self.logger.info(
            "MCP server initialized",
            version=settings.server.version,
            debug=settings.server.debug,
        )

    def _setup_signal_handlers(self) -> None:
        """
        Setup signal handlers for graceful shutdown.

        SIGINT and SIGTERM both request a shutdown. ``signal.signal`` raises
        ``ValueError`` when called outside the main thread; in that case a
        warning is logged and the server is constructed without handlers.
        """

        def signal_handler(signum: int, frame: Any) -> None:
            self.logger.info("Received shutdown signal", signal=signum)
            self._request_shutdown()

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                signal.signal(sig, signal_handler)
            except ValueError:
                self.logger.warning(
                    "Could not install signal handler outside the main thread",
                    signal=int(sig),
                )

    def _request_shutdown(self) -> None:
        """
        Set the shutdown event in a way that wakes the running event loop.

        While ``run()`` is active the event is set through
        ``call_soon_threadsafe`` so a loop that is idle in its selector
        notices the request. Without an active loop the event is set directly.
        """
        loop = self._loop
        if loop is not None and not loop.is_closed():
            loop.call_soon_threadsafe(self._shutdown_event.set)
        else:
            self._shutdown_event.set()

    @staticmethod
    def _enabled_schemas(
        handlers: Dict[str, Callable],
        schemas: Dict[str, Any],
        enabled: Any,
        fallback_attr: str,
    ) -> List[Any]:
        """
        Collect the schemas of every registered handler whose name is enabled.

        Args:
            handlers: Registry mapping names to handler callables
            schemas: Registry mapping names to schema objects
            enabled: Collection of enabled names from the settings
            fallback_attr: Attribute consulted on the handler when no schema
                is stored for the name (covers handlers placed in the
                registry with the schema attached to the callable)

        Returns:
            Schemas in handler-registration order; entries without a schema
            are skipped.
        """
        collected: List[Any] = []
        for name, handler in handlers.items():
            if name not in enabled:
                continue
            schema = schemas.get(name)
            if schema is None:
                schema = getattr(handler, fallback_attr, None)
            if schema is not None:
                collected.append(schema)
        return collected

    @staticmethod
    def _resolve_handler(
        kind: str,
        name: str,
        handlers: Dict[str, Callable],
        enabled: Any,
    ) -> Callable:
        """
        Return the handler registered for ``name``.

        Args:
            kind: Lower-case handler kind used in error messages
                ("tool", "resource" or "prompt")
            name: Name requested by the client
            handlers: Registry mapping names to handler callables
            enabled: Collection of enabled names from the settings

        Raises:
            ValueError: If the name is not registered or not enabled
        """
        if name not in handlers:
            raise ValueError(f"Unknown {kind}: {name}")
        if name not in enabled:
            raise ValueError(f"{kind.capitalize()} not enabled: {name}")
        return handlers[name]

    @staticmethod
    def _attach_schema(handler: Callable, attr: str, schema: Any) -> None:
        """
        Best-effort: mirror the schema onto the handler object.

        Kept for code that reads the attribute from the handler directly.
        Callables that do not accept new attributes (e.g. bound methods) are
        skipped on purpose; the per-name schema registry is authoritative.
        """
        try:
            setattr(handler, attr, schema)
        except (AttributeError, TypeError):
            pass

    def _register_handlers(self) -> None:
        """Register MCP protocol handlers."""

        # List available tools
        @self.server.list_tools()
        async def handle_list_tools() -> List[types.Tool]:
            self.logger.debug("Listing available tools")
            return self._enabled_schemas(
                self._tool_handlers,
                self._tool_schemas,
                self.settings.tools.enabled,
                "__tool_schema__",
            )

        # Call tool
        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]] = None
        ) -> Sequence[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            self.logger.info("Tool called", tool=name, arguments=arguments)
            handler = self._resolve_handler(
                "tool", name, self._tool_handlers, self.settings.tools.enabled
            )
            # Call the tool handler
            result = await handler(arguments or {})
            # Return result as TextContent
            return [types.TextContent(type="text", text=str(result))]

        # List available resources
        @self.server.list_resources()
        async def handle_list_resources() -> List[types.Resource]:
            self.logger.debug("Listing available resources")
            return self._enabled_schemas(
                self._resource_handlers,
                self._resource_schemas,
                self.settings.resources.enabled,
                "__resource_schema__",
            )

        # Read resource
        @self.server.read_resource()
        async def handle_read_resource(uri: Any) -> str:
            # The SDK may hand over a URL object rather than a plain string;
            # normalise once so the string operations below always work.
            uri_text = str(uri)
            self.logger.info("Resource read", uri=uri_text)
            # Extract resource name from URI
            resource_name = uri_text.split("://")[-1].split("/")[0]
            handler = self._resolve_handler(
                "resource",
                resource_name,
                self._resource_handlers,
                self.settings.resources.enabled,
            )
            # Call the resource handler
            result = await handler(uri_text)
            return str(result)

        # List available prompts
        @self.server.list_prompts()
        async def handle_list_prompts() -> List[types.Prompt]:
            self.logger.debug("Listing available prompts")
            return self._enabled_schemas(
                self._prompt_handlers,
                self._prompt_schemas,
                self.settings.prompts.enabled,
                "__prompt_schema__",
            )

        # Get prompt
        @self.server.get_prompt()
        async def handle_get_prompt(
            name: str, arguments: Optional[Dict[str, Any]] = None
        ) -> types.GetPromptResult:
            self.logger.info("Prompt requested", prompt=name, arguments=arguments)
            handler = self._resolve_handler(
                "prompt", name, self._prompt_handlers, self.settings.prompts.enabled
            )
            # Call the prompt handler
            return await handler(arguments or {})

    def register_tool(
        self,
        name: str,
        handler: Callable,
        schema: types.Tool,
    ) -> None:
        """
        Register a tool handler.

        Args:
            name: Tool name
            handler: Async function that handles the tool call; it is awaited
                with the argument dictionary
            schema: Tool schema definition
        """
        self._tool_handlers[name] = handler
        self._tool_schemas[name] = schema
        self._attach_schema(handler, "__tool_schema__", schema)
        self.logger.debug("Tool registered", tool=name)

    def register_resource(
        self,
        name: str,
        handler: Callable,
        schema: types.Resource,
    ) -> None:
        """
        Register a resource handler.

        Args:
            name: Resource name (the first path segment after ``://`` in the
                requested URI)
            handler: Async function that handles the resource read; it is
                awaited with the requested URI as a string
            schema: Resource schema definition
        """
        self._resource_handlers[name] = handler
        self._resource_schemas[name] = schema
        self._attach_schema(handler, "__resource_schema__", schema)
        self.logger.debug("Resource registered", resource=name)

    def register_prompt(
        self,
        name: str,
        handler: Callable,
        schema: types.Prompt,
    ) -> None:
        """
        Register a prompt handler.

        Args:
            name: Prompt name
            handler: Async function that handles the prompt request; it is
                awaited with the argument dictionary
            schema: Prompt schema definition
        """
        self._prompt_handlers[name] = handler
        self._prompt_schemas[name] = schema
        self._attach_schema(handler, "__prompt_schema__", schema)
        self.logger.debug("Prompt registered", prompt=name)

    async def run(self) -> None:
        """
        Run the MCP server.

        This method serves over the stdio transport until either the
        underlying server finishes or a shutdown is requested through the
        shutdown event (set by the signal handlers). ``shutdown()`` is always
        invoked on the way out.

        Raises:
            Exception: Any error raised by the transport or the server is
                logged and re-raised.
        """
        self._running = True
        self._loop = asyncio.get_running_loop()
        try:
            self.logger.info("Starting MCP server")
            # Capabilities are derived from the handlers registered on the
            # underlying server and passed on explicitly in the options.
            capabilities = self.server.create_initialization_options().capabilities
            # Run the server using stdio transport
            async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
                server_task = asyncio.create_task(
                    self.server.run(
                        read_stream,
                        write_stream,
                        InitializationOptions(
                            server_name=self.settings.server.name,
                            server_version=self.settings.server.version,
                            capabilities=capabilities,
                        ),
                    )
                )
                shutdown_task = asyncio.create_task(self._shutdown_event.wait())
                try:
                    # Serve until the session ends or a shutdown is requested.
                    await asyncio.wait(
                        {server_task, shutdown_task},
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                finally:
                    unfinished = [
                        task
                        for task in (server_task, shutdown_task)
                        if not task.done()
                    ]
                    for task in unfinished:
                        task.cancel()
                    # Let cancelled tasks unwind before leaving the transport.
                    await asyncio.gather(*unfinished, return_exceptions=True)
                if not server_task.cancelled():
                    # Surface any error raised by the server itself.
                    server_task.result()
        except Exception as e:
            self.logger.error("Server error", error=str(e), exc_info=True)
            raise
        finally:
            self._loop = None
            await self.shutdown()

    async def shutdown(self) -> None:
        """
        Gracefully shutdown the server.

        This method should be called when the server needs to stop.
        It is a no-op when the server is not marked as running, so calling it
        more than once is harmless.
        """
        if not self._running:
            return
        self.logger.info("Shutting down MCP server")
        self._running = False
        # Perform cleanup tasks here
        # (e.g., close connections, save state, deregister from registry)
        self.logger.info("MCP server shutdown complete")

    async def health_check(self) -> Dict[str, Any]:
        """
        Perform health check.

        Returns:
            Dictionary containing health status information: a ``status``
            string, a ``server`` section (name, version, running flag) and a
            ``checks`` section with enabled/registered counts for every
            component listed in ``settings.health.checks``.
        """
        health_status: Dict[str, Any] = {
            "status": "healthy",
            "server": {
                "name": self.settings.server.name,
                "version": self.settings.server.version,
                "running": self._running,
            },
            "checks": {},
        }
        # Add component health checks
        requested_checks = self.settings.health.checks
        if "tools" in requested_checks:
            health_status["checks"]["tools"] = {
                "enabled": len(self.settings.tools.enabled),
                "registered": len(self._tool_handlers),
            }
        if "resources" in requested_checks:
            health_status["checks"]["resources"] = {
                "enabled": len(self.settings.resources.enabled),
                "registered": len(self._resource_handlers),
            }
        if "prompts" in requested_checks:
            health_status["checks"]["prompts"] = {
                "enabled": len(self.settings.prompts.enabled),
                "registered": len(self._prompt_handlers),
            }
        return health_status