Skip to main content
Glama

FFmpeg MCP Tool

by LBJWt
ffmpeg_mcp.py23.3 kB
#!/usr/bin/env python3 """ FFmpeg MCP Server 一个基于FFmpeg的媒体处理MCP工具,支持图片和视频的转码、压缩等功能 """ import asyncio import json import logging import os import subprocess import shutil from pathlib import Path from typing import Any, Sequence, Optional, Dict, List from PIL import Image import mcp.types as types from mcp import ClientSession, StdioServerParameters from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions import mcp.server.stdio # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger("ffmpeg_mcp") # 创建MCP服务器实例 server = Server("ffmpeg-mcp") class FFmpegProcessor: """FFmpeg处理器类""" @staticmethod def check_ffmpeg_available() -> bool: """检查FFmpeg是否可用""" try: result = subprocess.run( ["ffmpeg", "-version"], capture_output=True, text=True, timeout=10 ) return result.returncode == 0 except Exception: return False @staticmethod def get_media_info(file_path: str) -> Dict[str, Any]: """获取媒体文件信息""" try: cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", file_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: return json.loads(result.stdout) else: raise Exception(f"FFprobe error: {result.stderr}") except Exception as e: raise Exception(f"Failed to get media info: {str(e)}") @staticmethod def compress_image(input_path: str, output_path: str, quality: int = 85) -> bool: """压缩图片""" try: # 确保输出目录存在 os.makedirs(os.path.dirname(output_path), exist_ok=True) cmd = [ "ffmpeg", "-y", "-i", input_path, "-q:v", str(quality), output_path ] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 except Exception: return False @staticmethod def convert_image_format(input_path: str, output_path: str, format: str = "jpg") -> bool: """转换图片格式""" try: os.makedirs(os.path.dirname(output_path), exist_ok=True) cmd = ["ffmpeg", "-y", "-i", input_path, output_path] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 except Exception: return False @staticmethod def resize_image(input_path: str, output_path: str, width: int, height: int) -> bool: """调整图片尺寸""" try: os.makedirs(os.path.dirname(output_path), exist_ok=True) cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", f"scale={width}:{height}", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 except Exception: return False @staticmethod def compress_video(input_path: str, output_path: str, crf: int = 23, preset: str = "medium") -> bool: """压缩视频""" try: os.makedirs(os.path.dirname(output_path), exist_ok=True) cmd = [ "ffmpeg", "-y", "-i", input_path, "-c:v", "libx264", "-crf", str(crf), "-preset", preset, "-c:a", "aac", "-b:a", "128k", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 except Exception: return False @staticmethod def convert_video_format(input_path: str, output_path: str, format: str = "mp4") -> bool: """转换视频格式""" try: os.makedirs(os.path.dirname(output_path), exist_ok=True) cmd = ["ffmpeg", "-y", "-i", input_path, output_path] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 except Exception: return False @staticmethod def resize_video(input_path: str, output_path: str, width: int, height: int) -> bool: """调整视频尺寸""" try: os.makedirs(os.path.dirname(output_path), exist_ok=True) cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", f"scale={width}:{height}", "-c:a", "copy", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) return result.returncode == 0 except Exception: return False class FileManager: """文件管理器类""" @staticmethod def get_files_by_extension(directory: str, extensions: List[str]) -> List[str]: """根据扩展名获取文件列表""" files = [] path = Path(directory) if path.exists() and path.is_dir(): for ext in extensions: files.extend(path.glob(f"*.{ext}")) files.extend(path.glob(f"*.{ext.upper()}")) return [str(f) for f in files] @staticmethod def batch_process_files(input_dir: str, output_dir: str, process_func, **kwargs) -> Dict[str, Any]: """批量处理文件""" results = { "success": [], "failed": [], "total": 0 } # 图片扩展名 image_extensions = ["jpg", "jpeg", "png", "bmp", "gif", "tiff", "webp"] # 视频扩展名 video_extensions = ["mp4", "avi", "mkv", "mov", "wmv", "flv", "webm", "m4v"] all_extensions = image_extensions + video_extensions files = FileManager.get_files_by_extension(input_dir, all_extensions) results["total"] = len(files) for file_path in files: try: file_name = Path(file_path).name output_path = os.path.join(output_dir, file_name) if process_func(file_path, output_path, **kwargs): results["success"].append(file_name) else: results["failed"].append(file_name) except Exception as e: results["failed"].append(f"{Path(file_path).name}: {str(e)}") return results @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ 列出可用的工具函数 """ return [ types.Tool( name="compress_image", description="压缩单个图片文件", inputSchema={ "type": "object", "properties": { "input_path": { "type": "string", "description": "输入图片文件路径" }, "output_path": { "type": "string", "description": "输出图片文件路径" }, "quality": { "type": "integer", "description": "压缩质量 (1-100, 默认85)", "default": 85 } }, "required": ["input_path", "output_path"] }, ), types.Tool( name="batch_compress_images", description="批量压缩指定文件夹中的所有图片", inputSchema={ "type": "object", "properties": { "input_dir": { "type": "string", "description": "输入文件夹路径" }, "output_dir": { "type": "string", "description": "输出文件夹路径" }, "quality": { "type": "integer", "description": "压缩质量 (1-100, 默认85)", "default": 85 } }, "required": ["input_dir", "output_dir"] }, ), types.Tool( name="convert_image_format", description="转换图片格式", inputSchema={ "type": "object", "properties": { "input_path": { "type": "string", "description": "输入图片文件路径" }, "output_path": { "type": "string", "description": "输出图片文件路径" } }, "required": ["input_path", "output_path"] }, ), types.Tool( name="resize_image", description="调整图片尺寸", inputSchema={ "type": "object", "properties": { "input_path": { "type": "string", "description": "输入图片文件路径" }, "output_path": { "type": "string", "description": "输出图片文件路径" }, "width": { "type": "integer", "description": "目标宽度" }, "height": { "type": "integer", "description": "目标高度" } }, "required": ["input_path", "output_path", "width", "height"] }, ), types.Tool( name="compress_video", description="压缩单个视频文件", inputSchema={ "type": "object", "properties": { "input_path": { "type": "string", "description": "输入视频文件路径" }, "output_path": { "type": "string", "description": "输出视频文件路径" }, "crf": { "type": "integer", "description": "压缩质量 (18-28, 数值越小质量越高, 默认23)", "default": 23 }, "preset": { "type": "string", "description": "压缩速度预设 (ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow, 默认medium)", "default": "medium" } }, "required": ["input_path", "output_path"] }, ), types.Tool( name="batch_compress_videos", description="批量压缩指定文件夹中的所有视频", inputSchema={ "type": "object", "properties": { "input_dir": { "type": "string", "description": "输入文件夹路径" }, "output_dir": { "type": "string", "description": "输出文件夹路径" }, "crf": { "type": "integer", "description": "压缩质量 (18-28, 数值越小质量越高, 默认23)", "default": 23 }, "preset": { "type": "string", "description": "压缩速度预设 (ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow, 默认medium)", "default": "medium" } }, "required": ["input_dir", "output_dir"] }, ), types.Tool( name="convert_video_format", description="转换视频格式", inputSchema={ "type": "object", "properties": { "input_path": { "type": "string", "description": "输入视频文件路径" }, "output_path": { "type": "string", "description": "输出视频文件路径" } }, "required": ["input_path", "output_path"] }, ), types.Tool( name="resize_video", description="调整视频尺寸", inputSchema={ "type": "object", "properties": { "input_path": { "type": "string", "description": "输入视频文件路径" }, "output_path": { "type": "string", "description": "输出视频文件路径" }, "width": { "type": "integer", "description": "目标宽度" }, "height": { "type": "integer", "description": "目标高度" } }, "required": ["input_path", "output_path", "width", "height"] }, ), types.Tool( name="get_media_info", description="获取媒体文件信息", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "媒体文件路径" } }, "required": ["file_path"] }, ), types.Tool( name="check_ffmpeg_status", description="检查FFmpeg是否已安装并可用", inputSchema={ "type": "object", "properties": {}, "required": [] }, ), ] @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent]: """ 处理工具调用 """ if arguments is None: arguments = {} try: if name == "compress_image": input_path = arguments.get("input_path") output_path = arguments.get("output_path") quality = arguments.get("quality", 85) if not os.path.exists(input_path): return [types.TextContent(type="text", text=f"错误:输入文件不存在: {input_path}")] success = FFmpegProcessor.compress_image(input_path, output_path, quality) if success: return [types.TextContent(type="text", text=f"图片压缩成功:{input_path} -> {output_path}")] else: return [types.TextContent(type="text", text=f"图片压缩失败:{input_path}")] elif name == "batch_compress_images": input_dir = arguments.get("input_dir") output_dir = arguments.get("output_dir") quality = arguments.get("quality", 85) if not os.path.exists(input_dir): return [types.TextContent(type="text", text=f"错误:输入目录不存在: {input_dir}")] results = FileManager.batch_process_files( input_dir, output_dir, FFmpegProcessor.compress_image, quality=quality ) message = f"批量图片压缩完成!\n总计: {results['total']} 个文件\n成功: {len(results['success'])} 个\n失败: {len(results['failed'])} 个" if results['failed']: message += f"\n失败的文件: {', '.join(results['failed'])}" return [types.TextContent(type="text", text=message)] elif name == "convert_image_format": input_path = arguments.get("input_path") output_path = arguments.get("output_path") if not os.path.exists(input_path): return [types.TextContent(type="text", text=f"错误:输入文件不存在: {input_path}")] success = FFmpegProcessor.convert_image_format(input_path, output_path) if success: return [types.TextContent(type="text", text=f"图片格式转换成功:{input_path} -> {output_path}")] else: return [types.TextContent(type="text", text=f"图片格式转换失败:{input_path}")] elif name == "resize_image": input_path = arguments.get("input_path") output_path = arguments.get("output_path") width = arguments.get("width") height = arguments.get("height") if not os.path.exists(input_path): return [types.TextContent(type="text", text=f"错误:输入文件不存在: {input_path}")] success = FFmpegProcessor.resize_image(input_path, output_path, width, height) if success: return [types.TextContent(type="text", text=f"图片尺寸调整成功:{input_path} -> {output_path} ({width}x{height})")] else: return [types.TextContent(type="text", text=f"图片尺寸调整失败:{input_path}")] elif name == "compress_video": input_path = arguments.get("input_path") output_path = arguments.get("output_path") crf = arguments.get("crf", 23) preset = arguments.get("preset", "medium") if not os.path.exists(input_path): return [types.TextContent(type="text", text=f"错误:输入文件不存在: {input_path}")] success = FFmpegProcessor.compress_video(input_path, output_path, crf, preset) if success: return [types.TextContent(type="text", text=f"视频压缩成功:{input_path} -> {output_path}")] else: return [types.TextContent(type="text", text=f"视频压缩失败:{input_path}")] elif name == "batch_compress_videos": input_dir = arguments.get("input_dir") output_dir = arguments.get("output_dir") crf = arguments.get("crf", 23) preset = arguments.get("preset", "medium") if not os.path.exists(input_dir): return [types.TextContent(type="text", text=f"错误:输入目录不存在: {input_dir}")] results = FileManager.batch_process_files( input_dir, output_dir, FFmpegProcessor.compress_video, crf=crf, preset=preset ) message = f"批量视频压缩完成!\n总计: {results['total']} 个文件\n成功: {len(results['success'])} 个\n失败: {len(results['failed'])} 个" if results['failed']: message += f"\n失败的文件: {', '.join(results['failed'])}" return [types.TextContent(type="text", text=message)] elif name == "convert_video_format": input_path = arguments.get("input_path") output_path = arguments.get("output_path") if not os.path.exists(input_path): return [types.TextContent(type="text", text=f"错误:输入文件不存在: {input_path}")] success = FFmpegProcessor.convert_video_format(input_path, output_path) if success: return [types.TextContent(type="text", text=f"视频格式转换成功:{input_path} -> {output_path}")] else: return [types.TextContent(type="text", text=f"视频格式转换失败:{input_path}")] elif name == "resize_video": input_path = arguments.get("input_path") output_path = arguments.get("output_path") width = arguments.get("width") height = arguments.get("height") if not os.path.exists(input_path): return [types.TextContent(type="text", text=f"错误:输入文件不存在: {input_path}")] success = FFmpegProcessor.resize_video(input_path, output_path, width, height) if success: return [types.TextContent(type="text", text=f"视频尺寸调整成功:{input_path} -> {output_path} ({width}x{height})")] else: return [types.TextContent(type="text", text=f"视频尺寸调整失败:{input_path}")] elif name == "get_media_info": file_path = arguments.get("file_path") if not os.path.exists(file_path): return [types.TextContent(type="text", text=f"错误:文件不存在: {file_path}")] try: info = FFmpegProcessor.get_media_info(file_path) return [types.TextContent(type="text", text=json.dumps(info, indent=2, ensure_ascii=False))] except Exception as e: return [types.TextContent(type="text", text=f"获取媒体信息失败:{str(e)}")] elif name == "check_ffmpeg_status": available = FFmpegProcessor.check_ffmpeg_available() if available: return [types.TextContent(type="text", text="FFmpeg 已安装并可用")] else: return [types.TextContent(type="text", text="FFmpeg 未安装或不可用,请先安装FFmpeg")] else: return [types.TextContent(type="text", text=f"未知的工具名称: {name}")] except Exception as e: return [types.TextContent(type="text", text=f"工具执行出错: {str(e)}")] async def main(): # Run the server using stdin/stdout streams async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="ffmpeg-mcp", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LBJWt/ffmpeg-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server