
Katamari MCP Server

by ciphernaut
server.py (10.2 kB)
""" Main MCP server implementation - MVP version with no ML dependencies. """ import asyncio import logging from typing import Dict, List, Optional from mcp.server import Server from mcp.server.models import InitializationOptions from mcp.types import ( CallToolRequest, CallToolResult, ListToolsRequest, ListToolsResult, Tool, TextContent, ) # Intelligent router with LLM-powered routing from katamari_mcp.router.intelligent_router import IntelligentRouter from katamari_mcp.security.validator import SecurityValidator from katamari_mcp.utils.config import Config from katamari_mcp.transport.manager import TransportManager from katamari_mcp.transport.base import TransportMessage # Phase 3 components disabled for MVP # from katamari_mcp.acp.workflow_optimizer import WorkflowOptimizer # from katamari_mcp.acp.predictive_engine import PredictiveEngine # from katamari_mcp.acp.knowledge_transfer import KnowledgeTransferEngine # from katamari_mcp.acp.self_healing import SelfHealingEngine logger = logging.getLogger(__name__) class KatamariServer: """Main Katamari MCP server with multi-transport support - MVP version.""" def __init__(self, transport_config: Optional[Dict] = None): self.server = Server("katamari-mcp") self.router = IntelligentRouter() # Use intelligent router with LLM self.security_validator = SecurityValidator() self.config = Config() # ACP and Phase 3 components disabled for MVP self.acp_controller = None self.workflow_optimizer = None self.predictive_engine = None self.knowledge_transfer = None self.self_healing = None # Initialize transport manager self.transport_manager = TransportManager(transport_config or self.config.get("transport", {})) self._setup_handlers() self._setup_transport_handlers() def _setup_handlers(self): """Setup MCP server handlers.""" @self.server.list_tools() async def list_tools(request: ListToolsRequest) -> ListToolsResult: """List all available capabilities as tools.""" capabilities = await self.router.list_capabilities() tools = [ Tool( name=cap["name"], description=cap["description"], inputSchema=cap.get("input_schema", {}) ) for cap in capabilities ] return ListToolsResult(tools=tools) @self.server.call_tool() async def call_tool(tool_name: str, arguments: dict) -> CallToolResult: """Route tool call to appropriate capability.""" try: # Route to capability result = await self.router.route_call( tool_name=tool_name, arguments=arguments or {} ) return CallToolResult( content=[TextContent(type="text", text=result)], isError=False ) except Exception as e: logger.error(f"Tool call failed: {e}") error_msg = await self.router.format_error(e) return CallToolResult( content=[TextContent(type="text", text=error_msg)], isError=True ) def _setup_transport_handlers(self): """Setup handlers for transport layer.""" # Register MCP method handlers with transport manager self.transport_manager.register_handler("tools/list", self._handle_list_tools) self.transport_manager.register_handler("tools/call", self._handle_call_tool) self.transport_manager.register_handler("server/status", self._handle_server_status) # Note: set_server attribute not available in BaseTransport # This would need to be added to specific transport implementations if needed async def _handle_list_tools(self, params: Dict) -> Dict: """Handle tools list request via transport.""" capabilities = await self.router.list_capabilities() return { "tools": [ { "name": cap["name"], "description": cap["description"], "inputSchema": cap.get("input_schema", {}) } for cap in capabilities ] } async def 
_handle_call_tool(self, params: Dict) -> Dict: """Handle tool call request via transport.""" try: tool_name = params.get("name") if tool_name is None: raise ValueError("Tool name is required") arguments = params.get("arguments", {}) # Skip validation for MVP # await self.security_validator.validate_tool_call(mock_request) # Route to capability result = await self.router.route_call(tool_name, arguments) return {"result": result} except Exception as e: logger.error(f"Tool call failed: {e}") error_msg = await self.router.format_error(e) return {"error": error_msg} async def _handle_server_status(self, params: Dict) -> Dict: """Handle server status request.""" return { "server": "katamari-mcp", "version": "0.1.0", "transports": self.transport_manager.get_transport_status(), "acp_enabled": self.acp_controller is not None, "phase3_enabled": any([ self.workflow_optimizer is not None, self.predictive_engine is not None, self.knowledge_transfer is not None, self.self_healing is not None ]), "phase3_components": { "workflow_optimizer": self.workflow_optimizer is not None, "predictive_engine": self.predictive_engine is not None, "knowledge_transfer": self.knowledge_transfer is not None, "self_healing": self.self_healing is not None } } async def _handle_mcp_request(self, method: str, params: Dict) -> Dict: """Handle MCP request directly (for SSE transport).""" if method == "tools/list": capabilities = await self.router.list_capabilities() return { "result": { "tools": [ { "name": cap["name"], "description": cap["description"], "inputSchema": cap.get("input_schema", {}) } for cap in capabilities ] } } elif method == "tools/call": try: tool_name = params.get("name") if tool_name is None: raise ValueError("Tool name is required") arguments = params.get("arguments", {}) result = await self.router.route_call(tool_name, arguments) return { "result": { "content": [{"type": "text", "text": result}], "isError": False } } except Exception as e: logger.error(f"Tool call failed: {e}") error_msg = await self.router.format_error(e) return { "result": { "content": [{"type": "text", "text": error_msg}], "isError": True } } else: return {"error": f"Unknown method: {method}"} async def run(self, transport_names: Optional[List[str]] = None): """Run server with specified transports.""" # Start transport manager await self.transport_manager.start(transport_names) logger.info(f"Katamari MCP server running with transports: {self.transport_manager.active_transports}") # Keep server running try: while self.transport_manager.is_running: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down server...") await self.transport_manager.stop() async def main(): """Main entry point.""" import sys logging.basicConfig(level=logging.INFO) # Parse command line arguments for transport selection transport_names = None transport_config = {} if len(sys.argv) > 1: transport_names = sys.argv[1].split(",") # Build transport config based on requested transports transport_config = { 'transports': { 'stdio': {'enabled': 'stdio' in transport_names}, 'sse': {'enabled': 'sse' in transport_names, 'host': 'localhost', 'port': 49152}, 'websocket': {'enabled': 'websocket' in transport_names, 'host': 'localhost', 'port': 49153} } } server = KatamariServer(transport_config) # Use MCP library's stdio runner for now from mcp.server.stdio import stdio_server from mcp.server.lowlevel.server import NotificationOptions async with stdio_server() as (read_stream, write_stream): await server.server.run( read_stream, write_stream, 
InitializationOptions( server_name="katamari-mcp", server_version="0.1.0", capabilities=server.server.get_capabilities( notification_options=NotificationOptions( prompts_changed=False, resources_changed=False, tools_changed=False ), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())
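
Because main() currently falls back to the MCP stdio runner and reads a comma-separated transport list from argv[1], a client can launch server.py as a subprocess and talk to it over stdio. The sketch below is a minimal, hypothetical client using the standard MCP Python SDK client API (mcp.ClientSession, mcp.client.stdio.stdio_client); the launch command, file path, and "stdio" argument are assumptions about your local checkout, not part of this repository.

# client_example.py - hypothetical stdio client for the Katamari MCP server.
# Assumes the standard MCP Python SDK client API; adjust command/args to
# match where server.py lives in your environment.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Launch server.py as a subprocess, selecting the stdio transport
    # (main() reads the comma-separated transport list from argv[1]).
    params = StdioServerParameters(command="python", args=["server.py", "stdio"])

    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

            # List the capabilities the router exposes as tools.
            tools = await session.list_tools()
            for tool in tools.tools:
                print(tool.name, "-", tool.description)


if __name__ == "__main__":
    asyncio.run(main())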

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp'
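
The same request can be made from Python; this is a small sketch that simply prints whatever the endpoint returns, assuming a JSON response and that the third-party requests package is installed (neither is guaranteed by this page).

# Hypothetical Python equivalent of the curl call above.
import json
import requests

url = "https://glama.ai/api/mcp/v1/servers/ciphernaut/katamari-mcp"
response = requests.get(url, timeout=10)
response.raise_for_status()
print(json.dumps(response.json(), indent=2))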

If you have feedback or need assistance with the MCP directory API, please join our Discord server.