We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/zakahan/vedit-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import os
import shutil
import argparse
import subprocess
from pathlib import Path
from mcp.server.fastmcp import FastMCP
from loguru import logger
from logging import INFO
from typing import Optional
# The description of the argument is as follows.
# 1. `kb_dir`:(Required)It is used to set the base path where various video files for operation are located. (For details, please check the code.)
# 2. `using_logger`: (Optional) It is used to set whether to use the logger. If it is set to "True",
# the output will be generated. If it is not set or set to other values, no log will be output.
# 3. `logger_level`:(Optional) The log level. By default, it is set to the DEBUG level. You can set it
# to any one of "DEBUG", "INFO", "ERROR", and "WARNING" by yourself.
# 4. `logger_file_dir`: (Required when `using_logger="True"`), The directory for log output. It is required that this directory must already exist.
# -----------------------------------------------------------------------------
# Setting Arguments Variable
# Logging Configuration
parser = argparse.ArgumentParser(description='Configure script parameters via command-line arguments')
parser.add_argument('--using_logger', choices=['True', 'False'], default='False',
help='Whether to use the logger. Valid values are True or False, default is False')
parser.add_argument('--logger_file_dir', default=None,
help='Directory for the log file. If the logger is used, this directory must be specified')
parser.add_argument('--logger_level', default='DEBUG',
help='Logging level for the logger, default is DEBUG')
parser.add_argument('--kb_dir', default=None,
help='Base path for the video folder')
args = parser.parse_args()
if args.using_logger == "True":
USING_LOGGER = True
LOGGER_FILE_DIR = args.logger_file_dir
else:
USING_LOGGER = False # default
LOGGER_FILE_DIR = None
# ---
LOGGER_LEVEL = args.logger_level
# LOGGER_FILE_DIR: this folder must:
# This directory must already exist, and an mcp.log file will be created here to record logs.
# If you have any other ideas, please modify the code yourself.
# Video Folder Base
KB_DIR = args.kb_dir
KB_CLIP = "clip"
KB_MERGE = "merge"
KB_RESULT = "result"
KB_ADD = "add" # add bgm, and add ... what i don't know.....
# -----------------------------------------------------------------------------
# Config Logger
def get_logger():
if not USING_LOGGER:
logger.remove()
return logger
if LOGGER_LEVEL not in {"DEBUG", "INFO", "ERROR", "CRITICAL"}:
raise ValueError(f"`logger_level` Error: the logger level is not exists: {LOGGER_LEVEL}")
if LOGGER_FILE_DIR is None:
raise ValueError("`logger_file_dir` Error: If you set `using_logger` to `True`, then you must configure " \
"the argument variable `logger_file_dir` and " \
"ensure that this directory already exists. ")
elif not os.path.exists(os.path.abspath(LOGGER_FILE_DIR)):
raise ValueError(f"`logger_file_dir` Error: The argument `logger_file_dir`={LOGGER_FILE_DIR} must already" \
"exist, we can not find it now.")
log_path = os.path.join(LOGGER_FILE_DIR, 'logs' ,'mcp.log')
logger.add(log_path, rotation="500 MB", retention="10 days", level=LOGGER_LEVEL,
format="{time} | {level} | " + "__VEDIO_EDITOR_SERVER__" + ":{function}:{line} - {message}")
return logger
# Check the path
def check_paths():
if KB_DIR is None:
raise ValueError("`kb_dir` Error: KB_DIR is None, you must configure the argument variable " \
"KB_DIR and ensure that this path truly exists. The function of this path is to " \
"store the original video files, temporary files, and result files.")
elif not os.path.exists(os.path.abspath(KB_DIR)):
raise ValueError(
f"`kb_dir` Error: The argument `kb_dir`={KB_DIR} must already exist, we can not find it now.")
logger = get_logger()
check_paths()
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# start of functions -----------------------------------------------------------
# ------------------------------------------------------------------------------
# Underlying Implementation ----------------------------------------------------
# """
# ## Plan
# Actually, at a cursory glance, video editing can still be divided into the following operations: create, read, update, and delete.
# Create: Add filters, add subtitles, add background music, etc.
# Delete: Delete specified segments, delete subtitles, delete audio tracks, etc.
# Update: Merge multiple videos, cut out a small segment from the original video.
# Read: Query basic information of the original video, such as duration, frame rate, etc. (It seems this isn't very useful).
# ## What has been achieved so far
# 1. clip_video_tool: cut out a small segment from the original video.
# 2. move_videos_tool: merge multiple videos.
# 3. add_bgm_tool: add background music.
# """
# -----------------------------------------------------------------------------
# clip_video:
def clip_video(
original_video_path: str,
save_folder: str,
start_time: int,
stop_time: int,
title: str,
) -> tuple[bool, str, str]:
"""
Cut a video and save it to a specified folder.
:param original_video_path: The path of the original video file.
:param save_folder: The path of the folder where the cut video will be saved.
:param start_time: The start time for cutting (in seconds; this time unit is sufficient for most operations).
:param stop_time: The end time for cutting (in seconds).
:return: A tuple where the first element is a boolean indicating whether the operation was successful,
the second element is the output location, and the third element is the log information.
"""
logger.debug("-----------------------------------------------------------------------------")
logger.debug("Parameter check <clip_video> ------------------------------------------------")
logger.debug(f"origin_video_path: {original_video_path}")
logger.debug(f"save_folder: {save_folder}")
logger.debug(f"start_time: {start_time}")
logger.debug(f"stop_time: {stop_time}")
logger.debug("-----------------------------------------------------------------------------")
# Check if the original video file exists
if not os.path.isfile(original_video_path):
error_msg = f"Error: The original video file does not exist."
logger.error(error_msg)
return False, "", error_msg
_, file_extension = os.path.splitext(original_video_path)
# Check if the target folder exists. If it doesn't exist, create it.
if not os.path.exists(save_folder):
try:
os.makedirs(save_folder)
except OSError as e:
error_msg = f"Error: Failed to create the folder. Error message: {e}"
logger.error(error_msg)
return False, "", error_msg
# Generate the output file path
output_path = os.path.join(save_folder, f"{title}{file_extension}")
try:
# Build the FFmpeg command
command = [
'ffmpeg',
'-y',
'-ss', str(start_time),
'-to', str(stop_time),
'-i', original_video_path,
'-c', 'copy',
output_path
]
# Execute the FFmpeg command
subprocess.run(command, check=True)
success_msg = f"The video has been successfully cut and is being saved. "
logger.info(success_msg)
return True, output_path, success_msg
except subprocess.CalledProcessError as e:
error_msg = f"Error: An error occurred while executing the FFmpeg command. Error message: {e}"
logger.error(error_msg)
return False, "", error_msg
# merge_videos
def merge_videos(video_paths: list[str], save_folder: str) -> tuple[bool, str, str]:
"""
Merge multiple local video files.
:param video_paths: A list containing the paths of video files.
:param save_folder: The folder where the merged video will be saved.
:return: A tuple where the first element is a boolean indicating whether the operation was successful,
the second element is the output path, and the third element is the log information.
"""
logger.debug("-----------------------------------------------------------------------------")
logger.debug("Parameter check <merge_videos> ----------------------------------------------")
logger.debug(f"video_path: {str(video_paths)}")
logger.debug(f"save_folder: {save_folder}")
logger.debug("-----------------------------------------------------------------------------")
# Check if all video files exist
for path in video_paths:
if not os.path.isfile(path):
error_msg = f"Error: The video file does not exist."
logger.error(error_msg)
return False, "", error_msg
# Check if the target folder exists. If not, create it.
if not os.path.exists(save_folder):
try:
os.makedirs(save_folder)
except OSError as e:
error_msg = f"Error: Failed to create the folder. Error message: {e}"
logger.error(error_msg)
return False, "", error_msg
# Create a temporary file list
temp_file_list = 'temp_file_list.txt'
try:
with open(temp_file_list, 'w', encoding='utf-8') as f:
for path in video_paths:
f.write(f"file '{path}'\n")
# Generate the output file path
output_path = os.path.join(save_folder, f'result.mp4')
# Build the FFmpeg command
command = [
'ffmpeg',
'-y',
'-f', 'concat',
'-safe', '0',
'-i', temp_file_list,
'-c', 'copy',
output_path
]
# Execute the FFmpeg command
subprocess.run(command, check=True)
success_msg = f"The videos have been successfully merged and saved."
logger.info(success_msg)
return True, output_path, success_msg
except subprocess.CalledProcessError as e:
error_msg = f"Error: An error occurred while executing the FFmpeg command. Error message: {e}"
logger.error(error_msg)
return False, "", error_msg
except Exception as e:
error_msg = f"An unknown error occurred: {e}"
logger.error(error_msg)
return False, "", error_msg
finally:
# Delete the temporary file list
if os.path.exists(temp_file_list):
os.remove(temp_file_list)
# add_audio_to_video
def add_audio_to_video(video_path: str, audio_path: str, output_path: str, start_time: int = 0, audio_duration: Optional[int] = None) -> tuple[bool, str]:
logger.debug("-----------------------------------------------------------------------------")
logger.debug("Parameter check <add_audio_to_video> ----------------------------------------")
logger.debug(f"video_path: {video_path}")
logger.debug(f"audio_path: {audio_path}")
logger.debug(f"output_path: {output_path}")
logger.debug(f"start_time: {start_time}")
logger.debug(f"audio_duration: {audio_duration}")
logger.debug("-----------------------------------------------------------------------------")
# Get the duration of the video
ffprobe_cmd = [
'ffprobe',
'-v', 'error',
'-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1',
video_path
]
try:
video_duration = float(subprocess.check_output(ffprobe_cmd).decode().strip())
except subprocess.CalledProcessError as e:
logger.error(f"Error getting video duration: {e}")
return False, str(e)
except ValueError:
logger.error("Error parsing video duration.")
return False, str(e)
# If the audio duration is not specified, use the video duration
if audio_duration is None:
audio_duration = video_duration
# Build the FFmpeg command
ffmpeg_cmd = [
'ffmpeg',
'-i', video_path,
'-i', audio_path,
'-ss', str(start_time),
'-t', str(min(audio_duration, video_duration)),
'-filter_complex', '[0:a]volume=1[a1];[1:a]volume=1[a2];[a1][a2]amix=inputs=2:duration=first:dropout_transition=0[aout]',
'-map', '0:v',
'-map', '[aout]',
'-c:v', 'copy',
'-c:a', 'aac',
output_path
]
try:
# Execute the FFmpeg command
subprocess.run(ffmpeg_cmd, check=True)
logger.info(f"Successfully added audio to video. Output saved to {output_path}")
return True, "success"
except subprocess.CalledProcessError as e:
logger.error(f"Error adding audio to video: {e}")
return False, str(e)
# copy file and rename
def copy_file(source_file: str, target_folder: str, rename: str) -> tuple[bool, str]:
if not os.path.isfile(source_file):
return False, f"it is not a file, path {source_file}"
if not os.path.exists(target_folder):
os.makedirs(target_folder)
# 提取源文件的后缀
_, file_extension = os.path.splitext(source_file)
# 构建包含后缀的目标文件名
target_file = os.path.join(target_folder, rename + file_extension)
try:
# 复制文件
shutil.copy2(source_file, target_file)
return True, "success"
except Exception as e:
return False, f"Error occurred while copying: {str(e)}"
# end of functions -------------------------------------------------------------
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# MCP Service --------------------
mcp = FastMCP(
name="VideoEditorMCP",
description="A video editing MCP tool service that has implemented " \
"the basic functions among the fundamental functions.",
version="0.1.4",
)
# --------------------------------
# MCP Tools ----------------------
# --------------------------------------------------------------------------
# mcp-tools注册
@mcp.tool()
def clip_video_tool(
original_video_path: str,
task_id: str,
start_time: int,
stop_time: int,
title: str,
) -> dict:
"""
Clip a video based on the given start and stop times.
Parameters:
original_video_path (str): The path of the original video file.
task_id (str): The unique identifier for the clipping task.
start_time (int): The start time (in some appropriate unit) for the clipping.
stop_time (int): The stop time (in some appropriate unit) for the clipping.
title (str): title: the title of the clipping.(This item does not include suffix names)
Returns:
dict: A dictionary containing the result of the clipping operation.
The dictionary has the following keys:
- "success": A boolean indicating whether the operation was successful.
- "message": A string providing additional information about the operation.
- "output_path": The path of the clipped video file.
"""
# In fact, both the `original_video_path` and `output_path` are based on the `KB_DIR`,
# that is, in the form of `$KB_DIR/xxxxx`.
#
# It is necessary to ensure that each input is based on the `KB_DIR`,
# and each output is also based on the KB.
# The reason for this design is mainly due to the concern that errors may occur
# when generating paths and other operations.
# Therefore, every effort is made to avoid such instability.
_original_video_path = os.path.join(KB_DIR, original_video_path)
_save_folder = os.path.join(KB_DIR, KB_CLIP, task_id)
success, output_path, message = clip_video(
_original_video_path, _save_folder, start_time, stop_time, title)
return {"success": success, "message": message, "output_path": output_path[len(KB_DIR)+1:]}
@mcp.tool()
def merge_videos_tool(video_paths: list[str], task_id: str) -> dict:
"""
Merge multiple videos into one.
Parameters:
video_paths (list[str]): A list of paths of the video files to be merged.
task_id (str): The unique identifier for the merging task.
Returns:
dict: A dictionary containing the result of the merging operation.
The dictionary has the following keys:
- "success": A boolean indicating whether the operation was successful.
- "message": A string providing additional information about the operation.
- "output_path": The path of the merged video file.
"""
# the `video_paths` and `output_path` relative to the `KB_DIR``
_video_paths = []
for path in video_paths:
_video_paths.append(os.path.join(KB_DIR, path))
pass
_save_folder = os.path.join(KB_DIR, KB_MERGE, task_id)
success, output_path, message = merge_videos(_video_paths, _save_folder)
return {"success": success, "message": message, "output_path": output_path[len(KB_DIR)+1:]}
@mcp.tool()
def add_bgm_tool(
video_path: str, audio_path: str, start_time: int=0, audio_duration: Optional[int]=None
) -> dict:
"""
This function is used to add background music to a video.
Parameters:
video_path (str): The path to the video file, which should be a string.
audio_path (str): The path to the audio file, which should be a string.
start_time (int, optional): The start time (in seconds) from which the audio will be added to the video. The default value is 0.
audio_duration (Optional[int], optional): The duration (in seconds) for which the audio will be added to the video.
If it is None, the full duration of the audio will be used.
Returns:
str: Returns "success" if successful, or an error message if failed.
If an error occurs, notify the user of the reason for the error and apologize sincerely.
"""
_video_path = os.path.join(KB_DIR, video_path)
_audio_path = os.path.join(KB_DIR, audio_path)
_output_path = os.path.join(KB_DIR, KB_ADD)
_, msg = add_audio_to_video(_video_path, _audio_path, _output_path, start_time, audio_duration)
return msg
@mcp.tool()
def task_endding(task_id: str, source_file: str, title: str = "") -> str:
"""
This function should be called every time a task ends to push the result document after task processing to the result folder.
Parameters:
task_id (str): uniquely identifying the current task.
source_file(str): Indicates the location of the result file to be pushed,
which is the file location provided after the previous process of this task ends.
title(str): If you need to modify the file name (note: including the file extension), use this parameter.
Otherwise, keep the file name the same as that of the source_file or simply don't input this parameter as there is a default parameter here.
(This item does not include suffix names)
Returns:
str: Returns "success" if successful, or an error message if failed.
If an error occurs, notify the user of the reason for the error and apologize sincerely.
"""
_source_file = os.path.join(KB_DIR, source_file)
_target_dir = os.path.join(KB_DIR, KB_RESULT, task_id)
if not os.path.exists(_target_dir):
os.makedirs(_target_dir)
if not os.path.exists(_source_file):
return "This file does not exist. Please check if the path is correct."
if len(title) == 0:
_title, _ = os.path.splitext(os.path.basename(source_file))
else:
_title = title
try:
_, msg = copy_file(_source_file, _target_dir, _title)
return msg
except Exception as e:
return f"Error: {str(e)}"
if __name__ == "__main__":
logger.info("Video Edit MCP Server Running......")
mcp.run(transport='stdio')