"""
Maya MCP Plugin
Autodesk Maya plugin that enables MCP (Model Context Protocol) communication
This plugin should be placed in Maya's plug-ins directory for auto-loading
"""
import maya.api.OpenMaya as om
import maya.cmds as cmds
import maya.utils as utils
import sys
import traceback
import json
import io
import contextlib
from typing import Dict, Any, Optional, List, Callable
# Plugin information
PLUGIN_NAME = "maya_mcp"
PLUGIN_VERSION = "1.1.2"
PLUGIN_AUTHOR = "Maya MCP Server Team"
class ConsoleCapture:
"""Captures Maya console output for debugging purposes"""
def __init__(self, max_lines: int = 1000):
self.max_lines = max_lines
self.output_buffer: List[str] = []
self.error_buffer: List[str] = []
self.callbacks: List[Callable[[str, str], None]] = [] # (level, message) callbacks
self.original_stdout = None
self.original_stderr = None
self.capturing = False
def add_callback(self, callback: Callable[[str, str], None]):
"""Add a callback function for console output"""
self.callbacks.append(callback)
def remove_callback(self, callback: Callable[[str, str], None]):
"""Remove a callback function"""
if callback in self.callbacks:
self.callbacks.remove(callback)
def start_capture(self):
"""Start capturing console output"""
if not self.capturing:
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
sys.stdout = self._create_capture_wrapper(sys.stdout, "INFO")
sys.stderr = self._create_capture_wrapper(sys.stderr, "ERROR")
self.capturing = True
def stop_capture(self):
"""Stop capturing console output"""
if self.capturing:
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
self.capturing = False
def _create_capture_wrapper(self, original_stream, level: str):
"""Create a wrapper that captures output while still writing to original stream"""
class CaptureWrapper:
def __init__(self, original, capture_instance, level):
self.original = original
self.capture = capture_instance
self.level = level
def write(self, text):
# Write to original stream
self.original.write(text)
# Capture the output
if text.strip(): # Only capture non-empty lines
self.capture._add_to_buffer(text.strip(), self.level)
def flush(self):
self.original.flush()
def __getattr__(self, name):
return getattr(self.original, name)
return CaptureWrapper(original_stream, self, level)
def _add_to_buffer(self, message: str, level: str):
"""Add message to appropriate buffer and notify callbacks"""
# Add to buffer
if level == "ERROR":
self.error_buffer.append(message)
if len(self.error_buffer) > self.max_lines:
self.error_buffer.pop(0)
else:
self.output_buffer.append(message)
if len(self.output_buffer) > self.max_lines:
self.output_buffer.pop(0)
# Notify callbacks
for callback in self.callbacks:
try:
callback(level, message)
except Exception:
pass # Don't let callback errors break the capture
def get_output(self, lines: int = None) -> List[str]:
"""Get recent output lines"""
if lines is None:
return self.output_buffer.copy()
return self.output_buffer[-lines:] if lines > 0 else []
def get_errors(self, lines: int = None) -> List[str]:
"""Get recent error lines"""
if lines is None:
return self.error_buffer.copy()
return self.error_buffer[-lines:] if lines > 0 else []
def get_all_output(self, lines: int = None) -> Dict[str, List[str]]:
"""Get both output and errors"""
return {
"output": self.get_output(lines),
"errors": self.get_errors(lines)
}
def clear_buffers(self):
"""Clear all captured output"""
self.output_buffer.clear()
self.error_buffer.clear()
class MayaMCPPlugin:
"""Maya MCP Plugin main class"""
def __init__(self):
self.command_port = None
self.port = 7022 # Default Maya command port
self.alternative_ports = [7022, 7023, 7024, 7025, 7026] # Fallback ports
self.running = False
self.logger = self._setup_logging()
self.request_handlers = {}
self._setup_request_handlers()
# Console capture for debugging
self.console_capture = ConsoleCapture(max_lines=1000)
self.debug_mode = False
self.capture_enabled = False
# Connection methods
self.connection_methods = ["localhost", "127.0.0.1"] # Local connections only for security
# Security settings
self.allowed_commands = [
"polyCube", "sphere", "cylinder", "plane", "select", "ls",
"getAttr", "setAttr", "move", "rotate", "scale", "delete",
"currentTime", "playbackOptions", "file"
]
self.blocked_keywords = [
"import os", "import sys", "exec(", "eval(", "__import__",
"open(", "file(", "subprocess", "system", "reload", "compile"
]
def _setup_logging(self):
"""Setup logging for the plugin"""
import logging
logger = logging.getLogger('maya-mcp-plugin')
logger.setLevel(logging.INFO)
# Create console handler
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _setup_request_handlers(self):
"""Setup request handlers for different MCP commands"""
self.request_handlers = {
"maya_create": self._handle_create_object,
"maya_select": self._handle_select_objects,
"maya_transform": self._handle_transform_objects,
"maya_delete": self._handle_delete_objects,
"maya_execute": self._handle_execute_python,
"maya_get_selection": self._handle_get_selection,
"maya_get_scene_info": self._handle_get_scene_info,
"maya_get_object_info": self._handle_get_object_info,
"maya_list_objects": self._handle_list_objects,
# Console capture and debugging commands
"maya_get_console_output": self._handle_get_console_output,
"maya_get_log_file": self._handle_get_log_file,
"maya_enable_console_capture": self._handle_enable_console_capture,
"maya_disable_console_capture": self._handle_disable_console_capture,
"maya_clear_console_buffer": self._handle_clear_console_buffer,
"maya_set_debug_mode": self._handle_set_debug_mode,
"maya_get_debug_info": self._handle_get_debug_info,
# Outliner and DAG navigation
"maya_browse_outliner": self._handle_browse_outliner,
# Maya menu and UI access
"maya_get_menu_structure": self._handle_get_menu_structure,
"maya_execute_menu_command": self._handle_execute_menu_command,
"maya_get_shelf_tools": self._handle_get_shelf_tools,
# Plugin management
"maya_list_plugins": self._handle_list_plugins,
"maya_load_plugin": self._handle_load_plugin,
"maya_unload_plugin": self._handle_unload_plugin,
# Maya API direct access
"maya_api_call": self._handle_api_call,
# MayaPy script execution
"maya_execute_mayapy": self._handle_execute_mayapy,
# Advanced scene navigation
"maya_get_node_connections": self._handle_get_node_connections,
"maya_search_nodes": self._handle_search_nodes,
# Workspace and project tools
"maya_get_workspace_info": self._handle_get_workspace_info,
"maya_set_workspace": self._handle_set_workspace
}
def initialize(self):
"""Initialize the MCP plugin"""
try:
self.logger.info(f"Initializing {PLUGIN_NAME} v{PLUGIN_VERSION}")
# Check for existing plugin instances and clean up
self._cleanup_existing_instances()
# Start command port server
self._start_command_port()
# Set running flag
self.running = True
# Print status to Maya script editor
print(f"✓ {PLUGIN_NAME} v{PLUGIN_VERSION} initialized successfully")
print(f"✓ Command port active on port {self.port}")
print(f"✓ Available commands: {len(self.request_handlers)}")
cmds.warning(f"{PLUGIN_NAME} ready on port {self.port}")
self.logger.info(f"Available request handlers: {list(self.request_handlers.keys())}")
return True
except Exception as e:
self.logger.error(f"Failed to initialize plugin: {e}")
print(f"❌ Failed to initialize {PLUGIN_NAME}: {e}")
cmds.error(f"Failed to initialize {PLUGIN_NAME}: {e}")
return False
def _cleanup_existing_instances(self):
"""Clean up any existing plugin instances"""
try:
# Close any existing command ports
for port in [7022, 7023, 7024, 7025, 7026]:
try:
if cmds.commandPort(query=True, name=f":{port}"):
cmds.commandPort(name=f":{port}", close=True)
self.logger.debug(f"Closed existing command port on {port}")
except:
pass
except Exception as e:
self.logger.debug(f"Error during cleanup: {e}")
def _start_command_port(self):
"""Start Maya command port for MCP communication with fallback options"""
ports_to_try = [self.port] + self.alternative_ports
for port in ports_to_try:
try:
# Check if command port is already open
if cmds.commandPort(query=True, name=f":{port}"):
self.logger.info(f"Command port already open on port {port}")
self.port = port
self.command_port = port
return
# Try to open command port
cmds.commandPort(name=f":{port}", sourceType="python")
self.logger.info(f"Command port opened on port {port}")
self.port = port
self.command_port = port
# Test the connection
if self._test_command_port(port):
self.logger.info(f"Command port {port} is working correctly")
return
else:
self.logger.warning(f"Command port {port} opened but not responding")
except Exception as e:
self.logger.debug(f"Failed to start command port on port {port}: {e}")
continue
# If all ports failed, try localhost-only binding
self.logger.warning("All standard ports failed, trying localhost-only binding...")
try:
# Try localhost binding on default port
cmds.commandPort(name=f"localhost:{self.port}", sourceType="python")
self.logger.info(f"Command port opened on localhost:{self.port}")
self.command_port = self.port
except Exception as e:
self.logger.error(f"Failed to start any command port: {e}")
raise Exception("Could not start Maya command port on any available port")
def _test_command_port(self, port):
"""Test if command port is responding"""
try:
import socket
import time
# Give the port a moment to initialize
time.sleep(0.1)
# Try to connect to the port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex(('localhost', port))
sock.close()
return result == 0
except Exception:
return False
def shutdown(self):
"""Shutdown the MCP plugin"""
try:
self.logger.info(f"Shutting down {PLUGIN_NAME}")
# Stop console capture
if self.capture_enabled:
self.console_capture.stop_capture()
self.capture_enabled = False
self.logger.info("Console capture stopped")
# Close command port
if self.command_port:
try:
cmds.commandPort(name=f":{self.command_port}", close=True)
self.logger.info(f"Command port {self.command_port} closed")
except:
pass # Port might already be closed
self.running = False
cmds.warning(f"{PLUGIN_NAME} shutdown complete")
except Exception as e:
self.logger.error(f"Error during shutdown: {e}")
def _handle_create_object(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle object creation requests"""
try:
object_type = arguments.get("object_type")
name = arguments.get("name")
transform = arguments.get("transform", {})
if not object_type:
return {"success": False, "error": "object_type is required", "result": None}
# Create the object
if name:
result = getattr(cmds, object_type)(name=name)
else:
result = getattr(cmds, object_type)()
created_object = result[0] if isinstance(result, list) else result
# Apply transform if provided
if transform:
if "translate" in transform:
cmds.move(*transform["translate"], created_object)
if "rotate" in transform:
cmds.rotate(*transform["rotate"], created_object)
if "scale" in transform:
cmds.scale(*transform["scale"], created_object)
return {
"success": True,
"error": None,
"result": f"Created {object_type}: {created_object}"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_select_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle object selection requests"""
try:
objects = arguments.get("objects", [])
add_to_selection = arguments.get("add", False)
if not objects:
return {"success": False, "error": "objects list is required", "result": None}
# Select objects
if add_to_selection:
cmds.select(objects, add=True)
else:
cmds.select(objects)
selected = cmds.ls(selection=True)
return {
"success": True,
"error": None,
"result": f"Selected objects: {selected}"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_transform_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle object transformation requests"""
try:
objects = arguments.get("objects", [])
translate = arguments.get("translate")
rotate = arguments.get("rotate")
scale = arguments.get("scale")
absolute = arguments.get("absolute", False)
if not objects:
return {"success": False, "error": "objects list is required", "result": None}
results = []
for obj in objects:
if translate:
if absolute:
cmds.move(*translate, obj, absolute=True)
else:
cmds.move(*translate, obj, relative=True)
results.append(f"Translated {obj}")
if rotate:
if absolute:
cmds.rotate(*rotate, obj, absolute=True)
else:
cmds.rotate(*rotate, obj, relative=True)
results.append(f"Rotated {obj}")
if scale:
if absolute:
cmds.scale(*scale, obj, absolute=True)
else:
cmds.scale(*scale, obj, relative=True)
results.append(f"Scaled {obj}")
return {
"success": True,
"error": None,
"result": "; ".join(results)
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_delete_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle object deletion requests"""
try:
objects = arguments.get("objects", [])
if not objects:
return {"success": False, "error": "objects list is required", "result": None}
# Delete objects
cmds.delete(objects)
return {
"success": True,
"error": None,
"result": f"Deleted objects: {objects}"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_execute_python(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle Python code execution requests"""
try:
command = arguments.get("command", "")
safe_mode = arguments.get("safe_mode", True)
if not command:
return {"success": False, "error": "command is required", "result": None}
# Security check if safe mode is enabled
if safe_mode:
for keyword in self.blocked_keywords:
if keyword in command:
return {
"success": False,
"error": f"Command contains blocked keyword: {keyword}",
"result": None
}
# Execute command
try:
# Use exec for statements, eval for expressions
if any(stmt in command for stmt in ["=", "import", "def", "class", "for", "while", "if"]):
exec(command)
result = "Command executed successfully"
else:
result = eval(command)
return {
"success": True,
"error": None,
"result": str(result) if result is not None else "Command executed successfully"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_selection(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get selection requests"""
try:
long_names = arguments.get("long_names", False)
if long_names:
selected = cmds.ls(selection=True, long=True)
else:
selected = cmds.ls(selection=True)
return {
"success": True,
"error": None,
"result": selected
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_scene_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get scene info requests"""
try:
include_transforms = arguments.get("include_transforms", True)
include_attributes = arguments.get("include_attributes", False)
scene_info = {
"scene_name": cmds.file(query=True, sceneName=True) or "untitled",
"selected_objects": cmds.ls(selection=True),
"total_objects": len(cmds.ls(dag=True)),
"current_time": cmds.currentTime(query=True),
"playback_range": [
cmds.playbackOptions(query=True, minTime=True),
cmds.playbackOptions(query=True, maxTime=True)
]
}
if include_transforms:
transforms = cmds.ls(type="transform")
scene_info["transforms"] = len(transforms)
if include_attributes:
# This would be expensive, so limit to selected objects
selected = cmds.ls(selection=True)
if selected:
attrs = {}
for obj in selected[:5]: # Limit to first 5 objects
try:
attrs[obj] = cmds.listAttr(obj)
except:
pass
scene_info["attributes"] = attrs
return {
"success": True,
"error": None,
"result": scene_info
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_object_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get object info requests"""
try:
object_name = arguments.get("object_name")
include_attributes = arguments.get("include_attributes", True)
include_connections = arguments.get("include_connections", False)
if not object_name:
return {"success": False, "error": "object_name is required", "result": None}
# Check if object exists
if not cmds.objExists(object_name):
return {"success": False, "error": f"Object {object_name} does not exist", "result": None}
object_info = {
"name": object_name,
"type": cmds.objectType(object_name),
"exists": True
}
# Get transform information
try:
if cmds.objectType(object_name) == "transform":
object_info["transform"] = {
"translate": cmds.getAttr(f"{object_name}.translate")[0],
"rotate": cmds.getAttr(f"{object_name}.rotate")[0],
"scale": cmds.getAttr(f"{object_name}.scale")[0]
}
except:
pass
# Get attributes if requested
if include_attributes:
try:
object_info["attributes"] = cmds.listAttr(object_name)
except:
object_info["attributes"] = []
# Get connections if requested
if include_connections:
try:
object_info["connections"] = cmds.listConnections(object_name) or []
except:
object_info["connections"] = []
return {
"success": True,
"error": None,
"result": object_info
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_list_objects(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle list objects requests"""
try:
object_type = arguments.get("object_type", "all")
pattern = arguments.get("pattern")
if object_type == "all":
objects = cmds.ls(dag=True)
elif object_type == "transform":
objects = cmds.ls(type="transform")
elif object_type == "mesh":
objects = cmds.ls(type="mesh")
elif object_type == "camera":
objects = cmds.ls(type="camera")
elif object_type == "light":
objects = cmds.ls(type="light")
elif object_type == "material":
objects = cmds.ls(materials=True)
else:
objects = cmds.ls(type=object_type)
# Apply pattern filter if provided
if pattern:
objects = [obj for obj in objects if pattern in obj]
return {
"success": True,
"error": None,
"result": {
"objects": objects,
"count": len(objects),
"type": object_type
}
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_console_output(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get console output requests"""
try:
lines = arguments.get("lines", None)
include_errors = arguments.get("include_errors", True)
if include_errors:
result = self.console_capture.get_all_output(lines)
else:
result = {"output": self.console_capture.get_output(lines)}
result["capture_enabled"] = self.capture_enabled
result["total_lines"] = len(self.console_capture.output_buffer)
result["total_errors"] = len(self.console_capture.error_buffer)
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_enable_console_capture(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle enable console capture requests"""
try:
max_lines = arguments.get("max_lines", 1000)
# Update max lines if provided
self.console_capture.max_lines = max_lines
# Start capture if not already running
if not self.capture_enabled:
self.console_capture.start_capture()
self.capture_enabled = True
# Add callback for debug logging
def debug_callback(level: str, message: str):
if self.debug_mode:
self.logger.debug(f"Maya Console [{level}]: {message}")
self.console_capture.add_callback(debug_callback)
return {
"success": True,
"error": None,
"result": f"Console capture enabled with max {max_lines} lines"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_disable_console_capture(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle disable console capture requests"""
try:
if self.capture_enabled:
self.console_capture.stop_capture()
self.capture_enabled = False
return {
"success": True,
"error": None,
"result": "Console capture disabled"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_clear_console_buffer(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle clear console buffer requests"""
try:
self.console_capture.clear_buffers()
return {
"success": True,
"error": None,
"result": "Console buffers cleared"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_set_debug_mode(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle set debug mode requests"""
try:
enabled = arguments.get("enabled", True)
self.debug_mode = enabled
# Update logger level
if enabled:
self.logger.setLevel(logging.DEBUG)
else:
self.logger.setLevel(logging.INFO)
return {
"success": True,
"error": None,
"result": f"Debug mode {'enabled' if enabled else 'disabled'}"
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_debug_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get debug info requests"""
try:
debug_info = {
"plugin_name": PLUGIN_NAME,
"plugin_version": PLUGIN_VERSION,
"debug_mode": self.debug_mode,
"console_capture_enabled": self.capture_enabled,
"console_buffer_size": len(self.console_capture.output_buffer),
"error_buffer_size": len(self.console_capture.error_buffer),
"active_port": self.port,
"command_port_status": None,
"maya_version": cmds.about(version=True),
"maya_api_version": cmds.about(apiVersion=True),
"python_version": sys.version,
"recent_errors": self.console_capture.get_errors(5),
"recent_output": self.console_capture.get_output(5)
}
# Check command port status
try:
debug_info["command_port_status"] = cmds.commandPort(
query=True,
name=f":{self.port}"
)
except:
debug_info["command_port_status"] = False
return {
"success": True,
"error": None,
"result": debug_info
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def execute_command(self, command: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a command with given arguments"""
try:
if command in self.request_handlers:
handler = self.request_handlers[command]
return handler(arguments)
else:
return {
"success": False,
"error": f"Unknown command: {command}",
"result": None
}
except Exception as e:
self.logger.error(f"Error executing command {command}: {e}")
return {
"success": False,
"error": f"Internal error: {str(e)}",
"result": None
}
# Additional handler methods for new functionality
def _handle_get_log_file(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get Maya log file requests"""
try:
log_type = arguments.get("log_type", "output")
lines = arguments.get("lines", None)
import os
# Get Maya log directory
maya_app_dir = cmds.internalVar(userAppDir=True)
log_files = {
"output": os.path.join(maya_app_dir, "mayalog.txt"),
"error": os.path.join(maya_app_dir, "mayaerror.txt"),
"script_editor": os.path.join(maya_app_dir, "scriptEditorTemp.mel"),
"command_history": os.path.join(maya_app_dir, "commandHistory.txt")
}
log_file = log_files.get(log_type)
if not log_file or not os.path.exists(log_file):
return {
"success": False,
"error": f"Log file not found: {log_type}",
"result": None
}
# Read log file
try:
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
if lines:
# Read last N lines
all_lines = f.readlines()
content = ''.join(all_lines[-lines:])
else:
content = f.read()
return {
"success": True,
"error": None,
"result": {
"log_type": log_type,
"file_path": log_file,
"content": content,
"lines_returned": len(content.split('\n')) if content else 0
}
}
except Exception as e:
return {
"success": False,
"error": f"Error reading log file: {str(e)}",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_browse_outliner(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle browse outliner hierarchy requests"""
try:
root_node = arguments.get("root_node", "|")
depth = arguments.get("depth", 3)
include_shapes = arguments.get("include_shapes", True)
filter_type = arguments.get("filter_type", "all")
def get_hierarchy(node, current_depth, max_depth):
if current_depth >= max_depth:
return None
children = cmds.listRelatives(node, children=True, fullPath=True) or []
# Filter children based on type
if filter_type != "all":
filtered_children = []
for child in children:
try:
node_type = cmds.objectType(child)
if filter_type == "transform" and node_type == "transform":
filtered_children.append(child)
elif filter_type == "mesh" and node_type == "mesh":
filtered_children.append(child)
elif filter_type == "camera" and node_type == "camera":
filtered_children.append(child)
elif filter_type == "light" and "Light" in node_type:
filtered_children.append(child)
elif filter_type == "joint" and node_type == "joint":
filtered_children.append(child)
except:
pass
children = filtered_children
# Filter shapes if not included
if not include_shapes:
children = [child for child in children if cmds.objectType(child) != "mesh"]
hierarchy = {
"name": node.split("|")[-1],
"full_path": node,
"type": cmds.objectType(node) if cmds.objExists(node) else "unknown",
"children": []
}
for child in children:
child_hierarchy = get_hierarchy(child, current_depth + 1, max_depth)
if child_hierarchy:
hierarchy["children"].append(child_hierarchy)
return hierarchy
# Start from root node
if root_node == "|" or root_node == "world":
# Get top-level transforms
top_level = cmds.ls(assemblies=True)
result = {
"name": "world",
"full_path": "|",
"type": "world",
"children": []
}
for node in top_level:
child_hierarchy = get_hierarchy(node, 0, depth)
if child_hierarchy:
result["children"].append(child_hierarchy)
else:
result = get_hierarchy(root_node, 0, depth)
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_menu_structure(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get Maya menu structure requests"""
try:
menu_name = arguments.get("menu_name")
include_submenus = arguments.get("include_submenus", True)
# Basic menu structure info (simplified for safety)
if menu_name:
result = {
"menu_name": menu_name,
"message": "Menu structure access requires Maya UI to be active",
"available": False
}
else:
result = {
"main_menus": [
{"name": "File", "label": "File"},
{"name": "Edit", "label": "Edit"},
{"name": "Create", "label": "Create"},
{"name": "Select", "label": "Select"},
{"name": "Modify", "label": "Modify"},
{"name": "Display", "label": "Display"},
{"name": "Windows", "label": "Windows"},
{"name": "Help", "label": "Help"}
],
"message": "Basic menu list - detailed access requires Maya UI"
}
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_execute_menu_command(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle execute Maya menu command requests"""
try:
command = arguments.get("command")
cmd_arguments = arguments.get("arguments", {})
if not command:
return {"success": False, "error": "command is required", "result": None}
# Execute the command safely
try:
if hasattr(cmds, command):
if cmd_arguments:
result = getattr(cmds, command)(**cmd_arguments)
else:
result = getattr(cmds, command)()
return {
"success": True,
"error": None,
"result": f"Command '{command}' executed successfully: {result}"
}
else:
return {
"success": False,
"error": f"Command '{command}' not found in Maya cmds",
"result": None
}
except Exception as e:
return {
"success": False,
"error": f"Failed to execute command '{command}': {str(e)}",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_shelf_tools(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get Maya shelf tools requests"""
try:
shelf_name = arguments.get("shelf_name")
# Simplified shelf info (UI-dependent features)
result = {
"message": "Shelf access requires Maya UI to be active",
"available_shelves": [
"General", "Polygons", "Subdiv Surfaces", "Deformation",
"Animation", "Dynamics", "Rendering", "nDynamics",
"MASH", "XGen", "Arnold", "Toon"
],
"note": "Use maya_execute with shelf commands for programmatic access"
}
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_list_plugins(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle list Maya plugins requests"""
try:
loaded_only = arguments.get("loaded_only", False)
include_info = arguments.get("include_info", True)
# Get all plugins
all_plugins = cmds.pluginInfo(query=True, listPlugins=True) or []
result = {
"plugins": [],
"total_count": len(all_plugins),
"loaded_count": 0
}
for plugin in all_plugins:
try:
is_loaded = cmds.pluginInfo(plugin, query=True, loaded=True)
if loaded_only and not is_loaded:
continue
if is_loaded:
result["loaded_count"] += 1
plugin_info = {
"name": plugin,
"loaded": is_loaded
}
if include_info and is_loaded:
try:
plugin_info.update({
"version": cmds.pluginInfo(plugin, query=True, version=True) or "Unknown",
"vendor": cmds.pluginInfo(plugin, query=True, vendor=True) or "Unknown",
"path": cmds.pluginInfo(plugin, query=True, path=True) or ""
})
except:
pass
result["plugins"].append(plugin_info)
except Exception as e:
self.logger.debug(f"Error getting info for plugin {plugin}: {e}")
continue
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_load_plugin(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle load Maya plugin requests"""
try:
plugin_name = arguments.get("plugin_name")
quiet = arguments.get("quiet", False)
if not plugin_name:
return {"success": False, "error": "plugin_name is required", "result": None}
try:
# Check if already loaded
if cmds.pluginInfo(plugin_name, query=True, loaded=True):
return {
"success": True,
"error": None,
"result": f"Plugin '{plugin_name}' is already loaded"
}
# Load the plugin
cmds.loadPlugin(plugin_name, quiet=quiet)
return {
"success": True,
"error": None,
"result": f"Plugin '{plugin_name}' loaded successfully"
}
except Exception as e:
return {
"success": False,
"error": f"Failed to load plugin '{plugin_name}': {str(e)}",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_unload_plugin(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle unload Maya plugin requests"""
try:
plugin_name = arguments.get("plugin_name")
force = arguments.get("force", False)
if not plugin_name:
return {"success": False, "error": "plugin_name is required", "result": None}
try:
# Check if loaded
if not cmds.pluginInfo(plugin_name, query=True, loaded=True):
return {
"success": True,
"error": None,
"result": f"Plugin '{plugin_name}' is not loaded"
}
# Unload the plugin
cmds.unloadPlugin(plugin_name, force=force)
return {
"success": True,
"error": None,
"result": f"Plugin '{plugin_name}' unloaded successfully"
}
except Exception as e:
return {
"success": False,
"error": f"Failed to unload plugin '{plugin_name}': {str(e)}",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_api_call(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle Maya API direct call requests"""
try:
api_module = arguments.get("api_module", "OpenMaya")
function_name = arguments.get("function_name")
api_arguments = arguments.get("arguments", [])
return_type = arguments.get("return_type", "object")
if not function_name:
return {"success": False, "error": "function_name is required", "result": None}
try:
# Import the API module
if api_module == "OpenMaya":
import maya.api.OpenMaya as api
elif api_module == "OpenMayaUI":
import maya.api.OpenMayaUI as api
elif api_module == "OpenMayaAnim":
import maya.api.OpenMayaAnim as api
elif api_module == "OpenMayaFX":
import maya.api.OpenMayaFX as api
elif api_module == "OpenMayaRender":
import maya.api.OpenMayaRender as api
else:
return {
"success": False,
"error": f"Unknown API module: {api_module}",
"result": None
}
# Get the function
if not hasattr(api, function_name):
return {
"success": False,
"error": f"Function '{function_name}' not found in {api_module}",
"result": None
}
api_function = getattr(api, function_name)
# Call the function
if api_arguments:
result = api_function(*api_arguments)
else:
result = api_function()
# Convert result based on return type
if return_type == "string":
result = str(result)
elif return_type == "int":
result = int(result)
elif return_type == "float":
result = float(result)
elif return_type == "bool":
result = bool(result)
# For "object", keep as is
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {
"success": False,
"error": f"API call failed: {str(e)}",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_execute_mayapy(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle MayaPy script execution requests"""
try:
script_path = arguments.get("script_path")
script_content = arguments.get("script_content")
script_arguments = arguments.get("arguments", [])
working_directory = arguments.get("working_directory")
timeout = arguments.get("timeout", 30)
if not script_path and not script_content:
return {
"success": False,
"error": "Either script_path or script_content is required",
"result": None
}
# For security reasons, limit MayaPy execution in this context
return {
"success": False,
"error": "MayaPy execution is restricted for security reasons. Use maya_execute for Python code within Maya.",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_node_connections(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get node connections requests"""
try:
node_name = arguments.get("node_name")
connection_type = arguments.get("connection_type", "both")
include_attributes = arguments.get("include_attributes", True)
if not node_name:
return {"success": False, "error": "node_name is required", "result": None}
if not cmds.objExists(node_name):
return {"success": False, "error": f"Node '{node_name}' does not exist", "result": None}
result = {
"node_name": node_name,
"connections": {}
}
# Get incoming connections
if connection_type in ["incoming", "both"]:
incoming = cmds.listConnections(node_name, source=True, destination=False, plugs=include_attributes) or []
result["connections"]["incoming"] = incoming
# Get outgoing connections
if connection_type in ["outgoing", "both"]:
outgoing = cmds.listConnections(node_name, source=False, destination=True, plugs=include_attributes) or []
result["connections"]["outgoing"] = outgoing
# Get detailed attribute connections if requested
if include_attributes:
try:
all_connections = cmds.listConnections(node_name, connections=True, plugs=True) or []
# Parse connections into pairs
connection_pairs = []
for i in range(0, len(all_connections), 2):
if i + 1 < len(all_connections):
connection_pairs.append({
"source": all_connections[i],
"destination": all_connections[i + 1]
})
result["attribute_connections"] = connection_pairs
except:
pass
return {
"success": True,
"error": None,
"result": result
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_search_nodes(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle advanced node search requests"""
try:
search_pattern = arguments.get("search_pattern", "*")
node_type = arguments.get("node_type")
has_attribute = arguments.get("has_attribute")
attribute_value = arguments.get("attribute_value")
in_hierarchy = arguments.get("in_hierarchy")
# Start with basic search
if node_type:
nodes = cmds.ls(search_pattern, type=node_type)
else:
nodes = cmds.ls(search_pattern)
# Filter by hierarchy if specified
if in_hierarchy:
hierarchy_nodes = cmds.listRelatives(in_hierarchy, allDescendents=True, fullPath=True) or []
hierarchy_nodes.append(in_hierarchy) # Include root
nodes = [node for node in nodes if node in hierarchy_nodes]
# Filter by attribute existence
if has_attribute:
filtered_nodes = []
for node in nodes:
try:
if cmds.attributeQuery(has_attribute, node=node, exists=True):
filtered_nodes.append(node)
except:
pass
nodes = filtered_nodes
# Filter by attribute value
if has_attribute and attribute_value:
filtered_nodes = []
for node in nodes:
try:
current_value = str(cmds.getAttr(f"{node}.{has_attribute}"))
if attribute_value in current_value:
filtered_nodes.append(node)
except:
pass
nodes = filtered_nodes
# Prepare detailed results
detailed_results = []
for node in nodes[:100]: # Limit to 100 results
try:
node_info = {
"name": node,
"type": cmds.objectType(node),
"full_path": node
}
# Add attribute info if searching by attribute
if has_attribute:
try:
node_info["attribute_value"] = cmds.getAttr(f"{node}.{has_attribute}")
except:
pass
detailed_results.append(node_info)
except:
pass
return {
"success": True,
"error": None,
"result": {
"search_criteria": {
"pattern": search_pattern,
"type": node_type,
"attribute": has_attribute,
"attribute_value": attribute_value,
"hierarchy": in_hierarchy
},
"total_found": len(nodes),
"returned_count": len(detailed_results),
"nodes": detailed_results
}
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_get_workspace_info(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle get Maya workspace info requests"""
try:
include_rules = arguments.get("include_rules", True)
workspace_info = {
"current_workspace": cmds.workspace(query=True, rootDirectory=True),
"active": cmds.workspace(query=True, active=True)
}
if include_rules:
try:
# Get workspace rules
rules = {}
file_rule_list = cmds.workspace(query=True, fileRuleList=True) or []
for rule in file_rule_list:
try:
rule_entry = cmds.workspace(fileRule=rule, query=True)
rules[rule] = rule_entry
except:
pass
workspace_info["file_rules"] = rules
# Get variable list
variables = {}
var_list = cmds.workspace(query=True, variableList=True) or []
for var in var_list:
try:
var_entry = cmds.workspace(variable=var, query=True)
variables[var] = var_entry
except:
pass
workspace_info["variables"] = variables
except Exception as e:
workspace_info["rules_error"] = str(e)
return {
"success": True,
"error": None,
"result": workspace_info
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
def _handle_set_workspace(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Handle set Maya workspace requests"""
try:
workspace_path = arguments.get("workspace_path")
create_if_missing = arguments.get("create_if_missing", False)
if not workspace_path:
return {"success": False, "error": "workspace_path is required", "result": None}
import os
# Check if path exists
if not os.path.exists(workspace_path):
if create_if_missing:
try:
os.makedirs(workspace_path, exist_ok=True)
except Exception as e:
return {
"success": False,
"error": f"Failed to create workspace directory: {str(e)}",
"result": None
}
else:
return {
"success": False,
"error": f"Workspace path does not exist: {workspace_path}",
"result": None
}
try:
# Set the workspace
cmds.workspace(workspace_path, openWorkspace=True)
return {
"success": True,
"error": None,
"result": f"Workspace set to: {workspace_path}"
}
except Exception as e:
return {
"success": False,
"error": f"Failed to set workspace: {str(e)}",
"result": None
}
except Exception as e:
return {"success": False, "error": str(e), "result": None}
# Global plugin instance
_maya_mcp_plugin = None
def maya_useNewAPI():
"""Tell Maya to use the new API"""
pass
def initializePlugin(plugin):
"""Initialize the plugin when Maya loads it"""
global _maya_mcp_plugin
try:
# Create plugin instance
_maya_mcp_plugin = MayaMCPPlugin()
# Initialize plugin
if _maya_mcp_plugin.initialize():
print(f"Successfully loaded {PLUGIN_NAME} v{PLUGIN_VERSION}")
return True
else:
print(f"Failed to initialize {PLUGIN_NAME}")
return False
except Exception as e:
print(f"Error loading {PLUGIN_NAME}: {e}")
traceback.print_exc()
return False
def uninitializePlugin(plugin):
"""Uninitialize the plugin when Maya unloads it"""
global _maya_mcp_plugin
try:
if _maya_mcp_plugin:
_maya_mcp_plugin.shutdown()
_maya_mcp_plugin = None
print(f"Successfully unloaded {PLUGIN_NAME}")
return True
except Exception as e:
print(f"Error unloading {PLUGIN_NAME}: {e}")
return False
# Utility functions that can be called from MCP server
def mcp_get_plugin_info():
"""Get plugin information"""
return {
"name": PLUGIN_NAME,
"version": PLUGIN_VERSION,
"author": PLUGIN_AUTHOR,
"port": _maya_mcp_plugin.port if _maya_mcp_plugin else None,
"status": "running" if _maya_mcp_plugin and _maya_mcp_plugin.running else "stopped"
}
def mcp_execute_command(command: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a MCP command with arguments"""
global _maya_mcp_plugin
if _maya_mcp_plugin and _maya_mcp_plugin.running:
return _maya_mcp_plugin.execute_command(command, arguments)
else:
return {
"success": False,
"error": "Maya MCP Plugin is not running",
"result": None
}
def mcp_execute_safe_command(command_str):
"""Execute a command safely with error handling (legacy function)"""
return mcp_execute_command("maya_execute", {"command": command_str, "safe_mode": True})
def mcp_get_scene_status():
"""Get current Maya scene status (legacy function)"""
return mcp_execute_command("maya_get_scene_info", {"include_transforms": True})
def mcp_get_connection_status():
"""Get detailed connection status for diagnostics"""
global _maya_mcp_plugin
if not _maya_mcp_plugin:
return {
"plugin_loaded": False,
"error": "Plugin not loaded"
}
status = {
"plugin_loaded": True,
"plugin_running": _maya_mcp_plugin.running,
"plugin_name": PLUGIN_NAME,
"plugin_version": PLUGIN_VERSION,
"active_port": _maya_mcp_plugin.port if _maya_mcp_plugin.running else None,
"command_port_active": False,
"available_ports": [],
"firewall_status": "unknown"
}
# Check command port status
if _maya_mcp_plugin.running and _maya_mcp_plugin.command_port:
try:
status["command_port_active"] = cmds.commandPort(
query=True,
name=f":{_maya_mcp_plugin.command_port}"
)
except:
status["command_port_active"] = False
# Check available ports
for port in [7022, 7023, 7024, 7025, 7026]:
try:
if cmds.commandPort(query=True, name=f":{port}"):
status["available_ports"].append(port)
except:
pass
# Test firewall status
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('localhost', _maya_mcp_plugin.port))
sock.close()
status["firewall_status"] = "open" if result == 0 else "blocked"
except:
status["firewall_status"] = "error"
return status
def mcp_diagnose_connection():
"""Diagnose connection issues and provide solutions"""
status = mcp_get_connection_status()
diagnosis = {
"status": status,
"issues": [],
"solutions": []
}
# Check for issues
if not status["plugin_loaded"]:
diagnosis["issues"].append("Plugin not loaded")
diagnosis["solutions"].append("Load the maya_mcp plugin via Plugin Manager")
if not status["plugin_running"]:
diagnosis["issues"].append("Plugin not running")
diagnosis["solutions"].append("Restart the plugin or reload Maya")
if not status["command_port_active"]:
diagnosis["issues"].append("Command port not active")
diagnosis["solutions"].append("Check if port is blocked by firewall")
if status["firewall_status"] == "blocked":
diagnosis["issues"].append("Port blocked by firewall")
diagnosis["solutions"].extend([
"Add Maya to Windows Firewall exceptions",
"Try alternative ports: " + str([7022, 7023, 7024, 7025, 7026]),
"Use localhost-only binding"
])
if len(status["available_ports"]) == 0:
diagnosis["issues"].append("No command ports available")
diagnosis["solutions"].append("Restart Maya and reload plugin")
return diagnosis
# Auto-load the plugin when Maya starts (if placed in plug-ins directory)
if __name__ == "__main__":
# This will run if the script is executed directly
print(f"Loading {PLUGIN_NAME} v{PLUGIN_VERSION}")
# Try to load the plugin
try:
if not cmds.pluginInfo(PLUGIN_NAME, query=True, loaded=True):
cmds.loadPlugin(__file__)
except:
# Plugin might already be loaded or there might be an error
pass