Skip to main content
Glama
maya_mcp.py67 kB
""" Maya MCP Plugin Autodesk Maya plugin that enables MCP (Model Context Protocol) communication This plugin should be placed in Maya's plug-ins directory for auto-loading """ import maya.api.OpenMaya as om import maya.cmds as cmds import maya.utils as utils import sys import traceback import json import io import contextlib from typing import Dict, Any, Optional, List, Callable # Plugin information PLUGIN_NAME = "maya_mcp" PLUGIN_VERSION = "1.1.2" PLUGIN_AUTHOR = "Maya MCP Server Team" class ConsoleCapture: """Captures Maya console output for debugging purposes""" def __init__(self, max_lines: int = 1000): self.max_lines = max_lines self.output_buffer: List[str] = [] self.error_buffer: List[str] = [] self.callbacks: List[Callable[[str, str], None]] = [] # (level, message) callbacks self.original_stdout = None self.original_stderr = None self.capturing = False def add_callback(self, callback: Callable[[str, str], None]): """Add a callback function for console output""" self.callbacks.append(callback) def remove_callback(self, callback: Callable[[str, str], None]): """Remove a callback function""" if callback in self.callbacks: self.callbacks.remove(callback) def start_capture(self): """Start capturing console output""" if not self.capturing: self.original_stdout = sys.stdout self.original_stderr = sys.stderr sys.stdout = self._create_capture_wrapper(sys.stdout, "INFO") sys.stderr = self._create_capture_wrapper(sys.stderr, "ERROR") self.capturing = True def stop_capture(self): """Stop capturing console output""" if self.capturing: sys.stdout = self.original_stdout sys.stderr = self.original_stderr self.capturing = False def _create_capture_wrapper(self, original_stream, level: str): """Create a wrapper that captures output while still writing to original stream""" class CaptureWrapper: def __init__(self, original, capture_instance, level): self.original = original self.capture = capture_instance self.level = level def write(self, text): # Write to original stream self.original.write(text) # Capture the output if text.strip(): # Only capture non-empty lines self.capture._add_to_buffer(text.strip(), self.level) def flush(self): self.original.flush() def __getattr__(self, name): return getattr(self.original, name) return CaptureWrapper(original_stream, self, level) def _add_to_buffer(self, message: str, level: str): """Add message to appropriate buffer and notify callbacks""" # Add to buffer if level == "ERROR": self.error_buffer.append(message) if len(self.error_buffer) > self.max_lines: self.error_buffer.pop(0) else: self.output_buffer.append(message) if len(self.output_buffer) > self.max_lines: self.output_buffer.pop(0) # Notify callbacks for callback in self.callbacks: try: callback(level, message) except Exception: pass # Don't let callback errors break the capture def get_output(self, lines: int = None) -> List[str]: """Get recent output lines""" if lines is None: return self.output_buffer.copy() return self.output_buffer[-lines:] if lines > 0 else [] def get_errors(self, lines: int = None) -> List[str]: """Get recent error lines""" if lines is None: return self.error_buffer.copy() return self.error_buffer[-lines:] if lines > 0 else [] def get_all_output(self, lines: int = None) -> Dict[str, List[str]]: """Get both output and errors""" return { "output": self.get_output(lines), "errors": self.get_errors(lines) } def clear_buffers(self): """Clear all captured output""" self.output_buffer.clear() self.error_buffer.clear() class MayaMCPPlugin: """Maya MCP Plugin main class""" def __init__(self): self.command_port = None self.port = 7022 # Default Maya command port self.alternative_ports = [7022, 7023, 7024, 7025, 7026] # Fallback ports self.running = False self.logger = self._setup_logging() self.request_handlers = {} self._setup_request_handlers() # Console capture for debugging self.console_capture = ConsoleCapture(max_lines=1000) self.debug_mode = False self.capture_enabled = False # Connection methods self.connection_methods = ["localhost", "127.0.0.1"] # Local connections only for security # Security settings self.allowed_commands = [ "polyCube", "sphere", "cylinder", "plane", "select", "ls", "getAttr", "setAttr", "move", "rotate", "scale", "delete", "currentTime", "playbackOptions", "file" ] self.blocked_keywords = [ "import os", "import sys", "exec(", "eval(", "__import__", "open(", "file(", "subprocess", "system", "reload", "compile" ] def _setup_logging(self): """Setup logging for the plugin""" import logging logger = logging.getLogger('maya-mcp-plugin') logger.setLevel(logging.INFO) # Create console handler if not logger.handlers: handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) return logger def _setup_request_handlers(self): """Setup request handlers for different MCP commands""" self.request_handlers = { "maya_create": self._handle_create_object, "maya_select": self._handle_select_objects, "maya_transform": self._handle_transform_objects, "maya_delete": self._handle_delete_objects, "maya_execute": self._handle_execute_python, "maya_get_selection": self._handle_get_selection, "maya_get_scene_info": self._handle_get_scene_info, "maya_get_object_info": self._handle_get_object_info, "maya_list_objects": self._handle_list_objects, # Console capture and debugging commands "maya_get_console_output": self._handle_get_console_output, "maya_get_log_file": self._handle_get_log_file, "maya_enable_console_capture": self._handle_enable_console_capture, "maya_disable_console_capture": self._handle_disable_console_capture, "maya_clear_console_buffer": self._handle_clear_console_buffer, "maya_set_debug_mode": self._handle_set_debug_mode, "maya_get_debug_info": self._handle_get_debug_info, # Outliner and DAG navigation "maya_browse_outliner": self._handle_browse_outliner, # Maya menu and UI access "maya_get_menu_structure": self._handle_get_menu_structure, "maya_execute_menu_command": self._handle_execute_menu_command, "maya_get_shelf_tools": self._handle_get_shelf_tools, # Plugin management "maya_list_plugins": self._handle_list_plugins, "maya_load_plugin": self._handle_load_plugin, "maya_unload_plugin": self._handle_unload_plugin, # Maya API direct access "maya_api_call": self._handle_api_call, # MayaPy script execution "maya_execute_mayapy": self._handle_execute_mayapy, # Advanced scene navigation "maya_get_node_connections": self._handle_get_node_connections, "maya_search_nodes": self._handle_search_nodes, # Workspace and project tools "maya_get_workspace_info": self._handle_get_workspace_info, "maya_set_workspace": self._handle_set_workspace } def initialize(self): """Initialize the MCP plugin""" try: self.logger.info(f"Initializing {PLUGIN_NAME} v{PLUGIN_VERSION}") # Check for existing plugin instances and clean up self._cleanup_existing_instances() # Start command port server self._start_command_port() # Set running flag self.running = True # Print status to Maya script editor print(f"✓ {PLUGIN_NAME} v{PLUGIN_VERSION} initialized successfully") print(f"✓ Command port active on port {self.port}") print(f"✓ Available commands: {len(self.request_handlers)}") cmds.warning(f"{PLUGIN_NAME} ready on port {self.port}") self.logger.info(f"Available request handlers: {list(self.request_handlers.keys())}") return True except Exception as e: self.logger.error(f"Failed to initialize plugin: {e}") print(f"❌ Failed to initialize {PLUGIN_NAME}: {e}") cmds.error(f"Failed to initialize {PLUGIN_NAME}: {e}") return False def _cleanup_existing_instances(self): """Clean up any existing plugin instances""" try: # Close any existing command ports for port in [7022, 7023, 7024, 7025, 7026]: try: if cmds.commandPort(query=True, name=f":{port}"): cmds.commandPort(name=f":{port}", close=True) self.logger.debug(f"Closed existing command port on {port}") except: pass except Exception as e: self.logger.debug(f"Error during cleanup: {e}") def _start_command_port(self): """Start Maya command port for MCP communication with fallback options""" ports_to_try = [self.port] + self.alternative_ports for port in ports_to_try: try: # Check if command port is already open if cmds.commandPort(query=True, name=f":{port}"): self.logger.info(f"Command port already open on port {port}") self.port = port self.command_port = port return # Try to open command port cmds.commandPort(name=f":{port}", sourceType="python") self.logger.info(f"Command port opened on port {port}") self.port = port self.command_port = port # Test the connection if self._test_command_port(port): self.logger.info(f"Command port {port} is working correctly") return else: self.logger.warning(f"Command port {port} opened but not responding") except Exception as e: self.logger.debug(f"Failed to start command port on port {port}: {e}") continue # If all ports failed, try localhost-only binding self.logger.warning("All standard ports failed, trying localhost-only binding...") try: # Try localhost binding on default port cmds.commandPort(name=f"localhost:{self.port}", sourceType="python") self.logger.info(f"Command port opened on localhost:{self.port}") self.command_port = self.port except Exception as e: self.logger.error(f"Failed to start any command port: {e}") raise Exception("Could not start Maya command port on any available port") def _test_command_port(self, port): """Test if command port is responding""" try: import socket import time # Give the port a moment to initialize time.sleep(0.1) # Try to connect to the port sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) result = sock.connect_ex(('localhost', port)) sock.close() return result == 0 except Exception: return False def shutdown(self): """Shutdown the MCP plugin""" try: self.logger.info(f"Shutting down {PLUGIN_NAME}") # Stop console capture if self.capture_enabled: self.console_capture.stop_capture() self.capture_enabled = False self.logger.info("Console capture stopped") # Close command port if self.command_port: try: cmds.commandPort(name=f":{self.command_port}", close=True) self.logger.info(f"Command port {self.command_port} closed") except: pass # Port might already be closed self.running = False cmds.warning(f"{PLUGIN_NAME} shutdown complete") except Exception as e: self.logger.error(f"Error during shutdown: {e}") def _handle_create_object(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle object creation requests""" try: object_type = arguments.get("object_type") name = arguments.get("name") transform = arguments.get("transform", {}) if not object_type: return {"success": False, "error": "object_type is required", "result": None} # Create the object if name: result = getattr(cmds, object_type)(name=name) else: result = getattr(cmds, object_type)() created_object = result[0] if isinstance(result, list) else result # Apply transform if provided if transform: if "translate" in transform: cmds.move(*transform["translate"], created_object) if "rotate" in transform: cmds.rotate(*transform["rotate"], created_object) if "scale" in transform: cmds.scale(*transform["scale"], created_object) return { "success": True, "error": None, "result": f"Created {object_type}: {created_object}" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_select_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle object selection requests""" try: objects = arguments.get("objects", []) add_to_selection = arguments.get("add", False) if not objects: return {"success": False, "error": "objects list is required", "result": None} # Select objects if add_to_selection: cmds.select(objects, add=True) else: cmds.select(objects) selected = cmds.ls(selection=True) return { "success": True, "error": None, "result": f"Selected objects: {selected}" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_transform_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle object transformation requests""" try: objects = arguments.get("objects", []) translate = arguments.get("translate") rotate = arguments.get("rotate") scale = arguments.get("scale") absolute = arguments.get("absolute", False) if not objects: return {"success": False, "error": "objects list is required", "result": None} results = [] for obj in objects: if translate: if absolute: cmds.move(*translate, obj, absolute=True) else: cmds.move(*translate, obj, relative=True) results.append(f"Translated {obj}") if rotate: if absolute: cmds.rotate(*rotate, obj, absolute=True) else: cmds.rotate(*rotate, obj, relative=True) results.append(f"Rotated {obj}") if scale: if absolute: cmds.scale(*scale, obj, absolute=True) else: cmds.scale(*scale, obj, relative=True) results.append(f"Scaled {obj}") return { "success": True, "error": None, "result": "; ".join(results) } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_delete_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle object deletion requests""" try: objects = arguments.get("objects", []) if not objects: return {"success": False, "error": "objects list is required", "result": None} # Delete objects cmds.delete(objects) return { "success": True, "error": None, "result": f"Deleted objects: {objects}" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_execute_python(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle Python code execution requests""" try: command = arguments.get("command", "") safe_mode = arguments.get("safe_mode", True) if not command: return {"success": False, "error": "command is required", "result": None} # Security check if safe mode is enabled if safe_mode: for keyword in self.blocked_keywords: if keyword in command: return { "success": False, "error": f"Command contains blocked keyword: {keyword}", "result": None } # Execute command try: # Use exec for statements, eval for expressions if any(stmt in command for stmt in ["=", "import", "def", "class", "for", "while", "if"]): exec(command) result = "Command executed successfully" else: result = eval(command) return { "success": True, "error": None, "result": str(result) if result is not None else "Command executed successfully" } except Exception as e: return {"success": False, "error": str(e), "result": None} except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_selection(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get selection requests""" try: long_names = arguments.get("long_names", False) if long_names: selected = cmds.ls(selection=True, long=True) else: selected = cmds.ls(selection=True) return { "success": True, "error": None, "result": selected } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_scene_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get scene info requests""" try: include_transforms = arguments.get("include_transforms", True) include_attributes = arguments.get("include_attributes", False) scene_info = { "scene_name": cmds.file(query=True, sceneName=True) or "untitled", "selected_objects": cmds.ls(selection=True), "total_objects": len(cmds.ls(dag=True)), "current_time": cmds.currentTime(query=True), "playback_range": [ cmds.playbackOptions(query=True, minTime=True), cmds.playbackOptions(query=True, maxTime=True) ] } if include_transforms: transforms = cmds.ls(type="transform") scene_info["transforms"] = len(transforms) if include_attributes: # This would be expensive, so limit to selected objects selected = cmds.ls(selection=True) if selected: attrs = {} for obj in selected[:5]: # Limit to first 5 objects try: attrs[obj] = cmds.listAttr(obj) except: pass scene_info["attributes"] = attrs return { "success": True, "error": None, "result": scene_info } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_object_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get object info requests""" try: object_name = arguments.get("object_name") include_attributes = arguments.get("include_attributes", True) include_connections = arguments.get("include_connections", False) if not object_name: return {"success": False, "error": "object_name is required", "result": None} # Check if object exists if not cmds.objExists(object_name): return {"success": False, "error": f"Object {object_name} does not exist", "result": None} object_info = { "name": object_name, "type": cmds.objectType(object_name), "exists": True } # Get transform information try: if cmds.objectType(object_name) == "transform": object_info["transform"] = { "translate": cmds.getAttr(f"{object_name}.translate")[0], "rotate": cmds.getAttr(f"{object_name}.rotate")[0], "scale": cmds.getAttr(f"{object_name}.scale")[0] } except: pass # Get attributes if requested if include_attributes: try: object_info["attributes"] = cmds.listAttr(object_name) except: object_info["attributes"] = [] # Get connections if requested if include_connections: try: object_info["connections"] = cmds.listConnections(object_name) or [] except: object_info["connections"] = [] return { "success": True, "error": None, "result": object_info } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_list_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle list objects requests""" try: object_type = arguments.get("object_type", "all") pattern = arguments.get("pattern") if object_type == "all": objects = cmds.ls(dag=True) elif object_type == "transform": objects = cmds.ls(type="transform") elif object_type == "mesh": objects = cmds.ls(type="mesh") elif object_type == "camera": objects = cmds.ls(type="camera") elif object_type == "light": objects = cmds.ls(type="light") elif object_type == "material": objects = cmds.ls(materials=True) else: objects = cmds.ls(type=object_type) # Apply pattern filter if provided if pattern: objects = [obj for obj in objects if pattern in obj] return { "success": True, "error": None, "result": { "objects": objects, "count": len(objects), "type": object_type } } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_console_output(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get console output requests""" try: lines = arguments.get("lines", None) include_errors = arguments.get("include_errors", True) if include_errors: result = self.console_capture.get_all_output(lines) else: result = {"output": self.console_capture.get_output(lines)} result["capture_enabled"] = self.capture_enabled result["total_lines"] = len(self.console_capture.output_buffer) result["total_errors"] = len(self.console_capture.error_buffer) return { "success": True, "error": None, "result": result } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_enable_console_capture(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle enable console capture requests""" try: max_lines = arguments.get("max_lines", 1000) # Update max lines if provided self.console_capture.max_lines = max_lines # Start capture if not already running if not self.capture_enabled: self.console_capture.start_capture() self.capture_enabled = True # Add callback for debug logging def debug_callback(level: str, message: str): if self.debug_mode: self.logger.debug(f"Maya Console [{level}]: {message}") self.console_capture.add_callback(debug_callback) return { "success": True, "error": None, "result": f"Console capture enabled with max {max_lines} lines" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_disable_console_capture(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle disable console capture requests""" try: if self.capture_enabled: self.console_capture.stop_capture() self.capture_enabled = False return { "success": True, "error": None, "result": "Console capture disabled" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_clear_console_buffer(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle clear console buffer requests""" try: self.console_capture.clear_buffers() return { "success": True, "error": None, "result": "Console buffers cleared" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_set_debug_mode(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle set debug mode requests""" try: enabled = arguments.get("enabled", True) self.debug_mode = enabled # Update logger level if enabled: self.logger.setLevel(logging.DEBUG) else: self.logger.setLevel(logging.INFO) return { "success": True, "error": None, "result": f"Debug mode {'enabled' if enabled else 'disabled'}" } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_debug_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get debug info requests""" try: debug_info = { "plugin_name": PLUGIN_NAME, "plugin_version": PLUGIN_VERSION, "debug_mode": self.debug_mode, "console_capture_enabled": self.capture_enabled, "console_buffer_size": len(self.console_capture.output_buffer), "error_buffer_size": len(self.console_capture.error_buffer), "active_port": self.port, "command_port_status": None, "maya_version": cmds.about(version=True), "maya_api_version": cmds.about(apiVersion=True), "python_version": sys.version, "recent_errors": self.console_capture.get_errors(5), "recent_output": self.console_capture.get_output(5) } # Check command port status try: debug_info["command_port_status"] = cmds.commandPort( query=True, name=f":{self.port}" ) except: debug_info["command_port_status"] = False return { "success": True, "error": None, "result": debug_info } except Exception as e: return {"success": False, "error": str(e), "result": None} def execute_command(self, command: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute a command with given arguments""" try: if command in self.request_handlers: handler = self.request_handlers[command] return handler(arguments) else: return { "success": False, "error": f"Unknown command: {command}", "result": None } except Exception as e: self.logger.error(f"Error executing command {command}: {e}") return { "success": False, "error": f"Internal error: {str(e)}", "result": None } # Additional handler methods for new functionality def _handle_get_log_file(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get Maya log file requests""" try: log_type = arguments.get("log_type", "output") lines = arguments.get("lines", None) import os # Get Maya log directory maya_app_dir = cmds.internalVar(userAppDir=True) log_files = { "output": os.path.join(maya_app_dir, "mayalog.txt"), "error": os.path.join(maya_app_dir, "mayaerror.txt"), "script_editor": os.path.join(maya_app_dir, "scriptEditorTemp.mel"), "command_history": os.path.join(maya_app_dir, "commandHistory.txt") } log_file = log_files.get(log_type) if not log_file or not os.path.exists(log_file): return { "success": False, "error": f"Log file not found: {log_type}", "result": None } # Read log file try: with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: if lines: # Read last N lines all_lines = f.readlines() content = ''.join(all_lines[-lines:]) else: content = f.read() return { "success": True, "error": None, "result": { "log_type": log_type, "file_path": log_file, "content": content, "lines_returned": len(content.split('\n')) if content else 0 } } except Exception as e: return { "success": False, "error": f"Error reading log file: {str(e)}", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_browse_outliner(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle browse outliner hierarchy requests""" try: root_node = arguments.get("root_node", "|") depth = arguments.get("depth", 3) include_shapes = arguments.get("include_shapes", True) filter_type = arguments.get("filter_type", "all") def get_hierarchy(node, current_depth, max_depth): if current_depth >= max_depth: return None children = cmds.listRelatives(node, children=True, fullPath=True) or [] # Filter children based on type if filter_type != "all": filtered_children = [] for child in children: try: node_type = cmds.objectType(child) if filter_type == "transform" and node_type == "transform": filtered_children.append(child) elif filter_type == "mesh" and node_type == "mesh": filtered_children.append(child) elif filter_type == "camera" and node_type == "camera": filtered_children.append(child) elif filter_type == "light" and "Light" in node_type: filtered_children.append(child) elif filter_type == "joint" and node_type == "joint": filtered_children.append(child) except: pass children = filtered_children # Filter shapes if not included if not include_shapes: children = [child for child in children if cmds.objectType(child) != "mesh"] hierarchy = { "name": node.split("|")[-1], "full_path": node, "type": cmds.objectType(node) if cmds.objExists(node) else "unknown", "children": [] } for child in children: child_hierarchy = get_hierarchy(child, current_depth + 1, max_depth) if child_hierarchy: hierarchy["children"].append(child_hierarchy) return hierarchy # Start from root node if root_node == "|" or root_node == "world": # Get top-level transforms top_level = cmds.ls(assemblies=True) result = { "name": "world", "full_path": "|", "type": "world", "children": [] } for node in top_level: child_hierarchy = get_hierarchy(node, 0, depth) if child_hierarchy: result["children"].append(child_hierarchy) else: result = get_hierarchy(root_node, 0, depth) return { "success": True, "error": None, "result": result } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_menu_structure(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get Maya menu structure requests""" try: menu_name = arguments.get("menu_name") include_submenus = arguments.get("include_submenus", True) # Basic menu structure info (simplified for safety) if menu_name: result = { "menu_name": menu_name, "message": "Menu structure access requires Maya UI to be active", "available": False } else: result = { "main_menus": [ {"name": "File", "label": "File"}, {"name": "Edit", "label": "Edit"}, {"name": "Create", "label": "Create"}, {"name": "Select", "label": "Select"}, {"name": "Modify", "label": "Modify"}, {"name": "Display", "label": "Display"}, {"name": "Windows", "label": "Windows"}, {"name": "Help", "label": "Help"} ], "message": "Basic menu list - detailed access requires Maya UI" } return { "success": True, "error": None, "result": result } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_execute_menu_command(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle execute Maya menu command requests""" try: command = arguments.get("command") cmd_arguments = arguments.get("arguments", {}) if not command: return {"success": False, "error": "command is required", "result": None} # Execute the command safely try: if hasattr(cmds, command): if cmd_arguments: result = getattr(cmds, command)(**cmd_arguments) else: result = getattr(cmds, command)() return { "success": True, "error": None, "result": f"Command '{command}' executed successfully: {result}" } else: return { "success": False, "error": f"Command '{command}' not found in Maya cmds", "result": None } except Exception as e: return { "success": False, "error": f"Failed to execute command '{command}': {str(e)}", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_shelf_tools(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get Maya shelf tools requests""" try: shelf_name = arguments.get("shelf_name") # Simplified shelf info (UI-dependent features) result = { "message": "Shelf access requires Maya UI to be active", "available_shelves": [ "General", "Polygons", "Subdiv Surfaces", "Deformation", "Animation", "Dynamics", "Rendering", "nDynamics", "MASH", "XGen", "Arnold", "Toon" ], "note": "Use maya_execute with shelf commands for programmatic access" } return { "success": True, "error": None, "result": result } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_list_plugins(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle list Maya plugins requests""" try: loaded_only = arguments.get("loaded_only", False) include_info = arguments.get("include_info", True) # Get all plugins all_plugins = cmds.pluginInfo(query=True, listPlugins=True) or [] result = { "plugins": [], "total_count": len(all_plugins), "loaded_count": 0 } for plugin in all_plugins: try: is_loaded = cmds.pluginInfo(plugin, query=True, loaded=True) if loaded_only and not is_loaded: continue if is_loaded: result["loaded_count"] += 1 plugin_info = { "name": plugin, "loaded": is_loaded } if include_info and is_loaded: try: plugin_info.update({ "version": cmds.pluginInfo(plugin, query=True, version=True) or "Unknown", "vendor": cmds.pluginInfo(plugin, query=True, vendor=True) or "Unknown", "path": cmds.pluginInfo(plugin, query=True, path=True) or "" }) except: pass result["plugins"].append(plugin_info) except Exception as e: self.logger.debug(f"Error getting info for plugin {plugin}: {e}") continue return { "success": True, "error": None, "result": result } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_load_plugin(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle load Maya plugin requests""" try: plugin_name = arguments.get("plugin_name") quiet = arguments.get("quiet", False) if not plugin_name: return {"success": False, "error": "plugin_name is required", "result": None} try: # Check if already loaded if cmds.pluginInfo(plugin_name, query=True, loaded=True): return { "success": True, "error": None, "result": f"Plugin '{plugin_name}' is already loaded" } # Load the plugin cmds.loadPlugin(plugin_name, quiet=quiet) return { "success": True, "error": None, "result": f"Plugin '{plugin_name}' loaded successfully" } except Exception as e: return { "success": False, "error": f"Failed to load plugin '{plugin_name}': {str(e)}", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_unload_plugin(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle unload Maya plugin requests""" try: plugin_name = arguments.get("plugin_name") force = arguments.get("force", False) if not plugin_name: return {"success": False, "error": "plugin_name is required", "result": None} try: # Check if loaded if not cmds.pluginInfo(plugin_name, query=True, loaded=True): return { "success": True, "error": None, "result": f"Plugin '{plugin_name}' is not loaded" } # Unload the plugin cmds.unloadPlugin(plugin_name, force=force) return { "success": True, "error": None, "result": f"Plugin '{plugin_name}' unloaded successfully" } except Exception as e: return { "success": False, "error": f"Failed to unload plugin '{plugin_name}': {str(e)}", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_api_call(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle Maya API direct call requests""" try: api_module = arguments.get("api_module", "OpenMaya") function_name = arguments.get("function_name") api_arguments = arguments.get("arguments", []) return_type = arguments.get("return_type", "object") if not function_name: return {"success": False, "error": "function_name is required", "result": None} try: # Import the API module if api_module == "OpenMaya": import maya.api.OpenMaya as api elif api_module == "OpenMayaUI": import maya.api.OpenMayaUI as api elif api_module == "OpenMayaAnim": import maya.api.OpenMayaAnim as api elif api_module == "OpenMayaFX": import maya.api.OpenMayaFX as api elif api_module == "OpenMayaRender": import maya.api.OpenMayaRender as api else: return { "success": False, "error": f"Unknown API module: {api_module}", "result": None } # Get the function if not hasattr(api, function_name): return { "success": False, "error": f"Function '{function_name}' not found in {api_module}", "result": None } api_function = getattr(api, function_name) # Call the function if api_arguments: result = api_function(*api_arguments) else: result = api_function() # Convert result based on return type if return_type == "string": result = str(result) elif return_type == "int": result = int(result) elif return_type == "float": result = float(result) elif return_type == "bool": result = bool(result) # For "object", keep as is return { "success": True, "error": None, "result": result } except Exception as e: return { "success": False, "error": f"API call failed: {str(e)}", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_execute_mayapy(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle MayaPy script execution requests""" try: script_path = arguments.get("script_path") script_content = arguments.get("script_content") script_arguments = arguments.get("arguments", []) working_directory = arguments.get("working_directory") timeout = arguments.get("timeout", 30) if not script_path and not script_content: return { "success": False, "error": "Either script_path or script_content is required", "result": None } # For security reasons, limit MayaPy execution in this context return { "success": False, "error": "MayaPy execution is restricted for security reasons. Use maya_execute for Python code within Maya.", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_node_connections(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get node connections requests""" try: node_name = arguments.get("node_name") connection_type = arguments.get("connection_type", "both") include_attributes = arguments.get("include_attributes", True) if not node_name: return {"success": False, "error": "node_name is required", "result": None} if not cmds.objExists(node_name): return {"success": False, "error": f"Node '{node_name}' does not exist", "result": None} result = { "node_name": node_name, "connections": {} } # Get incoming connections if connection_type in ["incoming", "both"]: incoming = cmds.listConnections(node_name, source=True, destination=False, plugs=include_attributes) or [] result["connections"]["incoming"] = incoming # Get outgoing connections if connection_type in ["outgoing", "both"]: outgoing = cmds.listConnections(node_name, source=False, destination=True, plugs=include_attributes) or [] result["connections"]["outgoing"] = outgoing # Get detailed attribute connections if requested if include_attributes: try: all_connections = cmds.listConnections(node_name, connections=True, plugs=True) or [] # Parse connections into pairs connection_pairs = [] for i in range(0, len(all_connections), 2): if i + 1 < len(all_connections): connection_pairs.append({ "source": all_connections[i], "destination": all_connections[i + 1] }) result["attribute_connections"] = connection_pairs except: pass return { "success": True, "error": None, "result": result } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_search_nodes(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle advanced node search requests""" try: search_pattern = arguments.get("search_pattern", "*") node_type = arguments.get("node_type") has_attribute = arguments.get("has_attribute") attribute_value = arguments.get("attribute_value") in_hierarchy = arguments.get("in_hierarchy") # Start with basic search if node_type: nodes = cmds.ls(search_pattern, type=node_type) else: nodes = cmds.ls(search_pattern) # Filter by hierarchy if specified if in_hierarchy: hierarchy_nodes = cmds.listRelatives(in_hierarchy, allDescendents=True, fullPath=True) or [] hierarchy_nodes.append(in_hierarchy) # Include root nodes = [node for node in nodes if node in hierarchy_nodes] # Filter by attribute existence if has_attribute: filtered_nodes = [] for node in nodes: try: if cmds.attributeQuery(has_attribute, node=node, exists=True): filtered_nodes.append(node) except: pass nodes = filtered_nodes # Filter by attribute value if has_attribute and attribute_value: filtered_nodes = [] for node in nodes: try: current_value = str(cmds.getAttr(f"{node}.{has_attribute}")) if attribute_value in current_value: filtered_nodes.append(node) except: pass nodes = filtered_nodes # Prepare detailed results detailed_results = [] for node in nodes[:100]: # Limit to 100 results try: node_info = { "name": node, "type": cmds.objectType(node), "full_path": node } # Add attribute info if searching by attribute if has_attribute: try: node_info["attribute_value"] = cmds.getAttr(f"{node}.{has_attribute}") except: pass detailed_results.append(node_info) except: pass return { "success": True, "error": None, "result": { "search_criteria": { "pattern": search_pattern, "type": node_type, "attribute": has_attribute, "attribute_value": attribute_value, "hierarchy": in_hierarchy }, "total_found": len(nodes), "returned_count": len(detailed_results), "nodes": detailed_results } } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_get_workspace_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle get Maya workspace info requests""" try: include_rules = arguments.get("include_rules", True) workspace_info = { "current_workspace": cmds.workspace(query=True, rootDirectory=True), "active": cmds.workspace(query=True, active=True) } if include_rules: try: # Get workspace rules rules = {} file_rule_list = cmds.workspace(query=True, fileRuleList=True) or [] for rule in file_rule_list: try: rule_entry = cmds.workspace(fileRule=rule, query=True) rules[rule] = rule_entry except: pass workspace_info["file_rules"] = rules # Get variable list variables = {} var_list = cmds.workspace(query=True, variableList=True) or [] for var in var_list: try: var_entry = cmds.workspace(variable=var, query=True) variables[var] = var_entry except: pass workspace_info["variables"] = variables except Exception as e: workspace_info["rules_error"] = str(e) return { "success": True, "error": None, "result": workspace_info } except Exception as e: return {"success": False, "error": str(e), "result": None} def _handle_set_workspace(self, arguments: Dict[str, Any]) -> Dict[str, Any]: """Handle set Maya workspace requests""" try: workspace_path = arguments.get("workspace_path") create_if_missing = arguments.get("create_if_missing", False) if not workspace_path: return {"success": False, "error": "workspace_path is required", "result": None} import os # Check if path exists if not os.path.exists(workspace_path): if create_if_missing: try: os.makedirs(workspace_path, exist_ok=True) except Exception as e: return { "success": False, "error": f"Failed to create workspace directory: {str(e)}", "result": None } else: return { "success": False, "error": f"Workspace path does not exist: {workspace_path}", "result": None } try: # Set the workspace cmds.workspace(workspace_path, openWorkspace=True) return { "success": True, "error": None, "result": f"Workspace set to: {workspace_path}" } except Exception as e: return { "success": False, "error": f"Failed to set workspace: {str(e)}", "result": None } except Exception as e: return {"success": False, "error": str(e), "result": None} # Global plugin instance _maya_mcp_plugin = None def maya_useNewAPI(): """Tell Maya to use the new API""" pass def initializePlugin(plugin): """Initialize the plugin when Maya loads it""" global _maya_mcp_plugin try: # Create plugin instance _maya_mcp_plugin = MayaMCPPlugin() # Initialize plugin if _maya_mcp_plugin.initialize(): print(f"Successfully loaded {PLUGIN_NAME} v{PLUGIN_VERSION}") return True else: print(f"Failed to initialize {PLUGIN_NAME}") return False except Exception as e: print(f"Error loading {PLUGIN_NAME}: {e}") traceback.print_exc() return False def uninitializePlugin(plugin): """Uninitialize the plugin when Maya unloads it""" global _maya_mcp_plugin try: if _maya_mcp_plugin: _maya_mcp_plugin.shutdown() _maya_mcp_plugin = None print(f"Successfully unloaded {PLUGIN_NAME}") return True except Exception as e: print(f"Error unloading {PLUGIN_NAME}: {e}") return False # Utility functions that can be called from MCP server def mcp_get_plugin_info(): """Get plugin information""" return { "name": PLUGIN_NAME, "version": PLUGIN_VERSION, "author": PLUGIN_AUTHOR, "port": _maya_mcp_plugin.port if _maya_mcp_plugin else None, "status": "running" if _maya_mcp_plugin and _maya_mcp_plugin.running else "stopped" } def mcp_execute_command(command: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute a MCP command with arguments""" global _maya_mcp_plugin if _maya_mcp_plugin and _maya_mcp_plugin.running: return _maya_mcp_plugin.execute_command(command, arguments) else: return { "success": False, "error": "Maya MCP Plugin is not running", "result": None } def mcp_execute_safe_command(command_str): """Execute a command safely with error handling (legacy function)""" return mcp_execute_command("maya_execute", {"command": command_str, "safe_mode": True}) def mcp_get_scene_status(): """Get current Maya scene status (legacy function)""" return mcp_execute_command("maya_get_scene_info", {"include_transforms": True}) def mcp_get_connection_status(): """Get detailed connection status for diagnostics""" global _maya_mcp_plugin if not _maya_mcp_plugin: return { "plugin_loaded": False, "error": "Plugin not loaded" } status = { "plugin_loaded": True, "plugin_running": _maya_mcp_plugin.running, "plugin_name": PLUGIN_NAME, "plugin_version": PLUGIN_VERSION, "active_port": _maya_mcp_plugin.port if _maya_mcp_plugin.running else None, "command_port_active": False, "available_ports": [], "firewall_status": "unknown" } # Check command port status if _maya_mcp_plugin.running and _maya_mcp_plugin.command_port: try: status["command_port_active"] = cmds.commandPort( query=True, name=f":{_maya_mcp_plugin.command_port}" ) except: status["command_port_active"] = False # Check available ports for port in [7022, 7023, 7024, 7025, 7026]: try: if cmds.commandPort(query=True, name=f":{port}"): status["available_ports"].append(port) except: pass # Test firewall status try: import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) result = sock.connect_ex(('localhost', _maya_mcp_plugin.port)) sock.close() status["firewall_status"] = "open" if result == 0 else "blocked" except: status["firewall_status"] = "error" return status def mcp_diagnose_connection(): """Diagnose connection issues and provide solutions""" status = mcp_get_connection_status() diagnosis = { "status": status, "issues": [], "solutions": [] } # Check for issues if not status["plugin_loaded"]: diagnosis["issues"].append("Plugin not loaded") diagnosis["solutions"].append("Load the maya_mcp plugin via Plugin Manager") if not status["plugin_running"]: diagnosis["issues"].append("Plugin not running") diagnosis["solutions"].append("Restart the plugin or reload Maya") if not status["command_port_active"]: diagnosis["issues"].append("Command port not active") diagnosis["solutions"].append("Check if port is blocked by firewall") if status["firewall_status"] == "blocked": diagnosis["issues"].append("Port blocked by firewall") diagnosis["solutions"].extend([ "Add Maya to Windows Firewall exceptions", "Try alternative ports: " + str([7022, 7023, 7024, 7025, 7026]), "Use localhost-only binding" ]) if len(status["available_ports"]) == 0: diagnosis["issues"].append("No command ports available") diagnosis["solutions"].append("Restart Maya and reload plugin") return diagnosis # Auto-load the plugin when Maya starts (if placed in plug-ins directory) if __name__ == "__main__": # This will run if the script is executed directly print(f"Loading {PLUGIN_NAME} v{PLUGIN_VERSION}") # Try to load the plugin try: if not cmds.pluginInfo(PLUGIN_NAME, query=True, loaded=True): cmds.loadPlugin(__file__) except: # Plugin might already be loaded or there might be an error pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jeffreytsai1004/maya-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server