video-editing-mcp

import logging import os import subprocess import sys import threading from typing import List, Optional, Union, Any import json import webbrowser import mcp.server.stdio import mcp.types as types import osxphotos import requests from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions from pydantic import AnyUrl from transformers import AutoModel from videojungle import ApiClient, VideoFilters from .search_local_videos import get_videos_by_keyword from .generate_charts import render_bar_chart from .generate_opentimeline import create_otio_timeline import numpy as np if os.environ.get("VJ_API_KEY"): VJ_API_KEY = os.environ.get("VJ_API_KEY") else: try: VJ_API_KEY = sys.argv[1] except Exception: VJ_API_KEY = None BROWSER_OPEN = False # Configure the logging logging.basicConfig( filename="app.log", # Name of the log file level=logging.INFO, # Log level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL) format="%(asctime)s - %(levelname)s - %(message)s", # Log format ) if not VJ_API_KEY: try: with open(".env", "r") as f: for line in f: if "VJ_API_KEY" in line: VJ_API_KEY = line.split("=")[1] except Exception: raise Exception( "VJ_API_KEY environment variable is required or a .env file with the key is required" ) raise Exception("VJ_API_KEY environment variable is required") vj = ApiClient(VJ_API_KEY) class PhotosDBLoader: def __init__(self): self._db: Optional[osxphotos.PhotosDB] = None self.start_loading() def start_loading(self): def load(): self._db = osxphotos.PhotosDB() logging.info("PhotosDB loaded") thread = threading.Thread(target=load) thread.daemon = True # Make thread exit when main program exits thread.start() @property def db(self) -> osxphotos.PhotosDB: if self._db is None: raise Exception("PhotosDB still loading") return self._db class EmbeddingModelLoader: def __init__(self, model_name: str = "jinaai/jina-clip-v1"): self._model: Optional[AutoModel] = None self.model_name = model_name self.start_loading() def start_loading(self): def load(): self._model = AutoModel.from_pretrained( self.model_name, trust_remote_code=True ) logging.info(f"Model {self.model_name} loaded") thread = threading.Thread(target=load) thread.daemon = True thread.start() @property def model(self) -> AutoModel: if self._model is None: raise Exception(f"Model {self.model_name} still loading") return self._model def encode_text( self, texts: Union[str, List[str]], truncate_dim: Optional[int] = None, task: Optional[str] = None, ) -> dict: """ Encode text and format the embeddings in the expected JSON structure """ embeddings = self.model.encode_text(texts, truncate_dim=truncate_dim, task=task) # Format the response in the expected structure return {"embeddings": embeddings.tolist(), "embedding_type": "text_embeddings"} def encode_image( self, images: Union[str, List[str]], truncate_dim: Optional[int] = None ) -> dict: """ Encode images and format the embeddings in the expected JSON structure """ embeddings = self.model.encode_image(images, truncate_dim=truncate_dim) return {"embeddings": embeddings.tolist(), "embedding_type": "image_embeddings"} def post_embeddings( self, embeddings: dict, endpoint_url: str, headers: Optional[dict] = None ) -> requests.Response: """ Post embeddings to the specified endpoint """ if headers is None: headers = {"Content-Type": "application/json"} response = requests.post(endpoint_url, json=embeddings, headers=headers) response.raise_for_status() return response # Create global loader instance, (requires access to host computer!) if sys.platform == "darwin" and os.environ.get("LOAD_PHOTOS_DB"): photos_loader = PhotosDBLoader() model_loader = EmbeddingModelLoader() server = Server("video-jungle-mcp") try: videos_at_start = vj.video_files.list() except Exception as e: logging.error(f"Error getting videos at start: {e}") videos_at_start = [] counter = 10 tools = [ "add-video", "search-local-videos", "search-remote-videos", "generate-edit-from-videos", "create-video-bar-chart-from-two-axis-data", "create-video-line-chart-from-two-axis-data", "generate-edit-from-single-video", "update-video-edit", # not until we're actually ready ] def validate_y_values(y_values: Any) -> bool: """ Validates that y_values is a single-dimensional array/list of numbers. Args: y_values: The input to validate Returns: bool: True if validation passes Raises: ValueError: If validation fails with a descriptive message """ # Check if input is a list or numpy array if not isinstance(y_values, (list, np.ndarray)): raise ValueError("y_values must be a list") # Convert to numpy array for easier handling y_array = np.array(y_values) # Check if it's multi-dimensional if len(y_array.shape) > 1: raise ValueError("y_values must be a single-dimensional array") # Check if all elements are numeric if not np.issubdtype(y_array.dtype, np.number): raise ValueError("all elements in y_values must be numbers") # Check for NaN or infinite values if np.any(np.isnan(y_array)) or np.any(np.isinf(y_array)): raise ValueError("y_values cannot contain NaN or infinite values") return True @server.list_resources() async def handle_list_resources() -> list[types.Resource]: """ List available video files. Each video files is available at a specific url """ global counter, videos_at_start counter += 1 """ if counter % 10 == 0: videos = vj.video_files.list() videos_at_start = videos counter = 0 videos = [ types.Resource( uri=AnyUrl(f"vj://video-file/{video.id}"), name=f"Video Jungle Video: {video.name}", description=f"User provided description: {video.description}", mimeType="video/mp4", ) for video in videos_at_start ] projects = [ types.Resource( uri=AnyUrl(f"vj://project/{project.id}"), name=f"Video Jungle Project: {project.name}", description=f"Project description: {project.description}", mimeType="application/json", ) for project in projects ]""" return [] # videos # + projects @server.read_resource() async def handle_read_resource(uri: AnyUrl) -> str: """ Read a video's content by its URI. The video id is extracted from the URI host component. """ if uri.scheme != "vj": raise ValueError(f"Unsupported URI scheme: {uri.scheme}") id = uri.path if id is not None: id = id.lstrip("/video-file/") video = vj.video_files.get(id) logging.info(f"video is: {video}") return video.model_dump_json() raise ValueError(f"Video not found: {id}") @server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: """ List available prompts. Each prompt can have optional arguments to customize its behavior. """ return [ types.Prompt( name="generate-local-search", description="Generate a local search for videos using appropriate label names from the Photos app.", arguments=[ types.PromptArgument( name="search_query", description="Natural language query to be translated into Photos app label names.", required=False, ) ], ) ] @server.get_prompt() async def handle_get_prompt( name: str, arguments: dict[str, str] | None ) -> types.GetPromptResult: """ Generate a prompt by combining arguments with server state. The prompt includes all current notes and can be customized via arguments. """ if name != "generate-local-search": raise ValueError(f"Unknown prompt: {name}") if not arguments: raise ValueError("Missing arguments") search_query = arguments.get("search_query") if not search_query: raise ValueError("Missing search_query") return types.GetPromptResult( description="Generate a local search for videos using appropriate label names from the Photos app.", messages=[ types.PromptMessage( role="user", content=types.TextContent( type="text", text=f"Here are the exact label names you need to match in your query:\n\n For the specific query: {search_query}, you should use the following labels: {photos_loader.db.labels_as_dict} for the search-local-videos tool", ), ) ], ) @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ List available tools. Each tool specifies its arguments using JSON Schema validation. """ return [ types.Tool( name="add-video", description="Upload video from URL. Begins analysis of video to allow for later information retrieval for automatic video editing an search.", inputSchema={ "type": "object", "properties": { "name": {"type": "string"}, "url": {"type": "string"}, }, "required": ["name", "url"], }, ), types.Tool( name="search-remote-videos", description="Default method to search videos hosted on Video Jungle by a query", inputSchema={ "type": "object", "properties": { "query": {"type": "string", "description": "Text search query"}, "limit": { "type": "integer", "default": 10, "minimum": 1, "description": "Maximum number of results to return", }, "project_id": { "type": "string", "format": "uuid", "description": "Project ID to scope the search", }, "duration_min": { "type": "number", "minimum": 0, "description": "Minimum video duration in seconds", }, "duration_max": { "type": "number", "minimum": 0, "description": "Maximum video duration in seconds", }, }, "created_after": { "type": "string", "format": "date-time", "description": "Filter videos created after this datetime", }, "created_before": { "type": "string", "format": "date-time", "description": "Filter videos created before this datetime", }, "tags": { "type": "array", "items": {"type": "string"}, "uniqueItems": True, "description": "Set of tags to filter by", }, "include_segments": { "type": "boolean", "default": True, "description": "Whether to include video segments in results", }, "include_related": { "type": "boolean", "default": False, "description": "Whether to include related videos", }, "query_audio": { "type": "string", "description": "Audio search query", }, "query_img": { "type": "string", "description": "Image search query", }, "oneOf": [ {"required": ["query"]}, ], }, ), types.Tool( name="search-local-videos", description="Search user's local videos in Photos app by keyword", inputSchema={ "type": "object", "properties": { "keyword": {"type": "string"}, "start_date": { "type": "string", "description": "ISO 8601 formatted datetime string (e.g. 2024-01-21T15:30:00Z)", }, "end_date": { "type": "string", "description": "ISO 8601 formatted datetime string (e.g. 2024-01-21T15:30:00Z)", }, }, "required": ["keyword"], }, ), types.Tool( name="generate-edit-from-videos", description="Generate an edit from videos, from within a specific project. Creates a new project to work within no existing project ID (UUID) is passed ", inputSchema={ "type": "object", "properties": { "project_id": { "type": "string", "description": "Either an existing Project UUID or String. A UUID puts the edit in an existing project, and a string creates a new project with that name.", }, "name": {"type": "string", "description": "Video Edit name"}, "open_editor": { "type": "boolean", "description": "Open a live editor with the project's edit", }, "resolution": { "type": "string", "description": "Video resolution. Examples include '1920x1080', '1280x720'", }, "edit": { "type": "array", "cuts": { "video_id": {"type": "string", "description": "Video UUID"}, "video_start_time": { "type": "string", "description": "Clip start time in 00:00:00.000 format", }, "video_end_time": { "type": "string", "description": "Clip end time in 00:00:00.000 format", }, }, }, }, "required": ["edit", "cuts", "name", "project_id"], }, ), types.Tool( name="generate-edit-from-single-video", description="Generate a compressed video edit from a single video.", inputSchema={ "type": "object", "properties": { "project_id": {"type": "string"}, "resolution": {"type": "string"}, "video_id": {"type": "string"}, "edit": { "type": "array", "cuts": { "video_start_time": "time", "video_end_time": "time", }, }, }, "required": ["edit", "project_id", "video_id", "cuts"], }, ), types.Tool( name="update-video-edit", description="Update an existing video edit within a specific project.", inputSchema={ "type": "object", "properties": { "project_id": { "type": "string", "description": "UUID of the project containing the edit", }, "edit_id": { "type": "string", "description": "UUID of the video edit to update", }, "name": {"type": "string", "description": "Video Edit name"}, "description": { "type": "string", "description": "Description of the video edit", }, "video_output_format": { "type": "string", "description": "Output format for the video (e.g., 'mp4', 'webm')", }, "video_output_resolution": { "type": "string", "description": "Video resolution. Examples include '1920x1080', '1280x720'", }, "video_output_fps": { "type": "number", "description": "Frames per second for the output video", }, "video_series_sequential": { "type": "array", "description": "Array of video clips in sequential order", "items": { "type": "object", "properties": { "video_id": { "type": "string", "description": "Video UUID", }, "video_start_time": { "type": "string", "description": "Clip start time in 00:00:00.000 format", }, "video_end_time": { "type": "string", "description": "Clip end time in 00:00:00.000 format", }, }, }, }, "audio_overlay": { "type": "object", "description": "Audio overlay settings and assets", }, "rendered": { "type": "boolean", "description": "Whether the edit has been rendered", }, }, "required": ["project_id", "edit_id"], }, ), types.Tool( name="create-video-bar-chart-from-two-axis-data", description="Create a video bar chart from two-axis data", inputSchema={ "type": "object", "properties": { "x_values": {"type": "array", "items": {"type": "string"}}, "y_values": {"type": "array", "items": {"type": "number"}}, "x_label": {"type": "string"}, "y_label": {"type": "string"}, "title": {"type": "string"}, "filename": {"type": "string"}, }, "required": ["x_values", "y_values", "x_label", "y_label", "title"], }, ), types.Tool( name="create-video-line-chart-from-two-axis-data", description="Create a video line chart from two-axis data", inputSchema={ "type": "object", "properties": { "x_values": {"type": "array", "items": {"type": "string"}}, "y_values": {"type": "array", "items": {"type": "number"}}, "x_label": {"type": "string"}, "y_label": {"type": "string"}, "title": {"type": "string"}, "filename": {"type": "string"}, }, "required": ["x_values", "y_values", "x_label", "y_label", "title"], }, ), ] def format_single_video(video): """ Format a single video metadata tuple (metadata_dict, confidence_score) Returns a formatted string and a Python code string representation """ try: # Create human-readable format readable_format = f""" Video Embedding Result: ------------- Video ID: {video['video_id']} Description: {video['description']} Timestamp: {video['timepoint']} Detected Items: {', '.join(video['detected_items']) if video['detected_items'] else 'None'} """ except Exception as e: raise ValueError(f"Error formatting video: {str(e)}") return readable_format def filter_unique_videos_keep_first(json_results): seen = set() return [ item for item in json_results if item["video_id"] not in seen and not seen.add(item["video_id"]) ] def format_video_info(video): try: if video.get("script") is not None: if len(video.get("script")) > 200: script = video.get("script")[:200] + "..." else: script = video.get("script") else: script = "N/A" segments = [] for segment in video.get("matching_segments", []): segments.append( f"- Time: {segment.get('start_seconds', 'N/A')} to {segment.get('end_seconds', 'N/A')}" ) joined_segments = "\n".join(segments) return ( f"- Video Id: {video.get('video_id', 'N/A')}\n" f" Video name: {video.get('video', {}).get('name', 'N/A')}\n" f" URL to view video: {video.get('video', {}).get('url', 'N/A')}\n" f" Video manuscript: {script}" f" Matching scenes: {joined_segments}" f" Generated description: {video.get('video', 'N/A').get('generated_description', 'N/A')}" ) except Exception as e: return f"Error formatting video: {str(e)}" def format_video_info_long(video): try: if video.get("script") is not None: if len(video.get("script")) > 200: script = video.get("script")[:200] + "..." else: script = video.get("script") else: script = "N/A" return ( f"- Video Id: {video.get('video_id', 'N/A')}\n" f" Video name: {video.get('video', {}).get('name', 'N/A')}\n" f" URL to view video: {video.get('video', {}).get('url', 'N/A')}\n" f" Generated description: {video.get('video', 'N/A').get('generated_description', 'N/A')}" f" Video manuscript: {script}" f" Matching times: {video.get('scene_changes', 'N/A')}" ) except Exception as e: return f"Error formatting video: {str(e)}" @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Handle tool execution requests. Tools can modify server state and notify clients of changes. """ if name not in tools: raise ValueError(f"Unknown tool: {name}") if not arguments: raise ValueError("Missing arguments") if name == "add-video" and arguments: name = arguments.get("name") url = arguments.get("url") if not name or not url: raise ValueError("Missing name or content") # Update server state vj.video_files.create(name=name, filename=str(url), upload_method="url") # Notify clients that resources have changed await server.request_context.session.send_resource_list_changed() return [ types.TextContent( type="text", text=f"Added video '{name}' with url: {url}", ) ] if name == "search-remote-videos" and arguments: # Extract all possible search parameters query = arguments.get("query") # query_audio = arguments.get("query_audio") # query_img = arguments.get("query_img") limit = arguments.get("limit", 10) project_id = arguments.get("project_id") tags = arguments.get("tags", None) duration_min = arguments.get("duration_min", None) duration_max = arguments.get("duration_max", None) created_after = arguments.get("created_after", None) created_before = arguments.get("created_before", None) include_segments = arguments.get("include_segments", True) include_related = arguments.get("include_related", False) # Validate that at least one query type is provided if not query and not tags: raise ValueError("At least one query or tag must be provided") # Perform the main search with all parameters if tags: search_params = { "limit": limit, "include_segments": include_segments, "include_related": include_related, "tags": json.loads(tags), "duration_min": duration_min, "duration_max": duration_max, "created_after": created_after, "created_before": created_before, } else: search_params = { "limit": limit, "include_segments": include_segments, "include_related": include_related, "duration_min": duration_min, "duration_max": duration_max, "created_after": created_after, "created_before": created_before, } # Add optional parameters if query: search_params["query"] = query if project_id: search_params["project_id"] = project_id videos = [] if query: embeddings = model_loader.encode_text(query) # logging.info(f"Embeddings are: {embeddings}") response = model_loader.post_embeddings( embeddings, "https://api.video-jungle.com/video-file/embedding-search", headers={"Content-Type": "application/json", "X-API-KEY": VJ_API_KEY}, ) logging.info(f"Response is: {response.json()}") if response.status_code != 200: raise RuntimeError(f"Error searching for videos: {response.text}") videos = response.json() embedding_search_response = [format_single_video(video) for video in videos] # Format response based on number of results if len(videos) <= 3 and len(videos) >= 1: return [ types.TextContent( type="text", text=format_video_info_long(video), ) for video in videos ] videos = vj.video_files.search(**search_params) logging.info(f"num videos are: {len(videos)}") # Combine embedding search results and regular search results response_text = [] response_text.append(f"Number of Videos Returned: {len(videos)}") response_text.extend(format_video_info(video) for video in videos) if query: # Only include embedding search results if text query was used response_text.append( f"Number of embedding search results: {len(embedding_search_response)}" ) response_text.extend(embedding_search_response) logging.info(f"Videos returned are {videos}") return [ types.TextContent( type="text", text="\n".join(response_text), ) ] if name == "search-local-videos" and arguments: if not os.environ.get("LOAD_PHOTOS_DB"): raise ValueError( "You must set the LOAD_PHOTOS_DB environment variable to True to use this tool" ) keyword = arguments.get("keyword") if not keyword: raise ValueError("Missing keyword") start_date = None end_date = None if arguments.get("start_date") and arguments.get("end_date"): start_date = arguments.get("start_date") end_date = arguments.get("end_date") try: db = photos_loader.db videos = get_videos_by_keyword(db, keyword, start_date, end_date) return [ types.TextContent( type="text", text=( f"Number of Videos Returned: {len(videos)}. Here are the first 100 results: \n{videos[:100]}" ), ) ] except Exception: raise RuntimeError("Local Photos database not yet initialized") if name == "generate-edit-from-videos" and arguments: edit = arguments.get("edit") project = arguments.get("project_id") name = arguments.get("name") open_editor = arguments.get("open_editor") resolution = arguments.get("resolution") created = False logging.info(f"edit is: {edit} and the type is: {type(edit)}") if open_editor is None: open_editor = True if not edit: raise ValueError("Missing edit") if not project: raise ValueError("Missing project") if not resolution: resolution = "1080x1920" if not name: raise ValueError("Missing name for edit") if resolution == "1080p": resolution = "1920x1080" elif resolution == "720p": resolution = "1280x720" try: w, h = resolution.split("x") _ = f"{int(w)}x{int(w)}" except Exception as e: raise ValueError( f"Resolution must be in the format 'widthxheight' where width and height are integers: {e}" ) updated_edit = [ { "video_id": cut["video_id"], "video_start_time": cut["video_start_time"], "video_end_time": cut["video_end_time"], "type": "videofile", "audio_levels": [ { "audio_level": 0.5, "start_time": cut["video_start_time"], "end_time": cut["video_end_time"], } ], } for cut in edit ] logging.info(f"updated edit is: {updated_edit}") json_edit = { "video_edit_version": "1.0", "video_output_format": "mp4", "video_output_resolution": resolution, "video_output_fps": 60.0, "name": name, "video_output_filename": "output_video.mp4", "audio_overlay": [], # TODO: add this back in "video_series_sequential": updated_edit, "skip_rendering": True, } try: proj = vj.projects.get(project) except Exception as e: logging.info(f"project not found, creating new project because {e}") proj = vj.projects.create( name=project, description="Claude generated project" ) project = proj.id created = True logging.info(f"video edit is: {json_edit}") edit = vj.projects.render_edit(project, json_edit) with open(f"{project}.json", "w") as f: json.dump(json_edit, f, indent=4) webbrowser.open( f"https://app.video-jungle.com/projects/{proj.id}/edits/{edit['edit_id']}" ) global BROWSER_OPEN BROWSER_OPEN = True # the following generates an edit for opentimeline, allowing you to open it in # a desktop video editor like final cut pro, etc. try: env_vars = {"VJ_API_KEY": VJ_API_KEY, "PATH": os.environ["PATH"]} subprocess.Popen( [ "uv", "run", "python", "./src/video_editor_mcp/generate_opentimeline.py", "--file", f"{project}.json", "--output", f"{project}.otio", ], env=env_vars, ) os.chdir("./tools") logging.info(f"in directory: {os.getcwd()}") # don't block, because this might take a while env_vars = {"VJ_API_KEY": VJ_API_KEY, "PATH": os.environ["PATH"]} logging.info( f"launching viewer with: {edit['asset_id']} {project}.mp4 {proj.name}" ) subprocess.Popen( [ "uv", "run", "viewer", edit["asset_id"], f"video-edit-{project}.mp4", proj.name, ], env=env_vars, ) except Exception as e: logging.info(f"Error running viewer: {e}") if created: # we created a new project so let the user / LLM know return [ types.TextContent( type="text", text=f"Created new project {proj.name} with id '{proj.id}' and created edit {edit} with raw edit info: {updated_edit}", ) ] return [ types.TextContent( type="text", text=f"Generated edit in existing project {proj.name} with id '{proj.id}' with generated asset info: {edit} and raw edit info: {updated_edit}", ) ] if name == "generate-edit-from-single-video" and arguments: edit = arguments.get("edit") project = arguments.get("project_id") video_id = arguments.get("video_id") resolution = arguments.get("resolution") created = False logging.info(f"edit is: {edit} and the type is: {type(edit)}") if not edit: raise ValueError("Missing edit") if not project: raise ValueError("Missing project") if not video_id: raise ValueError("Missing video_id") if not resolution: resolution = "1080x1920" try: w, h = resolution.split("x") _ = f"{int(w)}x{int(w)}" except Exception as e: raise ValueError( f"Resolution must be in the format 'widthxheight' where width and height are integers: {e}" ) try: updated_edit = [ { "video_id": video_id, "video_start_time": cut["video_start_time"], "video_end_time": cut["video_end_time"], "type": "video-file", "audio_levels": [], } for cut in edit ] except Exception as e: raise ValueError(f"Error updating edit: {e}") logging.info(f"updated edit is: {updated_edit}") json_edit = { "video_edit_version": "1.0", "video_output_format": "mp4", "video_output_resolution": resolution, "video_output_fps": 60.0, "video_output_filename": "output_video.mp4", "audio_overlay": [], # TODO: add this back in "video_series_sequential": updated_edit, } try: proj = vj.projects.get(project) except Exception: proj = vj.projects.create( name=project, description="Claude generated project" ) project = proj.id created = True logging.info(f"video edit is: {json_edit}") try: edit = vj.projects.render_edit(project, json_edit) except Exception as e: logging.error(f"Error rendering edit: {e}") logging.info(f"edit is: {edit}") try: os.chdir("./tools") logging.info(f"in directory: {os.getcwd()}") # don't block, because this might take a while env_vars = {"VJ_API_KEY": VJ_API_KEY, "PATH": os.environ["PATH"]} logging.info( f"launching viewer with: {edit['asset_id']} {project}.mp4 {proj.name}" ) subprocess.Popen( [ "uv", "run", "viewer", edit["asset_id"], f"video-edit-{project}.mp4", proj.name, ], env=env_vars, ) except Exception as e: logging.info(f"Error running viewer: {e}") if created: # we created a new project so let the user / LLM know logging.info(f"created new project {proj.name} and created edit {edit}") return [ types.TextContent( type="text", text=f"Created new project {proj.name} with project id '{proj.id}' and raw edit info: {edit}", ) ] return [ types.TextContent( type="text", text=f"Generated edit in project {proj.name} with project id '{proj.id}' and raw edit info: {edit}", ) ] if name == "update-video-edit" and arguments: project_id = arguments.get("project_id") edit_id = arguments.get("edit_id") edit_name = arguments.get("name") description = arguments.get("description") video_output_format = arguments.get("video_output_format") video_output_resolution = arguments.get("video_output_resolution") video_output_fps = arguments.get("video_output_fps") video_series_sequential = arguments.get("video_series_sequential") audio_overlay = arguments.get("audio_overlay") rendered = arguments.get("rendered") # Validate required parameters if not project_id: raise ValueError("Missing project_id") if not edit_id: raise ValueError("Missing edit_id") # Process resolution format like in create function if video_output_resolution: if video_output_resolution == "1080p": video_output_resolution = "1920x1080" elif video_output_resolution == "720p": video_output_resolution = "1280x720" # Validate resolution format try: w, h = video_output_resolution.split("x") _ = f"{int(w)}x{int(h)}" except Exception as e: raise ValueError( f"Resolution must be in the format 'widthxheight' where width and height are integers: {e}" ) # Try to get the existing project try: proj = vj.projects.get(project_id) except Exception as e: raise ValueError(f"Project with ID {project_id} not found: {e}") # Try to get the existing edit try: existing_edit = vj.projects.get_edit(project_id, edit_id) except Exception as e: raise ValueError( f"Edit with ID {edit_id} not found in project {project_id}: {e}" ) # Process video clips if provided updated_video_series = None if video_series_sequential: updated_video_series = [ { "video_id": clip["video_id"], "video_start_time": clip["video_start_time"], "video_end_time": clip["video_end_time"], "type": "videofile", "audio_levels": [ { "audio_level": 0.5, "start_time": clip["video_start_time"], "end_time": clip["video_end_time"], } ], } for clip in video_series_sequential ] # Create an empty dictionary without type annotations update_json = dict() # Add fields one by one with explicit type handling update_json["video_edit_version"] = "1.0" if edit_name: update_json["name"] = edit_name if description: update_json["description"] = description if video_output_format: update_json["video_output_format"] = video_output_format if video_output_resolution: update_json["video_output_resolution"] = video_output_resolution if video_output_fps is not None: update_json["video_output_fps"] = float(video_output_fps) if updated_video_series is not None: # Cast to a list to ensure proper typing update_json["video_series_sequential"] = list(updated_video_series) if audio_overlay is not None: # Cast to a list to ensure proper typing update_json["audio_overlay"] = list(audio_overlay) if audio_overlay else [] # Skip rendering by default like in create function update_json["skip_rendering"] = bool(True) # If rendering is explicitly requested if rendered is True: update_json["skip_rendering"] = bool(False) logging.info(f"Updating edit {edit_id} with: {update_json}") # Call the API to update the edit updated_edit = vj.projects.update_edit(project_id, edit_id, update_json) # Optionally open the browser to the updated edit if not BROWSER_OPEN: webbrowser.open( f"https://app.video-jungle.com/projects/{project_id}/edits/{edit_id}" ) return [ types.TextContent( type="text", text=f"Updated edit {edit_id} in project {proj.name} with changes: {update_json}", ) ] if ( name == "create-video-bar-chart-from-two-axis-data" or "create-video-line-chart-from-two-axis-data" and arguments ): x_values = arguments.get("x_values") y_values = arguments.get("y_values") x_label = arguments.get("x_label") y_label = arguments.get("y_label") title = arguments.get("title") filename = arguments.get("filename") if not x_values or not y_values or not x_label or not y_label or not title: raise ValueError("Missing required arguments") if not filename: if name == "create-video-bar-chart-from-two-axis-data": filename = "bar_chart.mp4" elif name == "create-video-line-chart-from-two-axis-data": filename = "line_chart.mp4" else: raise ValueError("Invalid tool name") y_axis_safe = validate_y_values(y_values) if not y_axis_safe: raise ValueError("Y values are not valid") # Render the bar chart data = { "x_values": x_values, "y_values": y_values, "x_label": x_label, "y_label": y_label, "title": title, "filename": filename, } with open("chart_data.json", "w") as f: json.dump(data, f, indent=4) file_path = os.path.join(os.getcwd(), "media/videos/720p30/", filename) if name == "create-video-bar-chart-from-two-axis-data": subprocess.Popen( [ "uv", "run", "src/video_editor_mcp/generate_charts.py", "chart_data.json", "bar", ] ) return [ types.TextContent( type="text", text=f"Bar chart video generated.\nSaved to {file_path}", ) ] elif name == "create-video-line-chart-from-two-axis-data": subprocess.Popen( [ "uv", "run", "src/video_editor_mcp/generate_charts.py", "chart_data.json", "line", ] ) return [ types.TextContent( type="text", text=f"Line chart video generated.\nSaved to {file_path}", ) ] async def main(): # Run the server using stdin/stdout streams async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="video-jungle-mcp", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), )