Skip to main content
Glama
mcp_server.py (5.34 kB)
"""Core MCP server implementation for KYC API integration."""

import json
from typing import Any, Sequence

from mcp.server import Server
from mcp.types import TextContent, Tool

from src.cache.redis_cache import CacheManager
from src.registry.tool_registry import ToolRegistry
from src.utils.logger import get_logger
from src.utils.rate_limiter import RateLimiter

logger = get_logger(__name__)


def _text_response(payload: Any) -> list[TextContent]:
    """Serialize *payload* to JSON and wrap it in a single text content item."""
    return [TextContent(type="text", text=json.dumps(payload))]


def _error_response(code: str, message: str, **extra: Any) -> list[TextContent]:
    """Build an error reply with ``error``, ``message`` and any extra fields, in that order."""
    return _text_response({"error": code, "message": message, **extra})


def _with_cache_flag(payload: Any, cached: bool) -> Any:
    """Return *payload* with a ``_cached`` marker, without mutating the input.

    A shallow copy is made so the object handed back by the tool or by the
    cache manager is never modified in place. Non-dict payloads cannot carry
    the marker and are returned unchanged.
    """
    if isinstance(payload, dict):
        return {**payload, "_cached": cached}
    return payload


class KYCMCPServer:
    """
    Core MCP server implementation for KYC API integration.

    Wires a tool registry, a cache manager and a rate limiter into the
    ``list_tools`` / ``call_tool`` handlers of an MCP ``Server``.
    """

    def __init__(
        self,
        tool_registry: ToolRegistry,
        cache_manager: CacheManager,
        rate_limiter: RateLimiter,
    ):
        """
        Initialize MCP server.

        Args:
            tool_registry: Tool registry instance
            cache_manager: Cache manager instance
            rate_limiter: Rate limiter instance
        """
        self.server = Server("kyc-mcp-server")
        self.tool_registry = tool_registry
        self.cache_manager = cache_manager
        self.rate_limiter = rate_limiter

        self._register_handlers()
        logger.info("mcp_server_initialized")

    async def _cache_get(self, tool_name: str, cache_key: Any) -> Any:
        """Read *cache_key* from the cache; a failing cache is logged and treated as a miss."""
        try:
            return await self.cache_manager.get(cache_key)
        except Exception as e:
            # Best-effort: the tool can still be executed without the cache.
            logger.warning(
                "cache_read_failed", tool=tool_name, cache_key=cache_key, error=str(e)
            )
            return None

    async def _cache_set(self, tool_name: str, cache_key: Any, value: Any, ttl: Any) -> None:
        """Store *value* under *cache_key*; a failing cache is logged and otherwise ignored."""
        try:
            await self.cache_manager.set(cache_key, value, ttl=ttl)
        except Exception as e:
            # Best-effort: never discard an already computed result because
            # the cache write failed.
            logger.warning(
                "cache_write_failed", tool=tool_name, cache_key=cache_key, error=str(e)
            )
            return
        logger.debug("result_cached", tool=tool_name, cache_key=cache_key, ttl=ttl)

    def _register_handlers(self) -> None:
        """Register MCP protocol handlers."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """List all available tools."""
            logger.debug("list_tools_called")

            tools = [
                Tool(
                    name=tool_schema["name"],
                    description=tool_schema["description"],
                    inputSchema=tool_schema["inputSchema"],
                )
                for tool_schema in self.tool_registry.list_tools()
            ]

            logger.info("tools_listed", count=len(tools))
            return tools

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict[str, Any]) -> Sequence[TextContent]:
            """Execute a tool with given arguments.

            Every outcome, including failures, is returned as one JSON text
            item; errors use an ``{"error": ..., "message": ...}`` envelope
            instead of raising.
            """
            logger.info("tool_called", tool=name)

            try:
                # Rate limiting check
                if not await self.rate_limiter.check_limit(name):
                    retry_after = self.rate_limiter.get_retry_after(name)
                    logger.warning("rate_limit_exceeded", tool=name, retry_after=retry_after)
                    return _error_response(
                        "RATE_LIMIT_EXCEEDED",
                        f"Rate limit exceeded. Retry after {retry_after} seconds.",
                        retry_after=retry_after,
                    )

                # Get tool instance
                tool = self.tool_registry.get_tool(name)
                if not tool:
                    logger.error("tool_not_found", tool=name)
                    return _error_response("TOOL_NOT_FOUND", f"Unknown tool: {name}")

                # Check cache (a falsy key means the call is not cached)
                cache_key = tool.get_cache_key(arguments)
                if cache_key:
                    cached_result = await self._cache_get(name, cache_key)
                    # Compare against None so that falsy payloads such as {}
                    # still count as hits instead of re-running the tool.
                    # NOTE(review): assumes CacheManager.get returns None on a
                    # miss - confirm.
                    if cached_result is not None:
                        logger.info("cache_hit", tool=name, cache_key=cache_key)
                        return _text_response(_with_cache_flag(cached_result, True))

                # Execute tool
                result = await tool.execute(arguments)

                # Cache the raw result, before the ``_cached`` marker is added
                if cache_key:
                    await self._cache_set(name, cache_key, result, tool.get_cache_ttl())

                logger.info("tool_executed_successfully", tool=name)
                return _text_response(_with_cache_flag(result, False))

            except ValueError as e:
                # Validation errors
                logger.error("tool_validation_error", tool=name, error=str(e))
                return _error_response("VALIDATION_ERROR", str(e))

            except Exception as e:
                # General errors: this is the protocol boundary, so report
                # the failure to the client rather than propagating it.
                logger.error("tool_execution_error", tool=name, error=str(e), exc_info=True)
                return _error_response("EXECUTION_ERROR", f"Tool execution failed: {str(e)}")

    async def run(self) -> None:
        """Start the MCP server on the stdio transport and serve until it exits."""
        from mcp.server.stdio import stdio_server

        logger.info("mcp_server_starting")

        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CTD-Techs/CTD-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.