# server.py (10.2 kB)
"""
Main MCP server implementation - MVP version with no ML dependencies.
"""
import asyncio
import logging
from typing import Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
CallToolRequest,
CallToolResult,
ListToolsRequest,
ListToolsResult,
Tool,
TextContent,
)
# Intelligent router with LLM-powered routing
from katamari_mcp.router.intelligent_router import IntelligentRouter
from katamari_mcp.security.validator import SecurityValidator
from katamari_mcp.utils.config import Config
from katamari_mcp.transport.manager import TransportManager
from katamari_mcp.transport.base import TransportMessage
# Phase 3 components disabled for MVP
# from katamari_mcp.acp.workflow_optimizer import WorkflowOptimizer
# from katamari_mcp.acp.predictive_engine import PredictiveEngine
# from katamari_mcp.acp.knowledge_transfer import KnowledgeTransferEngine
# from katamari_mcp.acp.self_healing import SelfHealingEngine
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class KatamariServer:
    """Main Katamari MCP server with multi-transport support - MVP version.

    Wires three collaborators together: the MCP ``Server`` object (SDK-level
    handlers), an ``IntelligentRouter`` that lists capabilities and routes
    tool calls, and a ``TransportManager`` that receives the transport-level
    method handlers. ACP / Phase 3 components are present only as ``None``
    placeholders in this version.
    """

    def __init__(self, transport_config: Optional[Dict] = None):
        """Create the server and register all handlers.

        Args:
            transport_config: Optional settings passed to ``TransportManager``.
                When omitted or empty, the ``"transport"`` entry of ``Config``
                (default ``{}``) is used instead.
        """
        self.server = Server("katamari-mcp")
        self.router = IntelligentRouter()  # Use intelligent router with LLM
        self.security_validator = SecurityValidator()
        self.config = Config()

        # ACP and Phase 3 components disabled for MVP
        self.acp_controller = None
        self.workflow_optimizer = None
        self.predictive_engine = None
        self.knowledge_transfer = None
        self.self_healing = None

        # Initialize transport manager
        self.transport_manager = TransportManager(
            transport_config or self.config.get("transport", {})
        )

        self._setup_handlers()
        self._setup_transport_handlers()

    @staticmethod
    def _describe_tools(capabilities) -> List[Dict]:
        """Convert router capabilities into plain tool descriptor dicts.

        Each capability must provide ``"name"`` and ``"description"``;
        ``"input_schema"`` is optional and defaults to ``{}``.
        """
        return [
            {
                "name": cap["name"],
                "description": cap["description"],
                "inputSchema": cap.get("input_schema", {}),
            }
            for cap in capabilities
        ]

    @staticmethod
    def _parse_tool_call(params: Dict):
        """Extract ``(tool_name, arguments)`` from transport request params.

        Missing or null ``"arguments"`` become ``{}``, matching the
        ``arguments or {}`` normalisation used by the SDK ``call_tool``
        handler.

        Raises:
            ValueError: If ``"name"`` is absent or ``None``.
        """
        tool_name = params.get("name")
        if tool_name is None:
            raise ValueError("Tool name is required")
        return tool_name, params.get("arguments") or {}

    def _setup_handlers(self):
        """Setup MCP server handlers."""

        # NOTE(review): whether the SDK accepts a request-taking list handler
        # and a CallToolResult return value depends on the installed mcp
        # version - confirm against the pinned SDK.
        @self.server.list_tools()
        async def list_tools(request: ListToolsRequest) -> ListToolsResult:
            """List all available capabilities as tools."""
            capabilities = await self.router.list_capabilities()
            tools = [
                Tool(
                    name=cap["name"],
                    description=cap["description"],
                    inputSchema=cap.get("input_schema", {}),
                )
                for cap in capabilities
            ]
            return ListToolsResult(tools=tools)

        @self.server.call_tool()
        async def call_tool(tool_name: str, arguments: dict) -> CallToolResult:
            """Route tool call to appropriate capability."""
            try:
                # Route to capability
                result = await self.router.route_call(
                    tool_name=tool_name,
                    arguments=arguments or {},
                )
                return CallToolResult(
                    content=[TextContent(type="text", text=result)],
                    isError=False,
                )
            except Exception as e:
                # Report the failure to the client as an error result
                # instead of letting it propagate out of the handler.
                logger.error("Tool call failed: %s", e)
                error_msg = await self.router.format_error(e)
                return CallToolResult(
                    content=[TextContent(type="text", text=error_msg)],
                    isError=True,
                )

    def _setup_transport_handlers(self):
        """Setup handlers for transport layer."""
        # Register MCP method handlers with transport manager
        self.transport_manager.register_handler("tools/list", self._handle_list_tools)
        self.transport_manager.register_handler("tools/call", self._handle_call_tool)
        self.transport_manager.register_handler("server/status", self._handle_server_status)
        # Note: set_server attribute not available in BaseTransport
        # This would need to be added to specific transport implementations if needed

    async def _handle_list_tools(self, params: Dict) -> Dict:
        """Handle tools list request via transport.

        ``params`` is accepted for handler-signature uniformity and not read.

        Returns:
            ``{"tools": [...]}`` with one descriptor per router capability.
        """
        capabilities = await self.router.list_capabilities()
        return {"tools": self._describe_tools(capabilities)}

    async def _handle_call_tool(self, params: Dict) -> Dict:
        """Handle tool call request via transport.

        Returns:
            ``{"result": <router result>}`` on success, or
            ``{"error": <formatted message>}`` if parsing or routing raised.
        """
        try:
            tool_name, arguments = self._parse_tool_call(params)
            # Security validation is skipped for the MVP:
            # self.security_validator is not invoked here.
            result = await self.router.route_call(tool_name, arguments)
            return {"result": result}
        except Exception as e:
            logger.error("Tool call failed: %s", e)
            error_msg = await self.router.format_error(e)
            return {"error": error_msg}

    async def _handle_server_status(self, params: Dict) -> Dict:
        """Handle server status request.

        ``params`` is not read. Reports the server name/version, the
        transport manager's status, and which optional components are set.
        """
        phase3_components = {
            "workflow_optimizer": self.workflow_optimizer is not None,
            "predictive_engine": self.predictive_engine is not None,
            "knowledge_transfer": self.knowledge_transfer is not None,
            "self_healing": self.self_healing is not None,
        }
        return {
            "server": "katamari-mcp",
            "version": "0.1.0",
            "transports": self.transport_manager.get_transport_status(),
            "acp_enabled": self.acp_controller is not None,
            # Enabled as soon as any single Phase 3 component is present.
            "phase3_enabled": any(phase3_components.values()),
            "phase3_components": phase3_components,
        }

    async def _handle_mcp_request(self, method: str, params: Dict) -> Dict:
        """Handle MCP request directly (for SSE transport).

        Supports ``"tools/list"`` and ``"tools/call"``; both wrap their
        payload in ``{"result": ...}``. Tool-call failures are returned as a
        result with ``isError`` set to ``True``. Any other method yields
        ``{"error": "Unknown method: ..."}``.
        """
        if method == "tools/list":
            capabilities = await self.router.list_capabilities()
            return {"result": {"tools": self._describe_tools(capabilities)}}

        if method == "tools/call":
            try:
                tool_name, arguments = self._parse_tool_call(params)
                result = await self.router.route_call(tool_name, arguments)
                return {
                    "result": {
                        "content": [{"type": "text", "text": result}],
                        "isError": False,
                    }
                }
            except Exception as e:
                logger.error("Tool call failed: %s", e)
                error_msg = await self.router.format_error(e)
                return {
                    "result": {
                        "content": [{"type": "text", "text": error_msg}],
                        "isError": True,
                    }
                }

        return {"error": f"Unknown method: {method}"}

    async def run(self, transport_names: Optional[List[str]] = None):
        """Run server with specified transports.

        Starts the transport manager, then polls ``is_running`` once per
        second until it turns false. The transport manager is stopped on
        every exit path, including errors and cancellation.

        Args:
            transport_names: Passed through to ``TransportManager.start``.
        """
        # Start transport manager
        await self.transport_manager.start(transport_names)
        logger.info(
            "Katamari MCP server running with transports: %s",
            self.transport_manager.active_transports,
        )

        # Keep server running
        try:
            while self.transport_manager.is_running:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            logger.info("Shutting down server...")
        finally:
            # Always release transports, not only on Ctrl+C.
            await self.transport_manager.stop()
async def main():
    """Main entry point.

    Configures INFO-level logging, reads an optional comma-separated
    transport list from ``sys.argv[1]``, builds a ``KatamariServer`` with
    the matching transport configuration, and serves it over the MCP
    library's stdio runner.
    """
    import sys

    logging.basicConfig(level=logging.INFO)

    # Parse command line arguments for transport selection
    transport_names = None
    transport_config = {}
    if len(sys.argv) > 1:
        # Strip whitespace and drop empty entries so "stdio, sse" selects
        # both transports instead of silently missing " sse".
        transport_names = [
            name.strip() for name in sys.argv[1].split(",") if name.strip()
        ]
        # Build transport config based on requested transports
        transport_config = {
            'transports': {
                'stdio': {'enabled': 'stdio' in transport_names},
                'sse': {'enabled': 'sse' in transport_names, 'host': 'localhost', 'port': 49152},
                'websocket': {'enabled': 'websocket' in transport_names, 'host': 'localhost', 'port': 49153},
            }
        }

    server = KatamariServer(transport_config)

    # Use MCP library's stdio runner for now.
    # Only the stdio runner is started here; the parsed transport selection
    # is passed to KatamariServer as configuration and KatamariServer.run()
    # is not called from this function.
    from mcp.server.stdio import stdio_server
    from mcp.server.lowlevel.server import NotificationOptions

    async with stdio_server() as (read_stream, write_stream):
        await server.server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="katamari-mcp",
                server_version="0.1.0",
                capabilities=server.server.get_capabilities(
                    notification_options=NotificationOptions(
                        prompts_changed=False,
                        resources_changed=False,
                        tools_changed=False,
                    ),
                    experimental_capabilities={},
                ),
            ),
        )
# Script entry point: run the async main() only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())