Skip to main content
Glama

vedit-mcp

by zakahan
vedit_mcp.py20.2 kB
import os import shutil import argparse import subprocess from pathlib import Path from mcp.server.fastmcp import FastMCP from loguru import logger from logging import INFO from typing import Optional # The description of the argument is as follows. # 1. `kb_dir`:(Required)It is used to set the base path where various video files for operation are located. (For details, please check the code.) # 2. `using_logger`: (Optional) It is used to set whether to use the logger. If it is set to "True", # the output will be generated. If it is not set or set to other values, no log will be output. # 3. `logger_level`:(Optional) The log level. By default, it is set to the DEBUG level. You can set it # to any one of "DEBUG", "INFO", "ERROR", and "WARNING" by yourself. # 4. `logger_file_dir`: (Required when `using_logger="True"`), The directory for log output. It is required that this directory must already exist. # ----------------------------------------------------------------------------- # Setting Arguments Variable # Logging Configuration parser = argparse.ArgumentParser(description='Configure script parameters via command-line arguments') parser.add_argument('--using_logger', choices=['True', 'False'], default='False', help='Whether to use the logger. Valid values are True or False, default is False') parser.add_argument('--logger_file_dir', default=None, help='Directory for the log file. If the logger is used, this directory must be specified') parser.add_argument('--logger_level', default='DEBUG', help='Logging level for the logger, default is DEBUG') parser.add_argument('--kb_dir', default=None, help='Base path for the video folder') args = parser.parse_args() if args.using_logger == "True": USING_LOGGER = True LOGGER_FILE_DIR = args.logger_file_dir else: USING_LOGGER = False # default LOGGER_FILE_DIR = None # --- LOGGER_LEVEL = args.logger_level # LOGGER_FILE_DIR: this folder must: # This directory must already exist, and an mcp.log file will be created here to record logs. # If you have any other ideas, please modify the code yourself. # Video Folder Base KB_DIR = args.kb_dir KB_CLIP = "clip" KB_MERGE = "merge" KB_RESULT = "result" KB_ADD = "add" # add bgm, and add ... what i don't know..... # ----------------------------------------------------------------------------- # Config Logger def get_logger(): if not USING_LOGGER: logger.remove() return logger if LOGGER_LEVEL not in {"DEBUG", "INFO", "ERROR", "CRITICAL"}: raise ValueError(f"`logger_level` Error: the logger level is not exists: {LOGGER_LEVEL}") if LOGGER_FILE_DIR is None: raise ValueError("`logger_file_dir` Error: If you set `using_logger` to `True`, then you must configure " \ "the argument variable `logger_file_dir` and " \ "ensure that this directory already exists. ") elif not os.path.exists(os.path.abspath(LOGGER_FILE_DIR)): raise ValueError(f"`logger_file_dir` Error: The argument `logger_file_dir`={LOGGER_FILE_DIR} must already" \ "exist, we can not find it now.") log_path = os.path.join(LOGGER_FILE_DIR, 'logs' ,'mcp.log') logger.add(log_path, rotation="500 MB", retention="10 days", level=LOGGER_LEVEL, format="{time} | {level} | " + "__VEDIO_EDITOR_SERVER__" + ":{function}:{line} - {message}") return logger # Check the path def check_paths(): if KB_DIR is None: raise ValueError("`kb_dir` Error: KB_DIR is None, you must configure the argument variable " \ "KB_DIR and ensure that this path truly exists. The function of this path is to " \ "store the original video files, temporary files, and result files.") elif not os.path.exists(os.path.abspath(KB_DIR)): raise ValueError( f"`kb_dir` Error: The argument `kb_dir`={KB_DIR} must already exist, we can not find it now.") logger = get_logger() check_paths() # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # start of functions ----------------------------------------------------------- # ------------------------------------------------------------------------------ # Underlying Implementation ---------------------------------------------------- # """ # ## Plan # Actually, at a cursory glance, video editing can still be divided into the following operations: create, read, update, and delete. # Create: Add filters, add subtitles, add background music, etc. # Delete: Delete specified segments, delete subtitles, delete audio tracks, etc. # Update: Merge multiple videos, cut out a small segment from the original video. # Read: Query basic information of the original video, such as duration, frame rate, etc. (It seems this isn't very useful). # ## What has been achieved so far # 1. clip_video_tool: cut out a small segment from the original video. # 2. move_videos_tool: merge multiple videos. # 3. add_bgm_tool: add background music. # """ # ----------------------------------------------------------------------------- # clip_video: def clip_video( original_video_path: str, save_folder: str, start_time: int, stop_time: int, title: str, ) -> tuple[bool, str, str]: """ Cut a video and save it to a specified folder. :param original_video_path: The path of the original video file. :param save_folder: The path of the folder where the cut video will be saved. :param start_time: The start time for cutting (in seconds; this time unit is sufficient for most operations). :param stop_time: The end time for cutting (in seconds). :return: A tuple where the first element is a boolean indicating whether the operation was successful, the second element is the output location, and the third element is the log information. """ logger.debug("-----------------------------------------------------------------------------") logger.debug("Parameter check <clip_video> ------------------------------------------------") logger.debug(f"origin_video_path: {original_video_path}") logger.debug(f"save_folder: {save_folder}") logger.debug(f"start_time: {start_time}") logger.debug(f"stop_time: {stop_time}") logger.debug("-----------------------------------------------------------------------------") # Check if the original video file exists if not os.path.isfile(original_video_path): error_msg = f"Error: The original video file does not exist." logger.error(error_msg) return False, "", error_msg _, file_extension = os.path.splitext(original_video_path) # Check if the target folder exists. If it doesn't exist, create it. if not os.path.exists(save_folder): try: os.makedirs(save_folder) except OSError as e: error_msg = f"Error: Failed to create the folder. Error message: {e}" logger.error(error_msg) return False, "", error_msg # Generate the output file path output_path = os.path.join(save_folder, f"{title}{file_extension}") try: # Build the FFmpeg command command = [ 'ffmpeg', '-y', '-ss', str(start_time), '-to', str(stop_time), '-i', original_video_path, '-c', 'copy', output_path ] # Execute the FFmpeg command subprocess.run(command, check=True) success_msg = f"The video has been successfully cut and is being saved. " logger.info(success_msg) return True, output_path, success_msg except subprocess.CalledProcessError as e: error_msg = f"Error: An error occurred while executing the FFmpeg command. Error message: {e}" logger.error(error_msg) return False, "", error_msg # merge_videos def merge_videos(video_paths: list[str], save_folder: str) -> tuple[bool, str, str]: """ Merge multiple local video files. :param video_paths: A list containing the paths of video files. :param save_folder: The folder where the merged video will be saved. :return: A tuple where the first element is a boolean indicating whether the operation was successful, the second element is the output path, and the third element is the log information. """ logger.debug("-----------------------------------------------------------------------------") logger.debug("Parameter check <merge_videos> ----------------------------------------------") logger.debug(f"video_path: {str(video_paths)}") logger.debug(f"save_folder: {save_folder}") logger.debug("-----------------------------------------------------------------------------") # Check if all video files exist for path in video_paths: if not os.path.isfile(path): error_msg = f"Error: The video file does not exist." logger.error(error_msg) return False, "", error_msg # Check if the target folder exists. If not, create it. if not os.path.exists(save_folder): try: os.makedirs(save_folder) except OSError as e: error_msg = f"Error: Failed to create the folder. Error message: {e}" logger.error(error_msg) return False, "", error_msg # Create a temporary file list temp_file_list = 'temp_file_list.txt' try: with open(temp_file_list, 'w', encoding='utf-8') as f: for path in video_paths: f.write(f"file '{path}'\n") # Generate the output file path output_path = os.path.join(save_folder, f'result.mp4') # Build the FFmpeg command command = [ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', temp_file_list, '-c', 'copy', output_path ] # Execute the FFmpeg command subprocess.run(command, check=True) success_msg = f"The videos have been successfully merged and saved." logger.info(success_msg) return True, output_path, success_msg except subprocess.CalledProcessError as e: error_msg = f"Error: An error occurred while executing the FFmpeg command. Error message: {e}" logger.error(error_msg) return False, "", error_msg except Exception as e: error_msg = f"An unknown error occurred: {e}" logger.error(error_msg) return False, "", error_msg finally: # Delete the temporary file list if os.path.exists(temp_file_list): os.remove(temp_file_list) # add_audio_to_video def add_audio_to_video(video_path: str, audio_path: str, output_path: str, start_time: int = 0, audio_duration: Optional[int] = None) -> tuple[bool, str]: logger.debug("-----------------------------------------------------------------------------") logger.debug("Parameter check <add_audio_to_video> ----------------------------------------") logger.debug(f"video_path: {video_path}") logger.debug(f"audio_path: {audio_path}") logger.debug(f"output_path: {output_path}") logger.debug(f"start_time: {start_time}") logger.debug(f"audio_duration: {audio_duration}") logger.debug("-----------------------------------------------------------------------------") # Get the duration of the video ffprobe_cmd = [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_path ] try: video_duration = float(subprocess.check_output(ffprobe_cmd).decode().strip()) except subprocess.CalledProcessError as e: logger.error(f"Error getting video duration: {e}") return False, str(e) except ValueError: logger.error("Error parsing video duration.") return False, str(e) # If the audio duration is not specified, use the video duration if audio_duration is None: audio_duration = video_duration # Build the FFmpeg command ffmpeg_cmd = [ 'ffmpeg', '-i', video_path, '-i', audio_path, '-ss', str(start_time), '-t', str(min(audio_duration, video_duration)), '-filter_complex', '[0:a]volume=1[a1];[1:a]volume=1[a2];[a1][a2]amix=inputs=2:duration=first:dropout_transition=0[aout]', '-map', '0:v', '-map', '[aout]', '-c:v', 'copy', '-c:a', 'aac', output_path ] try: # Execute the FFmpeg command subprocess.run(ffmpeg_cmd, check=True) logger.info(f"Successfully added audio to video. Output saved to {output_path}") return True, "success" except subprocess.CalledProcessError as e: logger.error(f"Error adding audio to video: {e}") return False, str(e) # copy file and rename def copy_file(source_file: str, target_folder: str, rename: str) -> tuple[bool, str]: if not os.path.isfile(source_file): return False, f"it is not a file, path {source_file}" if not os.path.exists(target_folder): os.makedirs(target_folder) # 提取源文件的后缀 _, file_extension = os.path.splitext(source_file) # 构建包含后缀的目标文件名 target_file = os.path.join(target_folder, rename + file_extension) try: # 复制文件 shutil.copy2(source_file, target_file) return True, "success" except Exception as e: return False, f"Error occurred while copying: {str(e)}" # end of functions ------------------------------------------------------------- # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------ # MCP Service -------------------- mcp = FastMCP( name="VideoEditorMCP", description="A video editing MCP tool service that has implemented " \ "the basic functions among the fundamental functions.", version="0.1.4", ) # -------------------------------- # MCP Tools ---------------------- # -------------------------------------------------------------------------- # mcp-tools注册 @mcp.tool() def clip_video_tool( original_video_path: str, task_id: str, start_time: int, stop_time: int, title: str, ) -> dict: """ Clip a video based on the given start and stop times. Parameters: original_video_path (str): The path of the original video file. task_id (str): The unique identifier for the clipping task. start_time (int): The start time (in some appropriate unit) for the clipping. stop_time (int): The stop time (in some appropriate unit) for the clipping. title (str): title: the title of the clipping.(This item does not include suffix names) Returns: dict: A dictionary containing the result of the clipping operation. The dictionary has the following keys: - "success": A boolean indicating whether the operation was successful. - "message": A string providing additional information about the operation. - "output_path": The path of the clipped video file. """ # In fact, both the `original_video_path` and `output_path` are based on the `KB_DIR`, # that is, in the form of `$KB_DIR/xxxxx`. # # It is necessary to ensure that each input is based on the `KB_DIR`, # and each output is also based on the KB. # The reason for this design is mainly due to the concern that errors may occur # when generating paths and other operations. # Therefore, every effort is made to avoid such instability. _original_video_path = os.path.join(KB_DIR, original_video_path) _save_folder = os.path.join(KB_DIR, KB_CLIP, task_id) success, output_path, message = clip_video( _original_video_path, _save_folder, start_time, stop_time, title) return {"success": success, "message": message, "output_path": output_path[len(KB_DIR)+1:]} @mcp.tool() def merge_videos_tool(video_paths: list[str], task_id: str) -> dict: """ Merge multiple videos into one. Parameters: video_paths (list[str]): A list of paths of the video files to be merged. task_id (str): The unique identifier for the merging task. Returns: dict: A dictionary containing the result of the merging operation. The dictionary has the following keys: - "success": A boolean indicating whether the operation was successful. - "message": A string providing additional information about the operation. - "output_path": The path of the merged video file. """ # the `video_paths` and `output_path` relative to the `KB_DIR`` _video_paths = [] for path in video_paths: _video_paths.append(os.path.join(KB_DIR, path)) pass _save_folder = os.path.join(KB_DIR, KB_MERGE, task_id) success, output_path, message = merge_videos(_video_paths, _save_folder) return {"success": success, "message": message, "output_path": output_path[len(KB_DIR)+1:]} @mcp.tool() def add_bgm_tool( video_path: str, audio_path: str, start_time: int=0, audio_duration: Optional[int]=None ) -> dict: """ This function is used to add background music to a video. Parameters: video_path (str): The path to the video file, which should be a string. audio_path (str): The path to the audio file, which should be a string. start_time (int, optional): The start time (in seconds) from which the audio will be added to the video. The default value is 0. audio_duration (Optional[int], optional): The duration (in seconds) for which the audio will be added to the video. If it is None, the full duration of the audio will be used. Returns: str: Returns "success" if successful, or an error message if failed. If an error occurs, notify the user of the reason for the error and apologize sincerely. """ _video_path = os.path.join(KB_DIR, video_path) _audio_path = os.path.join(KB_DIR, audio_path) _output_path = os.path.join(KB_DIR, KB_ADD) _, msg = add_audio_to_video(_video_path, _audio_path, _output_path, start_time, audio_duration) return msg @mcp.tool() def task_endding(task_id: str, source_file: str, title: str = "") -> str: """ This function should be called every time a task ends to push the result document after task processing to the result folder. Parameters: task_id (str): uniquely identifying the current task. source_file(str): Indicates the location of the result file to be pushed, which is the file location provided after the previous process of this task ends. title(str): If you need to modify the file name (note: including the file extension), use this parameter. Otherwise, keep the file name the same as that of the source_file or simply don't input this parameter as there is a default parameter here. (This item does not include suffix names) Returns: str: Returns "success" if successful, or an error message if failed. If an error occurs, notify the user of the reason for the error and apologize sincerely. """ _source_file = os.path.join(KB_DIR, source_file) _target_dir = os.path.join(KB_DIR, KB_RESULT, task_id) if not os.path.exists(_target_dir): os.makedirs(_target_dir) if not os.path.exists(_source_file): return "This file does not exist. Please check if the path is correct." if len(title) == 0: _title, _ = os.path.splitext(os.path.basename(source_file)) else: _title = title try: _, msg = copy_file(_source_file, _target_dir, _title) return msg except Exception as e: return f"Error: {str(e)}" if __name__ == "__main__": logger.info("Video Edit MCP Server Running......") mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zakahan/vedit-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server