Supabase MCP Server
- src
- video_editor_mcp
import logging
import os
import subprocess
import sys
import threading
from typing import List, Optional, Union
import mcp.server.stdio
import mcp.types as types
import osxphotos
import requests
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from pydantic import AnyUrl
from transformers import AutoModel
from videojungle import ApiClient
from .search_local_videos import get_videos_by_keyword
if os.environ.get("VJ_API_KEY"):
VJ_API_KEY = os.environ.get("VJ_API_KEY")
else:
try:
VJ_API_KEY = sys.argv[1]
except Exception:
VJ_API_KEY = None
# Configure the logging
logging.basicConfig(
filename="app.log", # Name of the log file
level=logging.INFO, # Log level (e.g., DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # Log format
)
if not VJ_API_KEY:
try:
with open(".env", "r") as f:
for line in f:
if "VJ_API_KEY" in line:
VJ_API_KEY = line.split("=")[1]
except Exception:
raise Exception(
"VJ_API_KEY environment variable is required or a .env file with the key is required"
)
raise Exception("VJ_API_KEY environment variable is required")
vj = ApiClient(VJ_API_KEY)
class PhotosDBLoader:
def __init__(self):
self._db: Optional[osxphotos.PhotosDB] = None
self.start_loading()
def start_loading(self):
def load():
self._db = osxphotos.PhotosDB()
logging.info("PhotosDB loaded")
thread = threading.Thread(target=load)
thread.daemon = True # Make thread exit when main program exits
thread.start()
@property
def db(self) -> osxphotos.PhotosDB:
if self._db is None:
raise Exception("PhotosDB still loading")
return self._db
class EmbeddingModelLoader:
def __init__(self, model_name: str = "jinaai/jina-clip-v1"):
self._model: Optional[AutoModel] = None
self.model_name = model_name
self.start_loading()
def start_loading(self):
def load():
self._model = AutoModel.from_pretrained(
self.model_name, trust_remote_code=True
)
logging.info(f"Model {self.model_name} loaded")
thread = threading.Thread(target=load)
thread.daemon = True
thread.start()
@property
def model(self) -> AutoModel:
if self._model is None:
raise Exception(f"Model {self.model_name} still loading")
return self._model
def encode_text(
self,
texts: Union[str, List[str]],
truncate_dim: Optional[int] = None,
task: Optional[str] = None,
) -> dict:
"""
Encode text and format the embeddings in the expected JSON structure
"""
embeddings = self.model.encode_text(texts, truncate_dim=truncate_dim, task=task)
# Format the response in the expected structure
return {"embeddings": embeddings.tolist(), "embedding_type": "text_embeddings"}
def encode_image(
self, images: Union[str, List[str]], truncate_dim: Optional[int] = None
) -> dict:
"""
Encode images and format the embeddings in the expected JSON structure
"""
embeddings = self.model.encode_image(images, truncate_dim=truncate_dim)
return {"embeddings": embeddings.tolist(), "embedding_type": "image_embeddings"}
def post_embeddings(
self, embeddings: dict, endpoint_url: str, headers: Optional[dict] = None
) -> requests.Response:
"""
Post embeddings to the specified endpoint
"""
if headers is None:
headers = {"Content-Type": "application/json"}
response = requests.post(endpoint_url, json=embeddings, headers=headers)
response.raise_for_status()
return response
# Create global loader instance, (requires access to host computer!)
if sys.platform == "darwin" and os.environ.get("LOAD_PHOTOS_DB"):
photos_loader = PhotosDBLoader()
model_loader = EmbeddingModelLoader()
server = Server("video-jungle-mcp")
try:
videos_at_start = vj.video_files.list()
except Exception as e:
logging.error(f"Error getting videos at start: {e}")
videos_at_start = []
counter = 10
tools = [
"add-video",
"search-local-videos",
"search-remote-videos",
"generate-edit-from-videos",
"generate-edit-from-single-video",
]
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
"""
List available video files.
Each video files is available at a specific url
"""
global counter, videos_at_start
counter += 1
if counter % 10 == 0:
videos = vj.video_files.list()
videos_at_start = videos
counter = 0
videos = [
types.Resource(
uri=AnyUrl(f"vj://video-file/{video.id}"),
name=f"Video Jungle Video: {video.name}",
description=f"User provided description: {video.description}",
mimeType="video/mp4",
)
for video in videos_at_start
]
"""
projects = [
types.Resource(
uri=AnyUrl(f"vj://project/{project.id}"),
name=f"Video Jungle Project: {project.name}",
description=f"Project description: {project.description}",
mimeType="application/json",
)
for project in projects
]"""
return videos # + projects
@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> str:
"""
Read a video's content by its URI.
The video id is extracted from the URI host component.
"""
if uri.scheme != "vj":
raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
id = uri.path
if id is not None:
id = id.lstrip("/video-file/")
video = vj.video_files.get(id)
logging.info(f"video is: {video}")
return video.model_dump_json()
raise ValueError(f"Video not found: {id}")
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
"""
List available prompts.
Each prompt can have optional arguments to customize its behavior.
"""
return [
types.Prompt(
name="generate-local-search",
description="Generate a local search for videos using appropriate label names from the Photos app.",
arguments=[
types.PromptArgument(
name="search_query",
description="Natural language query to be translated into Photos app label names.",
required=False,
)
],
)
]
@server.get_prompt()
async def handle_get_prompt(
name: str, arguments: dict[str, str] | None
) -> types.GetPromptResult:
"""
Generate a prompt by combining arguments with server state.
The prompt includes all current notes and can be customized via arguments.
"""
if name != "generate-local-search":
raise ValueError(f"Unknown prompt: {name}")
if not arguments:
raise ValueError("Missing arguments")
search_query = arguments.get("search_query")
if not search_query:
raise ValueError("Missing search_query")
return types.GetPromptResult(
description="Generate a local search for videos using appropriate label names from the Photos app.",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Here are the exact label names you need to match in your query:\n\n For the specific query: {search_query}, you should use the following labels: {photos_loader.db.labels_as_dict} for the search-local-videos tool",
),
)
],
)
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools.
Each tool specifies its arguments using JSON Schema validation.
"""
return [
types.Tool(
name="add-video",
description="Upload video from URL",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string"},
"url": {"type": "string"},
},
"required": ["name", "url"],
},
),
types.Tool(
name="search-remote-videos",
description="Search remote videos hosted on Video Jungle by query",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string"},
},
"required": ["query"],
},
),
types.Tool(
name="search-local-videos",
description="Search local videos in Photos app by keyword",
inputSchema={
"type": "object",
"properties": {
"keyword": {"type": "string"},
},
"required": ["keyword"],
},
),
types.Tool(
name="generate-edit-from-videos",
description="Generate an edit from videos",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string"},
"resolution": {"type": "string"},
"edit": {
"type": "array",
"cuts": {
"video_id": "string",
"video_start_time": "time",
"video_end_time": "time",
},
},
},
"required": ["edit", "project_id", "cuts"],
},
),
types.Tool(
name="generate-edit-from-single-video",
description="Generate a video edit from a single video",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string"},
"resolution": {"type": "string"},
"video_id": {"type": "string"},
"edit": {
"type": "array",
"cuts": {
"video_start_time": "time",
"video_end_time": "time",
},
},
},
"required": ["edit", "project_id", "video_id", "cuts"],
},
),
]
def format_single_video(video):
"""
Format a single video metadata tuple (metadata_dict, confidence_score)
Returns a formatted string and a Python code string representation
"""
try:
# Create human-readable format
readable_format = f"""
Video Embedding Result:
-------------
Video ID: {video['video_id']}
Description: {video['description']}
Timestamp: {video['timepoint']}
Detected Items: {', '.join(video['detected_items']) if video['detected_items'] else 'None'}
"""
except Exception as e:
raise ValueError(f"Error formatting video: {str(e)}")
return readable_format
def format_video_info(video):
try:
if video.get("script") is not None:
if len(video.get("script")) > 200:
script = video.get("script")[:200] + "..."
else:
script = video.get("script")
else:
script = "N/A"
return (
f"- Video Id: {video.get('video_id', 'N/A')}\n"
f" Video name: {video.get('video', {}).get('name', 'N/A')}\n"
f" URL to view video: {video.get('video', {}).get('url', 'N/A')}\n"
f" Video manuscript: {script}"
f" Generated description: {video.get('generated_descriptioon', 'N/A')}"
)
except Exception as e:
return f"Error formatting video: {str(e)}"
def format_video_info_long(video):
try:
if video.get("script") is not None:
if len(video.get("script")) > 200:
script = video.get("script")[:200] + "..."
else:
script = video.get("script")
else:
script = "N/A"
return (
f"- Video Id: {video.get('video_id', 'N/A')}\n"
f" Video name: {video.get('video', {}).get('name', 'N/A')}\n"
f" URL to view video: {video.get('video', {}).get('url', 'N/A')}\n"
f" Video manuscript: {script}"
f" Video scenes: {video.get('scene_changes', 'N/A')}"
)
except Exception as e:
return f"Error formatting video: {str(e)}"
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Handle tool execution requests.
Tools can modify server state and notify clients of changes.
"""
if name not in tools:
raise ValueError(f"Unknown tool: {name}")
if not arguments:
raise ValueError("Missing arguments")
if name == "add-video" and arguments:
name = arguments.get("name")
url = arguments.get("url")
if not name or not url:
raise ValueError("Missing name or content")
# Update server state
vj.video_files.create(name=name, filename=str(url), upload_method="url")
# Notify clients that resources have changed
await server.request_context.session.send_resource_list_changed()
return [
types.TextContent(
type="text",
text=f"Added video '{name}' with url: {url}",
)
]
if name == "search-remote-videos" and arguments:
query = arguments.get("query")
if not query:
raise ValueError("Missing query")
embeddings = model_loader.encode_text(query)
logging.info(f"Embeddings are: {embeddings}")
response = model_loader.post_embeddings(
embeddings,
"https://api.video-jungle.com/video-file/embedding-search",
headers={"Content-Type": "application/json", "X-API-KEY": VJ_API_KEY},
)
logging.info(f"Response is: {response.json()}")
if response.status_code != 200:
raise RuntimeError(f"Error searching for videos: {response.text}")
else:
videos = response.json()
embedding_search_response = [format_single_video(video) for video in videos]
videos = vj.video_files.search(query)
logging.info(f"num videos are: {len(videos)}")
if videos:
logging.info(f"{videos[0]}")
if len(videos) <= 3:
return [
types.TextContent(
type="text",
text=format_video_info_long(video),
)
for video in videos
]
# try to fit into context window
b = [
types.TextContent(
type="text",
text=(
f"Number of embedding search results: {len(embedding_search_response)}\n\n"
+ "\n".join(embedding_search_response)
+ f"Number of Videos Returned: {len(videos)}\n\n"
+ "\n".join(format_video_info(video) for video in videos)
),
)
]
return b # type: ignore
if name == "search-local-videos" and arguments:
if not os.environ.get("LOAD_PHOTOS_DB"):
raise ValueError(
"You must set the LOAD_PHOTOS_DB environment variable to True to use this tool"
)
keyword = arguments.get("keyword")
if not keyword:
raise ValueError("Missing keyword")
try:
db = photos_loader.db
videos = get_videos_by_keyword(db, keyword)
return [
types.TextContent(
type="text",
text=(
f"Number of Videos Returned: {len(videos)}\n\nShowing first 100:"
+ "\n".join(f"- {video}" for video in videos[:100])
),
)
]
except Exception:
raise RuntimeError("Local Photos database not yet initialized")
if name == "generate-edit-from-videos" and arguments:
edit = arguments.get("edit")
project = arguments.get("project_id")
resolution = arguments.get("resolution")
created = False
logging.info(f"edit is: {edit} and the type is: {type(edit)}")
if not edit:
raise ValueError("Missing edit")
if not project:
raise ValueError("Missing project")
if not resolution:
resolution = "1080x1920"
if resolution == "1080p":
resolution = "1920x1080"
elif resolution == "720p":
resolution = "1280x720"
try:
w, h = resolution.split("x")
_ = f"{int(w)}x{int(w)}"
except Exception as e:
raise ValueError(
f"Resolution must be in the format 'widthxheight' where width and height are integers: {e}"
)
updated_edit = [
{
**cut,
"type": "videofile",
"audio_levels": [
{
"audio_level": "0.5",
"start_time": cut["video_start_time"],
"end_time": cut["video_end_time"],
}
],
}
for cut in edit
]
logging.info(f"updated edit is: {updated_edit}")
json_edit = {
"video_edit_version": "1.0",
"video_output_format": "mp4",
"video_output_resolution": resolution,
"video_output_fps": 60.0,
"video_output_filename": "output_video.mp4",
"audio_overlay": [], # TODO: add this back in
"video_series_sequential": updated_edit,
}
try:
proj = vj.projects.get(project)
except Exception as e:
logging.info(f"project not found, creating new project because {e}")
proj = vj.projects.create(
name=project, description="Claude generated project"
)
project = proj.id
created = True
logging.info(f"video edit is: {json_edit}")
edit = vj.projects.render_edit(project, json_edit)
try:
os.chdir("./tools")
logging.info(f"in directory: {os.getcwd()}")
# don't block, because this might take a while
env_vars = {"VJ_API_KEY": VJ_API_KEY, "PATH": os.environ["PATH"]}
logging.info(
f"launching viewer with: {edit['asset_id']} {project}.mp4 {proj.name}"
)
subprocess.Popen(
[
"uv",
"run",
"viewer",
edit["asset_id"],
f"video-edit-{project}.mp4",
proj.name,
],
env=env_vars,
)
except Exception as e:
logging.info(f"Error running viewer: {e}")
if created:
# we created a new project so let the user / LLM know
return [
types.TextContent(
type="text",
text=f"Created new project {proj.name} and created edit {edit} with raw edit info: {updated_edit}",
)
]
return [
types.TextContent(
type="text",
text=f"Generated edit in existing project {proj.name} with generated asset info: {edit} and raw edit info: {updated_edit}",
)
]
if name == "generate-edit-from-single-video" and arguments:
edit = arguments.get("edit")
project = arguments.get("project_id")
video_id = arguments.get("video_id")
resolution = arguments.get("resolution")
created = False
logging.info(f"edit is: {edit} and the type is: {type(edit)}")
if not edit:
raise ValueError("Missing edit")
if not project:
raise ValueError("Missing project")
if not video_id:
raise ValueError("Missing video_id")
if not resolution:
resolution = "1080x1920"
try:
w, h = resolution.split("x")
_ = f"{int(w)}x{int(w)}"
except Exception as e:
raise ValueError(
f"Resolution must be in the format 'widthxheight' where width and height are integers: {e}"
)
try:
updated_edit = [
{
"video_id": video_id,
"video_start_time": cut["video_start_time"],
"video_end_time": cut["video_end_time"],
"type": "videofile",
"audio_levels": [
{
"audio_level": "0.5",
"start_time": cut["video_start_time"],
"end_time": cut["video_end_time"],
}
],
}
for cut in edit
]
except Exception as e:
raise ValueError(f"Error updating edit: {e}")
logging.info(f"updated edit is: {updated_edit}")
json_edit = {
"video_edit_version": "1.0",
"video_output_format": "mp4",
"video_output_resolution": resolution,
"video_output_fps": 60.0,
"video_output_filename": "output_video.mp4",
"audio_overlay": [], # TODO: add this back in
"video_series_sequential": updated_edit,
}
try:
proj = vj.projects.get(project)
except Exception:
proj = vj.projects.create(
name=project, description="Claude generated project"
)
project = proj.id
created = True
logging.info(f"video edit is: {json_edit}")
edit = vj.projects.render_edit(project, json_edit)
logging.info(f"edit is: {edit}")
try:
os.chdir("./tools")
logging.info(f"in directory: {os.getcwd()}")
# don't block, because this might take a while
env_vars = {"VJ_API_KEY": VJ_API_KEY, "PATH": os.environ["PATH"]}
logging.info(
f"launching viewer with: {edit['asset_id']} {project}.mp4 {proj.name}"
)
subprocess.Popen(
[
"uv",
"run",
"viewer",
edit["asset_id"],
f"video-edit-{project}.mp4",
proj.name,
],
env=env_vars,
)
except Exception as e:
logging.info(f"Error running viewer: {e}")
if created:
# we created a new project so let the user / LLM know
logging.info(f"created new project {proj.name} and created edit {edit}")
return [
types.TextContent(
type="text",
text=f"Created new project {proj.name} with raw edit info: {edit}",
)
]
return [
types.TextContent(
type="text",
text=f"Generated edit in project {proj.name} with raw edit info: {edit}",
)
]
async def main():
# Run the server using stdin/stdout streams
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="video-jungle-mcp",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)