video-editing-mcp

generate_opentimeline.py (9.64 kB)
import opentimelineio as otio
from videojungle import ApiClient
import os
import sys
import json
import argparse
import logging
import requests

logging.basicConfig(
    filename="app.log",  # Name of the log file
    level=logging.INFO,  # Log level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s",  # Log format
)

vj = ApiClient(os.environ.get("VJ_API_KEY"))


def timecode_to_frames(timecode, fps=24.0):
    """
    Convert HH:MM:SS.xxx format to frames, handling variable decimal places
    """
    try:
        parts = timecode.split(":")
        hours = float(parts[0])
        minutes = float(parts[1])
        seconds = float(parts[2])
        total_seconds = hours * 3600 + minutes * 60 + seconds
        return int(total_seconds * fps)
    except (ValueError, IndexError) as e:
        raise ValueError(f"Invalid timecode format: {timecode}") from e


def create_rational_time(timecode, fps=24.0):
    """Create RationalTime object from HH:MM:SS.xxx format"""
    frames = timecode_to_frames(timecode, fps)
    return otio.opentime.RationalTime(frames, fps)


def download_asset(asset_id, asset_type, download_dir="downloads"):
    """Download an asset using either the assets API or video files API based on type"""
    try:
        # Determine which API to use based on asset type
        if asset_type in ["user", "audio", "mp3", "wav", "aac", "m4a"]:
            # Use assets API for user uploads and audio files
            asset = vj.assets.get(asset_id)
            if not asset.download_url:
                logging.error(f"No download URL for asset {asset_id}")
                return None
            download_url = asset.download_url
            filename = (
                asset.name if hasattr(asset, "name") and asset.name else str(asset_id)
            )
        else:
            # Use video files API for video files
            video = vj.video_files.get(asset_id)
            if not video.download_url:
                logging.error(f"No download URL for video {asset_id}")
                return None
            download_url = video.download_url
            filename = (
                video.name if hasattr(video, "name") and video.name else str(asset_id)
            )

        # Determine file extension based on asset type
        ext_map = {
            "mp3": ".mp3",
            "wav": ".wav",
            "aac": ".aac",
            "m4a": ".m4a",
            "user": ".mp4",  # Default for user videos
            "video": ".mp4",
            "audio": ".mp3",  # Default for generic audio
        }
        ext = ext_map.get(asset_type, ".mp4")

        # Remove any existing extension and add the correct one
        if "." in filename:
            filename = filename.rsplit(".", 1)[0]
        local_file = os.path.join(download_dir, f"{filename}{ext}")

        # Check if file already exists
        if os.path.exists(local_file):
            logging.info(f"Asset already exists at {local_file}, skipping download")
            return local_file

        # Download the file
        if asset_type in ["user", "audio", "mp3", "wav", "aac", "m4a"]:
            # Use requests for assets API downloads
            response = requests.get(download_url, stream=True)
            response.raise_for_status()
            with open(local_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        else:
            # Use video files download method
            lf = vj.video_files.download(asset_id, local_file)
            logging.info(f"Downloaded video to {lf}")
            return lf

        logging.info(f"Downloaded asset {asset_id} to {local_file}")
        return local_file
    except Exception as e:
        logging.error(f"Error downloading asset {asset_id}: {e}")
        return None


def create_otio_timeline(
    edit_spec, filename, download_dir="downloads"
) -> otio.schema.Timeline:
    if not os.path.exists(download_dir):
        os.makedirs(download_dir)

    timeline = otio.schema.Timeline(name=edit_spec.get("name", "Timeline"))

    # Create video track
    video_track = otio.schema.Track(name="V1", kind=otio.schema.TrackKind.Video)
    timeline.tracks.append(video_track)

    # Create audio track if there are audio overlays
    audio_track = None
    if "audio_overlay" in edit_spec and edit_spec["audio_overlay"]:
        audio_track = otio.schema.Track(name="A1", kind=otio.schema.TrackKind.Audio)
        timeline.tracks.append(audio_track)

    # Process video clips
    for cut in edit_spec["video_series_sequential"]:
        asset_type = cut.get("type", "video")
        local_file = download_asset(cut["video_id"], asset_type, download_dir)
        if not local_file:
            continue

        fps = edit_spec.get("video_output_fps", 24.0)
        start_time = create_rational_time(cut["video_start_time"], fps)
        end_time = create_rational_time(cut["video_end_time"], fps)

        clip = otio.schema.Clip(
            name=f"clip_{cut['video_id']}",
            media_reference=otio.schema.ExternalReference(
                target_url=os.path.abspath(local_file)
            ),
            source_range=otio.opentime.TimeRange(start_time, (end_time - start_time)),
        )

        # TODO: Add audio level metadata if needed
        if "audio_levels" in cut and cut["audio_levels"]:
            # OpenTimelineIO doesn't have direct audio level support
            # This would need to be handled in the editing software
            clip.metadata["audio_levels"] = cut["audio_levels"]

        # Add crop metadata if present
        if "crop" in cut and cut["crop"]:
            # Store crop settings in metadata for the editing software to interpret
            clip.metadata["crop"] = {
                "zoom": cut["crop"].get("zoom", 1.0),
                "position_x": cut["crop"].get("position_x", 0.0),
                "position_y": cut["crop"].get("position_y", 0.0),
            }

        video_track.append(clip)

    # Process audio overlays
    if audio_track and "audio_overlay" in edit_spec:
        for audio_item in edit_spec["audio_overlay"]:
            audio_type = audio_item.get("type", "mp3")
            local_audio_file = download_asset(
                audio_item["audio_id"], audio_type, download_dir
            )
            if not local_audio_file:
                continue

            fps = edit_spec.get("video_output_fps", 24.0)
            audio_start = create_rational_time(audio_item["audio_start_time"], fps)
            audio_end = create_rational_time(audio_item["audio_end_time"], fps)

            audio_clip = otio.schema.Clip(
                name=f"audio_{audio_item['audio_id']}",
                media_reference=otio.schema.ExternalReference(
                    target_url=os.path.abspath(local_audio_file)
                ),
                source_range=otio.opentime.TimeRange(
                    audio_start, (audio_end - audio_start)
                ),
            )

            # Add audio level metadata if present
            if "audio_levels" in audio_item and audio_item["audio_levels"]:
                audio_clip.metadata["audio_levels"] = audio_item["audio_levels"]

            audio_track.append(audio_clip)

    otio.adapters.write_to_file(timeline, filename)
    logging.info(f"OTIO timeline saved to {filename}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file", help="JSON file path")
    parser.add_argument("--output", help="Output file path")
    parser.add_argument("--json", type=json.loads, help="JSON string")
    args = parser.parse_args()
    spec = None

    # Set DaVinci Resolve environment variables
    os.environ["RESOLVE_SCRIPT_API"] = (
        "/Library/Application Support/Blackmagic Design/DaVinci Resolve/Developer/Scripting"
    )
    os.environ["RESOLVE_SCRIPT_LIB"] = (
        "/Applications/DaVinci Resolve/DaVinci Resolve.app/Contents/Libraries/Fusion/fusionscript.so"
    )

    # Add Resolve's Python modules to the path
    script_module_path = os.path.join(os.environ["RESOLVE_SCRIPT_API"], "Modules")
    if script_module_path not in sys.path:
        sys.path.append(script_module_path)

    # Now import DaVinciResolveScript
    try:
        import DaVinciResolveScript as dvr_script
    except ImportError:
        # print(f"Error importing DaVinciResolveScript: {e}")
        # print("Make sure DaVinci Resolve is installed correctly.")
        # Re-raise the exception or set dvr_script to None as a fallback
        dvr_script = None

    if args.json:
        spec = args.json
    elif args.file:
        with open(args.file) as f:
            spec = json.load(f)
    elif not sys.stdin.isatty():  # Check if data is being piped
        spec = json.load(sys.stdin)
    else:
        parser.print_help()
        sys.exit(1)

    if args.output:
        output_file = args.output
    else:
        output_file = "output.otio"

    create_otio_timeline(spec, output_file)

    output_file_absolute = os.path.abspath(output_file)

    if dvr_script:
        resolve = dvr_script.scriptapp("Resolve")
        if resolve:
            project_manager = resolve.GetProjectManager()
            project = project_manager.GetCurrentProject()
            media_pool = project.GetMediaPool()
            media_pool.ImportTimelineFromFile(
                output_file_absolute, {"timelineName": spec["name"]}
            )
            logging.info(f"Imported {output_file} into DaVinci Resolve")
        else:
            logging.error("Could not connect to DaVinci Resolve.")
