"""
MCP Server Implementation
Implements the Model Context Protocol server for Maya integration
"""
import asyncio
import logging
import json
import uuid
from typing import List, Dict, Any, Optional
from datetime import datetime
from models import MAYA_TOOLS, MayaRequest, MayaResponse, validate_tool_input, get_tool_by_name, get_tools_by_category
from config import ServerConfig
from maya_bridge import MayaBridge
class MayaMCPServer:
"""Maya MCP Server implementation"""
def __init__(self, config: ServerConfig, maya_bridge: MayaBridge):
self.config = config
self.maya_bridge = maya_bridge
self.logger = logging.getLogger('maya-mcp-server.mcp')
self._running = False
self.tools = MAYA_TOOLS
self.server_info = {
"name": "maya-mcp-server",
"version": "1.0.0",
"description": "MCP server for Autodesk Maya integration",
"author": "Maya MCP Server Team",
"capabilities": {
"tools": True,
"resources": False,
"prompts": False,
"logging": True
}
}
# Initialize message routing
self._setup_message_handlers()
def _setup_message_handlers(self) -> None:
"""Setup MCP message handlers"""
self.message_handlers = {
"initialize": self._handle_initialize,
"tools/list": self._handle_list_tools,
"tools/call": self._handle_call_tool,
"ping": self._handle_ping,
"notifications/initialized": self._handle_initialized
}
async def _handle_initialize(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle MCP initialize request"""
self.logger.info("Handling initialize request")
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"protocolVersion": "2024-11-05",
"capabilities": self.server_info["capabilities"],
"serverInfo": {
"name": self.server_info["name"],
"version": self.server_info["version"]
}
}
}
async def _handle_initialized(self, notification: Dict[str, Any]) -> None:
"""Handle initialized notification"""
self.logger.info("Client initialized")
async def _handle_ping(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle ping request"""
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {}
}
async def _handle_list_tools(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle list tools request"""
self.logger.debug("Handling list tools request")
# Check for category filter in request
params = request.get("params", {})
category = params.get("category", "all")
# Get tools by category
if category == "all":
filtered_tools = self.tools
else:
filtered_tools = get_tools_by_category(category)
tools = []
for maya_tool in filtered_tools:
tool_dict = {
"name": maya_tool.name,
"description": maya_tool.description,
"inputSchema": maya_tool.input_schema
}
tools.append(tool_dict)
self.logger.debug(f"Returning {len(tools)} tools for category '{category}'")
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"tools": tools,
"total": len(tools),
"category": category
}
}
async def _handle_call_tool(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Handle call tool request"""
params = request.get("params", {})
name = params.get("name")
arguments = params.get("arguments", {})
self.logger.debug(f"Handling call tool request: {name} with arguments: {arguments}")
# Find and validate the tool
tool = get_tool_by_name(name)
if not tool:
self.logger.error(f"Unknown tool: {name}")
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32601,
"message": f"Unknown tool: {name}"
}
}
# Validate input arguments
is_valid, error_message = validate_tool_input(name, arguments)
if not is_valid:
self.logger.error(f"Invalid input for tool {name}: {error_message}")
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32602,
"message": f"Invalid parameters: {error_message}"
}
}
# Execute the tool
try:
maya_request = MayaRequest(
command=name,
arguments=arguments,
request_id=str(uuid.uuid4())
)
response = await self.maya_bridge.execute_command(maya_request)
if response.success:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"content": [
{
"type": "text",
"text": str(response.result)
}
]
}
}
else:
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32000,
"message": f"Tool execution failed: {response.error}"
}
}
except Exception as e:
self.logger.error(f"Exception executing tool {name}: {e}")
return {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
async def process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Process an incoming MCP message"""
try:
method = message.get("method")
if method in self.message_handlers:
handler = self.message_handlers[method]
# Check if it's a notification (no id) or request (has id)
if "id" in message:
# Request - return response
return await handler(message)
else:
# Notification - no response
await handler(message)
return None
else:
self.logger.warning(f"Unknown method: {method}")
if "id" in message:
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"error": {
"code": -32601,
"message": f"Method not found: {method}"
}
}
return None
except Exception as e:
self.logger.error(f"Error processing message: {e}")
if "id" in message:
return {
"jsonrpc": "2.0",
"id": message.get("id"),
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
return None
async def list_tools(self) -> List[Dict[str, Any]]:
"""List available tools (legacy method)"""
self.logger.debug("Listing available tools")
tools = []
for maya_tool in self.tools:
tool_dict = {
"name": maya_tool.name,
"description": maya_tool.description,
"inputSchema": maya_tool.input_schema
}
tools.append(tool_dict)
self.logger.debug(f"Returning {len(tools)} tools")
return tools
async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a tool by name"""
self.logger.debug(f"Tool call: {name} with arguments: {arguments}")
# Find the tool
tool_found = False
for maya_tool in self.tools:
if maya_tool.name == name:
tool_found = True
break
if not tool_found:
self.logger.error(f"Unknown tool: {name}")
return {
"content": [{"type": "text", "text": f"Unknown tool: {name}"}],
"isError": True
}
# Create and execute request
try:
request = MayaRequest(
command=name,
arguments=arguments,
request_id=f"{name}_{asyncio.get_event_loop().time()}"
)
response = await self.maya_bridge.execute_command(request)
if response.success:
return {
"content": [{"type": "text", "text": str(response.result)}],
"isError": False
}
else:
return {
"content": [{"type": "text", "text": f"Error: {response.error}"}],
"isError": True
}
except Exception as e:
self.logger.error(f"Exception executing tool {name}: {e}")
return {
"content": [{"type": "text", "text": f"Internal error: {str(e)}"}],
"isError": True
}
async def get_server_info(self) -> Dict[str, Any]:
"""Get server information"""
return {
"name": "maya-mcp-server",
"version": "1.0.0",
"description": "MCP server for Autodesk Maya integration",
"author": "Maya MCP Server Team"
}
async def load_configuration(self) -> None:
"""Load and apply server configuration"""
self.logger.info("Loading server configuration...")
# Log configuration details
self.logger.info(f"Server host: {self.config.host}")
self.logger.info(f"Server port: {self.config.port}")
self.logger.info(f"Maya port: {self.config.maya_port}")
self.logger.info(f"Debug mode: {self.config.debug}")
self.logger.info(f"Log level: {self.config.log_level}")
self.logger.info(f"Timeout: {self.config.timeout}s")
self.logger.info(f"Max retries: {self.config.max_retries}")
# Update server info with configuration
self.server_info.update({
"host": self.config.host,
"port": self.config.port,
"maya_port": self.config.maya_port,
"debug": self.config.debug
})
self.logger.info("Configuration loaded successfully")
async def start(self) -> None:
"""Start the MCP server"""
self.logger.info("Starting MCP server...")
try:
# Load configuration
await self.load_configuration()
# Initialize Maya bridge
await self.maya_bridge.initialize()
self._running = True
self.logger.info(f"MCP server started on {self.config.host}:{self.config.port}")
self.logger.info(f"Maya bridge connected on port {self.config.maya_port}")
except Exception as e:
self.logger.error(f"Failed to start MCP server: {e}")
raise
async def serve_forever(self) -> None:
"""Keep the server running"""
if not self._running:
raise RuntimeError("Server not started")
self.logger.info("MCP server is now serving requests...")
self.logger.info(f"Available tools: {[tool.name for tool in self.tools]}")
try:
# Start stdio communication handler
stdio_task = asyncio.create_task(self.handle_stdio_communication())
# Health check task
async def health_check():
while self._running:
await asyncio.sleep(30) # Check every 30 seconds
if not await self.maya_bridge.is_healthy():
self.logger.warning("Maya bridge is not healthy, attempting reconnection...")
try:
await self.maya_bridge.reconnect()
self.logger.info("Maya bridge reconnected successfully")
except Exception as e:
self.logger.error(f"Failed to reconnect to Maya: {e}")
health_task = asyncio.create_task(health_check())
# Wait for either task to complete
done, pending = await asyncio.wait(
[stdio_task, health_task],
return_when=asyncio.FIRST_COMPLETED
)
# Cancel remaining tasks
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except asyncio.CancelledError:
self.logger.info("Server cancelled")
except Exception as e:
self.logger.error(f"Server error: {e}")
raise
async def handle_stdio_communication(self) -> None:
"""Handle stdio-based MCP communication"""
self.logger.info("Starting stdio communication handler...")
try:
import sys
while self._running:
# Read from stdin
try:
line = await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline
)
if not line:
break
line = line.strip()
if not line:
continue
# Parse JSON message
try:
message = json.loads(line)
self.logger.debug(f"Received message: {message}")
# Process message
response = await self.process_message(message)
# Send response if needed
if response:
response_json = json.dumps(response)
print(response_json, flush=True)
self.logger.debug(f"Sent response: {response}")
except json.JSONDecodeError as e:
self.logger.error(f"Invalid JSON received: {e}")
except Exception as e:
self.logger.error(f"Error reading from stdin: {e}")
break
except Exception as e:
self.logger.error(f"Error in stdio communication: {e}")
self.logger.info("Stdio communication handler stopped")
def get_tool_info(self, tool_name: str) -> Optional[Dict[str, Any]]:
"""Get detailed information about a specific tool"""
tool = get_tool_by_name(tool_name)
if not tool:
return None
return {
"name": tool.name,
"description": tool.description,
"inputSchema": tool.input_schema,
"category": self._get_tool_category(tool_name)
}
def _get_tool_category(self, tool_name: str) -> str:
"""Get the category of a tool"""
category_map = {
"maya_create": "creation",
"maya_select": "selection",
"maya_get_selection": "selection",
"maya_transform": "transformation",
"maya_delete": "management",
"maya_execute": "execution",
"maya_get_scene_info": "query",
"maya_get_object_info": "query",
"maya_list_objects": "query"
}
return category_map.get(tool_name, "other")
def get_available_categories(self) -> List[str]:
"""Get list of available tool categories"""
categories = set()
for tool in self.tools:
categories.add(self._get_tool_category(tool.name))
return sorted(list(categories))
def search_tools(self, query: str) -> List[Dict[str, Any]]:
"""Search tools by name or description"""
query_lower = query.lower()
matching_tools = []
for tool in self.tools:
if (query_lower in tool.name.lower() or
query_lower in tool.description.lower()):
matching_tools.append({
"name": tool.name,
"description": tool.description,
"inputSchema": tool.input_schema,
"category": self._get_tool_category(tool.name)
})
return matching_tools
async def shutdown(self) -> None:
"""Shutdown the MCP server with proper resource cleanup"""
self.logger.info("Shutting down MCP server...")
self._running = False
try:
# Cancel any pending operations
current_tasks = [task for task in asyncio.all_tasks() if not task.done()]
if current_tasks:
self.logger.info(f"Cancelling {len(current_tasks)} pending tasks...")
for task in current_tasks:
if not task.cancelled():
task.cancel()
# Wait for tasks to complete cancellation
try:
await asyncio.wait_for(
asyncio.gather(*current_tasks, return_exceptions=True),
timeout=5.0
)
except asyncio.TimeoutError:
self.logger.warning("Some tasks did not complete cancellation within timeout")
# Shutdown Maya bridge
await self.maya_bridge.shutdown()
# Clear any cached data
self.tools.clear() if hasattr(self.tools, 'clear') else None
self.message_handlers.clear()
# Log final statistics
self.logger.info("MCP server shutdown complete")
except Exception as e:
self.logger.error(f"Error during shutdown: {e}")
raise
def is_running(self) -> bool:
"""Check if server is running"""
return self._running