Skip to main content
Glama
mcp_server.py19.5 kB
""" MCP Server Implementation Implements the Model Context Protocol server for Maya integration """ import asyncio import logging import json import uuid from typing import List, Dict, Any, Optional from datetime import datetime from models import MAYA_TOOLS, MayaRequest, MayaResponse, validate_tool_input, get_tool_by_name, get_tools_by_category from config import ServerConfig from maya_bridge import MayaBridge class MayaMCPServer: """Maya MCP Server implementation""" def __init__(self, config: ServerConfig, maya_bridge: MayaBridge): self.config = config self.maya_bridge = maya_bridge self.logger = logging.getLogger('maya-mcp-server.mcp') self._running = False self.tools = MAYA_TOOLS self.server_info = { "name": "maya-mcp-server", "version": "1.0.0", "description": "MCP server for Autodesk Maya integration", "author": "Maya MCP Server Team", "capabilities": { "tools": True, "resources": False, "prompts": False, "logging": True } } # Initialize message routing self._setup_message_handlers() def _setup_message_handlers(self) -> None: """Setup MCP message handlers""" self.message_handlers = { "initialize": self._handle_initialize, "tools/list": self._handle_list_tools, "tools/call": self._handle_call_tool, "ping": self._handle_ping, "notifications/initialized": self._handle_initialized } async def _handle_initialize(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handle MCP initialize request""" self.logger.info("Handling initialize request") return { "jsonrpc": "2.0", "id": request.get("id"), "result": { "protocolVersion": "2024-11-05", "capabilities": self.server_info["capabilities"], "serverInfo": { "name": self.server_info["name"], "version": self.server_info["version"] } } } async def _handle_initialized(self, notification: Dict[str, Any]) -> None: """Handle initialized notification""" self.logger.info("Client initialized") async def _handle_ping(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handle ping request""" return { "jsonrpc": "2.0", "id": request.get("id"), "result": {} } async def _handle_list_tools(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handle list tools request""" self.logger.debug("Handling list tools request") # Check for category filter in request params = request.get("params", {}) category = params.get("category", "all") # Get tools by category if category == "all": filtered_tools = self.tools else: filtered_tools = get_tools_by_category(category) tools = [] for maya_tool in filtered_tools: tool_dict = { "name": maya_tool.name, "description": maya_tool.description, "inputSchema": maya_tool.input_schema } tools.append(tool_dict) self.logger.debug(f"Returning {len(tools)} tools for category '{category}'") return { "jsonrpc": "2.0", "id": request.get("id"), "result": { "tools": tools, "total": len(tools), "category": category } } async def _handle_call_tool(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handle call tool request""" params = request.get("params", {}) name = params.get("name") arguments = params.get("arguments", {}) self.logger.debug(f"Handling call tool request: {name} with arguments: {arguments}") # Find and validate the tool tool = get_tool_by_name(name) if not tool: self.logger.error(f"Unknown tool: {name}") return { "jsonrpc": "2.0", "id": request.get("id"), "error": { "code": -32601, "message": f"Unknown tool: {name}" } } # Validate input arguments is_valid, error_message = validate_tool_input(name, arguments) if not is_valid: self.logger.error(f"Invalid input for tool {name}: {error_message}") return { "jsonrpc": "2.0", "id": request.get("id"), "error": { "code": -32602, "message": f"Invalid parameters: {error_message}" } } # Execute the tool try: maya_request = MayaRequest( command=name, arguments=arguments, request_id=str(uuid.uuid4()) ) response = await self.maya_bridge.execute_command(maya_request) if response.success: return { "jsonrpc": "2.0", "id": request.get("id"), "result": { "content": [ { "type": "text", "text": str(response.result) } ] } } else: return { "jsonrpc": "2.0", "id": request.get("id"), "error": { "code": -32000, "message": f"Tool execution failed: {response.error}" } } except Exception as e: self.logger.error(f"Exception executing tool {name}: {e}") return { "jsonrpc": "2.0", "id": request.get("id"), "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } async def process_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Process an incoming MCP message""" try: method = message.get("method") if method in self.message_handlers: handler = self.message_handlers[method] # Check if it's a notification (no id) or request (has id) if "id" in message: # Request - return response return await handler(message) else: # Notification - no response await handler(message) return None else: self.logger.warning(f"Unknown method: {method}") if "id" in message: return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32601, "message": f"Method not found: {method}" } } return None except Exception as e: self.logger.error(f"Error processing message: {e}") if "id" in message: return { "jsonrpc": "2.0", "id": message.get("id"), "error": { "code": -32603, "message": f"Internal error: {str(e)}" } } return None async def list_tools(self) -> List[Dict[str, Any]]: """List available tools (legacy method)""" self.logger.debug("Listing available tools") tools = [] for maya_tool in self.tools: tool_dict = { "name": maya_tool.name, "description": maya_tool.description, "inputSchema": maya_tool.input_schema } tools.append(tool_dict) self.logger.debug(f"Returning {len(tools)} tools") return tools async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute a tool by name""" self.logger.debug(f"Tool call: {name} with arguments: {arguments}") # Find the tool tool_found = False for maya_tool in self.tools: if maya_tool.name == name: tool_found = True break if not tool_found: self.logger.error(f"Unknown tool: {name}") return { "content": [{"type": "text", "text": f"Unknown tool: {name}"}], "isError": True } # Create and execute request try: request = MayaRequest( command=name, arguments=arguments, request_id=f"{name}_{asyncio.get_event_loop().time()}" ) response = await self.maya_bridge.execute_command(request) if response.success: return { "content": [{"type": "text", "text": str(response.result)}], "isError": False } else: return { "content": [{"type": "text", "text": f"Error: {response.error}"}], "isError": True } except Exception as e: self.logger.error(f"Exception executing tool {name}: {e}") return { "content": [{"type": "text", "text": f"Internal error: {str(e)}"}], "isError": True } async def get_server_info(self) -> Dict[str, Any]: """Get server information""" return { "name": "maya-mcp-server", "version": "1.0.0", "description": "MCP server for Autodesk Maya integration", "author": "Maya MCP Server Team" } async def load_configuration(self) -> None: """Load and apply server configuration""" self.logger.info("Loading server configuration...") # Log configuration details self.logger.info(f"Server host: {self.config.host}") self.logger.info(f"Server port: {self.config.port}") self.logger.info(f"Maya port: {self.config.maya_port}") self.logger.info(f"Debug mode: {self.config.debug}") self.logger.info(f"Log level: {self.config.log_level}") self.logger.info(f"Timeout: {self.config.timeout}s") self.logger.info(f"Max retries: {self.config.max_retries}") # Update server info with configuration self.server_info.update({ "host": self.config.host, "port": self.config.port, "maya_port": self.config.maya_port, "debug": self.config.debug }) self.logger.info("Configuration loaded successfully") async def start(self) -> None: """Start the MCP server""" self.logger.info("Starting MCP server...") try: # Load configuration await self.load_configuration() # Initialize Maya bridge await self.maya_bridge.initialize() self._running = True self.logger.info(f"MCP server started on {self.config.host}:{self.config.port}") self.logger.info(f"Maya bridge connected on port {self.config.maya_port}") except Exception as e: self.logger.error(f"Failed to start MCP server: {e}") raise async def serve_forever(self) -> None: """Keep the server running""" if not self._running: raise RuntimeError("Server not started") self.logger.info("MCP server is now serving requests...") self.logger.info(f"Available tools: {[tool.name for tool in self.tools]}") try: # Start stdio communication handler stdio_task = asyncio.create_task(self.handle_stdio_communication()) # Health check task async def health_check(): while self._running: await asyncio.sleep(30) # Check every 30 seconds if not await self.maya_bridge.is_healthy(): self.logger.warning("Maya bridge is not healthy, attempting reconnection...") try: await self.maya_bridge.reconnect() self.logger.info("Maya bridge reconnected successfully") except Exception as e: self.logger.error(f"Failed to reconnect to Maya: {e}") health_task = asyncio.create_task(health_check()) # Wait for either task to complete done, pending = await asyncio.wait( [stdio_task, health_task], return_when=asyncio.FIRST_COMPLETED ) # Cancel remaining tasks for task in pending: task.cancel() try: await task except asyncio.CancelledError: pass except asyncio.CancelledError: self.logger.info("Server cancelled") except Exception as e: self.logger.error(f"Server error: {e}") raise async def handle_stdio_communication(self) -> None: """Handle stdio-based MCP communication""" self.logger.info("Starting stdio communication handler...") try: import sys while self._running: # Read from stdin try: line = await asyncio.get_event_loop().run_in_executor( None, sys.stdin.readline ) if not line: break line = line.strip() if not line: continue # Parse JSON message try: message = json.loads(line) self.logger.debug(f"Received message: {message}") # Process message response = await self.process_message(message) # Send response if needed if response: response_json = json.dumps(response) print(response_json, flush=True) self.logger.debug(f"Sent response: {response}") except json.JSONDecodeError as e: self.logger.error(f"Invalid JSON received: {e}") except Exception as e: self.logger.error(f"Error reading from stdin: {e}") break except Exception as e: self.logger.error(f"Error in stdio communication: {e}") self.logger.info("Stdio communication handler stopped") def get_tool_info(self, tool_name: str) -> Optional[Dict[str, Any]]: """Get detailed information about a specific tool""" tool = get_tool_by_name(tool_name) if not tool: return None return { "name": tool.name, "description": tool.description, "inputSchema": tool.input_schema, "category": self._get_tool_category(tool_name) } def _get_tool_category(self, tool_name: str) -> str: """Get the category of a tool""" category_map = { "maya_create": "creation", "maya_select": "selection", "maya_get_selection": "selection", "maya_transform": "transformation", "maya_delete": "management", "maya_execute": "execution", "maya_get_scene_info": "query", "maya_get_object_info": "query", "maya_list_objects": "query" } return category_map.get(tool_name, "other") def get_available_categories(self) -> List[str]: """Get list of available tool categories""" categories = set() for tool in self.tools: categories.add(self._get_tool_category(tool.name)) return sorted(list(categories)) def search_tools(self, query: str) -> List[Dict[str, Any]]: """Search tools by name or description""" query_lower = query.lower() matching_tools = [] for tool in self.tools: if (query_lower in tool.name.lower() or query_lower in tool.description.lower()): matching_tools.append({ "name": tool.name, "description": tool.description, "inputSchema": tool.input_schema, "category": self._get_tool_category(tool.name) }) return matching_tools async def shutdown(self) -> None: """Shutdown the MCP server with proper resource cleanup""" self.logger.info("Shutting down MCP server...") self._running = False try: # Cancel any pending operations current_tasks = [task for task in asyncio.all_tasks() if not task.done()] if current_tasks: self.logger.info(f"Cancelling {len(current_tasks)} pending tasks...") for task in current_tasks: if not task.cancelled(): task.cancel() # Wait for tasks to complete cancellation try: await asyncio.wait_for( asyncio.gather(*current_tasks, return_exceptions=True), timeout=5.0 ) except asyncio.TimeoutError: self.logger.warning("Some tasks did not complete cancellation within timeout") # Shutdown Maya bridge await self.maya_bridge.shutdown() # Clear any cached data self.tools.clear() if hasattr(self.tools, 'clear') else None self.message_handlers.clear() # Log final statistics self.logger.info("MCP server shutdown complete") except Exception as e: self.logger.error(f"Error during shutdown: {e}") raise def is_running(self) -> bool: """Check if server is running""" return self._running

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jeffreytsai1004/maya-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server