DaVinci Resolve MCP

by samuelgursky
Verified
#!/usr/bin/env python3 """ DaVinci Resolve MCP Server A server that connects to DaVinci Resolve via the Model Context Protocol (MCP) Version: 1.3.5 - Enhanced Cursor Integration """ import os import sys import logging from typing import List, Dict, Any, Optional # Add src directory to Python path current_dir = os.path.dirname(os.path.abspath(__file__)) src_dir = os.path.join(current_dir, 'src') if src_dir not in sys.path: sys.path.insert(0, src_dir) # Import platform utilities from src.utils.platform import setup_environment, get_platform, get_resolve_paths # Setup platform-specific paths and environment variables paths = get_resolve_paths() RESOLVE_API_PATH = paths["api_path"] RESOLVE_LIB_PATH = paths["lib_path"] RESOLVE_MODULES_PATH = paths["modules_path"] os.environ["RESOLVE_SCRIPT_API"] = RESOLVE_API_PATH os.environ["RESOLVE_SCRIPT_LIB"] = RESOLVE_LIB_PATH # Add the module path to Python's path if it's not already there if RESOLVE_MODULES_PATH not in sys.path: sys.path.append(RESOLVE_MODULES_PATH) # Import MCP from mcp.server.fastmcp import FastMCP # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()] ) logger = logging.getLogger("davinci-resolve-mcp") # Log server version and platform VERSION = "1.3.5" logger.info(f"Starting DaVinci Resolve MCP Server v{VERSION}") logger.info(f"Detected platform: {get_platform()}") logger.info(f"Using Resolve API path: {RESOLVE_API_PATH}") logger.info(f"Using Resolve library path: {RESOLVE_LIB_PATH}") # Create MCP server instance mcp = FastMCP("DaVinciResolveMCP") # Initialize connection to DaVinci Resolve try: # Direct import from the Modules directory sys.path.insert(0, RESOLVE_MODULES_PATH) import DaVinciResolveScript as dvr_script resolve = dvr_script.scriptapp("Resolve") if resolve: logger.info(f"Connected to DaVinci Resolve: {resolve.GetProductName()} {resolve.GetVersionString()}") else: logger.error("Failed to get Resolve object. Is DaVinci Resolve running?") except ImportError as e: logger.error(f"Failed to import DaVinciResolveScript: {str(e)}") logger.error("Check that DaVinci Resolve is installed and running.") logger.error(f"RESOLVE_SCRIPT_API: {RESOLVE_API_PATH}") logger.error(f"RESOLVE_SCRIPT_LIB: {RESOLVE_LIB_PATH}") logger.error(f"RESOLVE_MODULES_PATH: {RESOLVE_MODULES_PATH}") logger.error(f"sys.path: {sys.path}") resolve = None except Exception as e: logger.error(f"Unexpected error initializing Resolve: {str(e)}") resolve = None # ------------------ # MCP Tools/Resources # ------------------ @mcp.resource("resolve://version") def get_resolve_version() -> str: """Get DaVinci Resolve version information.""" if resolve is None: return "Error: Not connected to DaVinci Resolve" return f"{resolve.GetProductName()} {resolve.GetVersionString()}" @mcp.resource("resolve://current-page") def get_current_page() -> str: """Get the current page open in DaVinci Resolve (Edit, Color, Fusion, etc.).""" if resolve is None: return "Error: Not connected to DaVinci Resolve" return resolve.GetCurrentPage() @mcp.tool() def switch_page(page: str) -> str: """Switch to a specific page in DaVinci Resolve. Args: page: The page to switch to. Options: 'media', 'cut', 'edit', 'fusion', 'color', 'fairlight', 'deliver' """ if resolve is None: return "Error: Not connected to DaVinci Resolve" valid_pages = ['media', 'cut', 'edit', 'fusion', 'color', 'fairlight', 'deliver'] page = page.lower() if page not in valid_pages: return f"Error: Invalid page name. Must be one of: {', '.join(valid_pages)}" result = resolve.OpenPage(page) if result: return f"Successfully switched to {page} page" else: return f"Failed to switch to {page} page" # ------------------ # Project Management # ------------------ @mcp.resource("resolve://projects") def list_projects() -> List[str]: """List all available projects in the current database.""" if resolve is None: return ["Error: Not connected to DaVinci Resolve"] project_manager = resolve.GetProjectManager() if not project_manager: return ["Error: Failed to get Project Manager"] projects = project_manager.GetProjectListInCurrentFolder() # Filter out any empty strings that might be in the list return [p for p in projects if p] @mcp.resource("resolve://current-project") def get_current_project_name() -> str: """Get the name of the currently open project.""" if resolve is None: return "Error: Not connected to DaVinci Resolve" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" current_project = project_manager.GetCurrentProject() if not current_project: return "No project currently open" return current_project.GetName() @mcp.resource("resolve://project-settings") def get_project_settings() -> Dict[str, Any]: """Get all project settings from the current project.""" if resolve is None: return {"error": "Not connected to DaVinci Resolve"} project_manager = resolve.GetProjectManager() if not project_manager: return {"error": "Failed to get Project Manager"} current_project = project_manager.GetCurrentProject() if not current_project: return {"error": "No project currently open"} try: # Get all settings return current_project.GetSetting('') except Exception as e: return {"error": f"Failed to get project settings: {str(e)}"} @mcp.resource("resolve://project-setting/{setting_name}") def get_project_setting(setting_name: str) -> Dict[str, Any]: """Get a specific project setting by name. Args: setting_name: The specific setting to retrieve. """ if resolve is None: return {"error": "Not connected to DaVinci Resolve"} project_manager = resolve.GetProjectManager() if not project_manager: return {"error": "Failed to get Project Manager"} current_project = project_manager.GetCurrentProject() if not current_project: return {"error": "No project currently open"} try: # Get specific setting value = current_project.GetSetting(setting_name) return {setting_name: value} except Exception as e: return {"error": f"Failed to get project setting '{setting_name}': {str(e)}"} @mcp.tool() def set_project_setting(setting_name: str, setting_value: str) -> str: """Set a project setting to the specified value. Args: setting_name: The name of the setting to change setting_value: The new value for the setting """ if resolve is None: return "Error: Not connected to DaVinci Resolve" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" current_project = project_manager.GetCurrentProject() if not current_project: return "Error: No project currently open" try: result = current_project.SetSetting(setting_name, setting_value) if result: return f"Successfully set project setting '{setting_name}' to '{setting_value}'" else: return f"Failed to set project setting '{setting_name}'" except Exception as e: return f"Error setting project setting: {str(e)}" @mcp.tool() def open_project(name: str) -> str: """Open a project by name. Args: name: The name of the project to open """ if resolve is None: return "Error: Not connected to DaVinci Resolve" if not name: return "Error: Project name cannot be empty" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" # Check if project exists projects = project_manager.GetProjectListInCurrentFolder() if name not in projects: return f"Error: Project '{name}' not found. Available projects: {', '.join(projects)}" result = project_manager.LoadProject(name) if result: return f"Successfully opened project '{name}'" else: return f"Failed to open project '{name}'" @mcp.tool() def create_project(name: str) -> str: """Create a new project with the given name. Args: name: The name for the new project """ if resolve is None: return "Error: Not connected to DaVinci Resolve" if not name: return "Error: Project name cannot be empty" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" # Check if project already exists projects = project_manager.GetProjectListInCurrentFolder() if name in projects: return f"Error: Project '{name}' already exists" result = project_manager.CreateProject(name) if result: return f"Successfully created project '{name}'" else: return f"Failed to create project '{name}'" @mcp.tool() def save_project() -> str: """Save the current project. Note that DaVinci Resolve typically auto-saves projects, so this may not be necessary. """ if resolve is None: return "Error: Not connected to DaVinci Resolve" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" current_project = project_manager.GetCurrentProject() if not current_project: return "Error: No project currently open" project_name = current_project.GetName() success = False error_message = None # Try multiple approaches to save the project try: # Method 1: Try direct save method if available try: if hasattr(current_project, "SaveProject"): result = current_project.SaveProject() if result: logger.info(f"Project '{project_name}' saved using SaveProject method") success = True except Exception as e: logger.error(f"Error in SaveProject method: {str(e)}") error_message = str(e) # Method 2: Try project manager save method if not success: try: if hasattr(project_manager, "SaveProject"): result = project_manager.SaveProject() if result: logger.info(f"Project '{project_name}' saved using ProjectManager.SaveProject method") success = True except Exception as e: logger.error(f"Error in ProjectManager.SaveProject method: {str(e)}") if not error_message: error_message = str(e) # Method 3: Try the export method as a backup approach if not success: try: # Get a temporary file path in the same location as other project files import tempfile import os temp_dir = tempfile.gettempdir() temp_file = os.path.join(temp_dir, f"{project_name}_temp.drp") # Try to export the project, which should trigger a save result = project_manager.ExportProject(project_name, temp_file) if result: logger.info(f"Project '{project_name}' saved via temporary export to {temp_file}") # Try to clean up temp file try: if os.path.exists(temp_file): os.remove(temp_file) except: pass success = True except Exception as e: logger.error(f"Error in export method: {str(e)}") if not error_message: error_message = str(e) # If all else fails, rely on auto-save if not success: return f"Automatic save likely in effect for project '{project_name}'. Manual save attempts failed: {error_message if error_message else 'Unknown error'}" else: return f"Successfully saved project '{project_name}'" except Exception as e: logger.error(f"Error saving project: {str(e)}") return f"Error saving project: {str(e)}" # ------------------ # Timeline Operations # ------------------ @mcp.resource("resolve://timelines") def list_timelines() -> List[str]: """List all timelines in the current project.""" logger.info("Received request to list timelines") if resolve is None: logger.error("Not connected to DaVinci Resolve") return ["Error: Not connected to DaVinci Resolve"] project_manager = resolve.GetProjectManager() if not project_manager: logger.error("Failed to get Project Manager") return ["Error: Failed to get Project Manager"] current_project = project_manager.GetCurrentProject() if not current_project: logger.error("No project currently open") return ["Error: No project currently open"] timeline_count = current_project.GetTimelineCount() logger.info(f"Timeline count: {timeline_count}") timelines = [] for i in range(1, timeline_count + 1): timeline = current_project.GetTimelineByIndex(i) if timeline: timeline_name = timeline.GetName() timelines.append(timeline_name) logger.info(f"Found timeline {i}: {timeline_name}") if not timelines: logger.info("No timelines found in the current project") return ["No timelines found in the current project"] logger.info(f"Returning {len(timelines)} timelines: {', '.join(timelines)}") return timelines @mcp.resource("resolve://current-timeline") def get_current_timeline() -> Dict[str, Any]: """Get information about the current timeline.""" if resolve is None: return {"error": "Not connected to DaVinci Resolve"} project_manager = resolve.GetProjectManager() if not project_manager: return {"error": "Failed to get Project Manager"} current_project = project_manager.GetCurrentProject() if not current_project: return {"error": "No project currently open"} current_timeline = current_project.GetCurrentTimeline() if not current_timeline: return {"error": "No timeline currently active"} # Get basic timeline information result = { "name": current_timeline.GetName(), "fps": current_timeline.GetSetting("timelineFrameRate"), "resolution": { "width": current_timeline.GetSetting("timelineResolutionWidth"), "height": current_timeline.GetSetting("timelineResolutionHeight") }, "duration": current_timeline.GetEndFrame() - current_timeline.GetStartFrame() + 1 } return result @mcp.tool() def create_timeline(name: str) -> str: """Create a new timeline with the given name. Args: name: The name for the new timeline """ if resolve is None: return "Error: Not connected to DaVinci Resolve" if not name: return "Error: Timeline name cannot be empty" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" current_project = project_manager.GetCurrentProject() if not current_project: return "Error: No project currently open" # Check if timeline already exists timeline_count = current_project.GetTimelineCount() for i in range(1, timeline_count + 1): timeline = current_project.GetTimelineByIndex(i) if timeline and timeline.GetName() == name: return f"Error: Timeline '{name}' already exists" # Create the timeline media_pool = current_project.GetMediaPool() if not media_pool: return "Error: Failed to get Media Pool" result = media_pool.CreateEmptyTimeline(name) if result: return f"Successfully created timeline '{name}'" else: return f"Failed to create timeline '{name}'" @mcp.tool() def set_current_timeline(name: str) -> str: """Switch to a timeline by name. Args: name: The name of the timeline to set as current """ if resolve is None: return "Error: Not connected to DaVinci Resolve" if not name: return "Error: Timeline name cannot be empty" project_manager = resolve.GetProjectManager() if not project_manager: return "Error: Failed to get Project Manager" current_project = project_manager.GetCurrentProject() if not current_project: return "Error: No project currently open" # Find the timeline by name timeline_count = current_project.GetTimelineCount() for i in range(1, timeline_count + 1): timeline = current_project.GetTimelineByIndex(i) if timeline and timeline.GetName() == name: result = current_project.SetCurrentTimeline(timeline) if result: return f"Successfully switched to timeline '{name}'" else: return f"Failed to switch to timeline '{name}'" return f"Error: Timeline '{name}' not found" @mcp.tool() def add_marker(frame: int = None, color: str = "Blue", note: str = "") -> str: """Add a marker at the specified frame in the current timeline. Args: frame: The frame number to add the marker at (defaults to current position if None) color: The marker color (Blue, Cyan, Green, Yellow, Red, Pink, Purple, Fuchsia, Rose, Lavender, Sky, Mint, Lemon, Sand, Cocoa, Cream) note: Text note to add to the marker """ from api.timeline_operations import add_marker as add_marker_func return add_marker_func(resolve, frame, color, note) # ------------------ # Media Pool Operations # ------------------ @mcp.resource("resolve://media-pool-clips") def list_media_pool_clips() -> List[Dict[str, Any]]: """List all clips in the root folder of the media pool.""" if resolve is None: return [{"error": "Not connected to DaVinci Resolve"}] project_manager = resolve.GetProjectManager() if not project_manager: return [{"error": "Failed to get Project Manager"}] current_project = project_manager.GetCurrentProject() if not current_project: return [{"error": "No project currently open"}] media_pool = current_project.GetMediaPool() if not media_pool: return [{"error": "Failed to get Media Pool"}] root_folder = media_pool.GetRootFolder() if not root_folder: return [{"error": "Failed to get root folder"}] clips = root_folder.GetClipList() if not clips: return [{"info": "No clips found in the root folder"}] # Return a simplified list with basic clip info result = [] for clip in clips: result.append({ "name": clip.GetName(), "duration": clip.GetDuration(), "fps": clip.GetClipProperty("FPS") }) return result @mcp.tool() def import_media(file_path: str) -> str: """Import media file into the current project's media pool. Args: file_path: The path to the media file to import """ from api.media_operations import import_media as import_media_func return import_media_func(resolve, file_path) @mcp.tool() def create_bin(name: str) -> str: """Create a new bin/folder in the media pool. Args: name: The name for the new bin """ from api.media_operations import create_bin as create_bin_func return create_bin_func(resolve, name) @mcp.resource("resolve://timeline-clips") def list_timeline_clips() -> List[Dict[str, Any]]: """List all clips in the current timeline.""" if resolve is None: return [{"error": "Not connected to DaVinci Resolve"}] project_manager = resolve.GetProjectManager() if not project_manager: return [{"error": "Failed to get Project Manager"}] current_project = project_manager.GetCurrentProject() if not current_project: return [{"error": "No project currently open"}] current_timeline = current_project.GetCurrentTimeline() if not current_timeline: return [{"error": "No timeline currently active"}] try: # Get all tracks in the timeline # Video tracks are 1-based index (1 is first track) video_track_count = current_timeline.GetTrackCount("video") audio_track_count = current_timeline.GetTrackCount("audio") clips = [] # Process video tracks for track_index in range(1, video_track_count + 1): track_items = current_timeline.GetItemListInTrack("video", track_index) if track_items: for item in track_items: clips.append({ "name": item.GetName(), "type": "video", "track": track_index, "start_frame": item.GetStart(), "end_frame": item.GetEnd(), "duration": item.GetDuration() }) # Process audio tracks for track_index in range(1, audio_track_count + 1): track_items = current_timeline.GetItemListInTrack("audio", track_index) if track_items: for item in track_items: clips.append({ "name": item.GetName(), "type": "audio", "track": track_index, "start_frame": item.GetStart(), "end_frame": item.GetEnd(), "duration": item.GetDuration() }) if not clips: return [{"info": "No clips found in the current timeline"}] return clips except Exception as e: return [{"error": f"Error listing timeline clips: {str(e)}"}] @mcp.tool() def list_timelines_tool() -> List[str]: """List all timelines in the current project as a tool.""" logger.info("Received request to list timelines via tool") return list_timelines() @mcp.tool() def add_clip_to_timeline(clip_name: str, timeline_name: str = None) -> str: """Add a media pool clip to the timeline. Args: clip_name: Name of the clip in the media pool timeline_name: Optional timeline to target (uses current if not specified) """ from api.media_operations import add_clip_to_timeline as add_clip_func return add_clip_func(resolve, clip_name, timeline_name) # Start the server if __name__ == "__main__": try: if resolve is None: logger.error("Cannot start server without connection to DaVinci Resolve") sys.exit(1) logger.info("Starting DaVinci Resolve MCP Server") # Start the MCP server with the simple run method # Note: The MCP CLI tool handles port configuration, not FastMCP directly mcp.run() except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error(f"Server error: {str(e)}") sys.exit(1)