# mcp_server.py (5.34 kB)
"""Core MCP server implementation for KYC API integration."""
import json
from typing import Any, Sequence
from mcp.server import Server
from mcp.types import TextContent, Tool
from src.cache.redis_cache import CacheManager
from src.registry.tool_registry import ToolRegistry
from src.utils.logger import get_logger
from src.utils.rate_limiter import RateLimiter
# Module-level logger; the handlers below pass an event name plus keyword fields to it.
logger = get_logger(__name__)
class KYCMCPServer:
    """
    Core MCP server implementation for KYC API integration.

    Owns an MCP ``Server`` named ``"kyc-mcp-server"`` and registers the
    ``list_tools`` and ``call_tool`` protocol handlers on it. Tool lookup,
    result caching and rate limiting are delegated to the collaborators
    passed to the constructor.
    """

    def __init__(
        self,
        tool_registry: ToolRegistry,
        cache_manager: CacheManager,
        rate_limiter: RateLimiter,
    ) -> None:
        """
        Initialize MCP server.

        Args:
            tool_registry: Tool registry instance
            cache_manager: Cache manager instance
            rate_limiter: Rate limiter instance
        """
        self.server = Server("kyc-mcp-server")
        self.tool_registry = tool_registry
        self.cache_manager = cache_manager
        self.rate_limiter = rate_limiter
        # Handlers close over ``self``, so collaborators must be set first.
        self._register_handlers()
        logger.info("mcp_server_initialized")

    @staticmethod
    def _json_response(payload: Any) -> list[TextContent]:
        """Serialize ``payload`` to JSON and wrap it in a single text content item."""
        return [TextContent(type="text", text=json.dumps(payload))]

    @staticmethod
    def _with_cache_flag(payload: Any, cached: bool) -> Any:
        """
        Return ``payload`` annotated with a ``_cached`` marker.

        A dict payload is shallow-copied so the object owned by the tool or
        the cache manager is never mutated. Any other value is returned
        unchanged because it has no place to carry the marker.
        """
        if isinstance(payload, dict):
            return {**payload, "_cached": cached}
        return payload

    def _register_handlers(self) -> None:
        """Register MCP protocol handlers."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """List all available tools."""
            logger.debug("list_tools_called")
            tools = [
                Tool(
                    name=tool_schema["name"],
                    description=tool_schema["description"],
                    inputSchema=tool_schema["inputSchema"],
                )
                for tool_schema in self.tool_registry.list_tools()
            ]
            logger.info("tools_listed", count=len(tools))
            return tools

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
            """
            Execute a tool with given arguments.

            Every outcome, including failures, is returned as a single JSON
            text item; errors are reported as ``{"error": ..., "message": ...}``
            objects rather than raised.

            Args:
                name: Name of the tool to execute
                arguments: Arguments forwarded to the tool

            Returns:
                One-element sequence holding the JSON-encoded result or error.
            """
            logger.info("tool_called", tool=name)
            try:
                # Rate limiting check
                if not await self.rate_limiter.check_limit(name):
                    retry_after = self.rate_limiter.get_retry_after(name)
                    logger.warning("rate_limit_exceeded", tool=name, retry_after=retry_after)
                    return self._json_response(
                        {
                            "error": "RATE_LIMIT_EXCEEDED",
                            "message": f"Rate limit exceeded. Retry after {retry_after} seconds.",
                            "retry_after": retry_after,
                        }
                    )

                # Get tool instance
                tool = self.tool_registry.get_tool(name)
                if not tool:
                    logger.error("tool_not_found", tool=name)
                    return self._json_response(
                        {
                            "error": "TOOL_NOT_FOUND",
                            "message": f"Unknown tool: {name}",
                        }
                    )

                # Check cache; a falsy key means this call is not cacheable.
                cache_key = tool.get_cache_key(arguments)
                if cache_key:
                    cached_result = await self.cache_manager.get(cache_key)
                    # Compare against None so an empty-but-valid cached payload
                    # (e.g. {}) still counts as a hit instead of re-running the tool.
                    # NOTE(review): assumes CacheManager.get returns None on a miss - confirm.
                    if cached_result is not None:
                        logger.info("cache_hit", tool=name, cache_key=cache_key)
                        return self._json_response(
                            self._with_cache_flag(cached_result, True)
                        )

                # Execute tool
                result = await tool.execute(arguments)

                # Cache the raw result; the ``_cached`` marker is response-only.
                if cache_key:
                    ttl = tool.get_cache_ttl()
                    await self.cache_manager.set(cache_key, result, ttl=ttl)
                    logger.debug("result_cached", tool=name, cache_key=cache_key, ttl=ttl)

                response_payload = self._with_cache_flag(result, False)
                logger.info("tool_executed_successfully", tool=name)
                return self._json_response(response_payload)
            except ValueError as e:
                # Validation errors
                logger.error("tool_validation_error", tool=name, error=str(e))
                return self._json_response(
                    {
                        "error": "VALIDATION_ERROR",
                        "message": str(e),
                    }
                )
            except Exception as e:
                # General errors: this is the protocol boundary, so report
                # the failure to the client instead of propagating it.
                logger.error("tool_execution_error", tool=name, error=str(e), exc_info=True)
                return self._json_response(
                    {
                        "error": "EXECUTION_ERROR",
                        "message": f"Tool execution failed: {str(e)}",
                    }
                )

    async def run(self) -> None:
        """Start the MCP server over the stdio transport and serve until it exits."""
        from mcp.server.stdio import stdio_server

        logger.info("mcp_server_starting")
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )