Skip to main content
Glama

MCP Media Server

by neal3000
media_server.py18.1 kB
#!/usr/bin/env python3 """ MCP Media Server - Lists and plays media files from ~/Media/MOVIES Supports stdio, HTTP, and HTTPS transports Compatible with Claude and n8n """ import os import sys import asyncio import argparse import subprocess import socket import json import tempfile from pathlib import Path from typing import Optional from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent import mcp.server.stdio # For HTTP/HTTPS support try: from starlette.applications import Starlette from starlette.routing import Route, Mount from starlette.responses import Response from mcp.server.sse import SseServerTransport import uvicorn HTTP_AVAILABLE = True except ImportError: HTTP_AVAILABLE = False print("Warning: starlette/uvicorn not available. HTTP/HTTPS transport disabled.", file=sys.stderr) # Initialize the MCP server app = Server("media-server") # Media directory path MEDIA_DIR = Path.home() / "Media" / "MOVIES" # Supported media file extensions MEDIA_EXTENSIONS = {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.mpg', '.mpeg'} # MPV IPC socket path MPV_SOCKET = Path(tempfile.gettempdir()) / "mpv-ipc-socket" # Track current MPV process current_mpv_process = None def send_mpv_command(command: list, expect_response: bool = True) -> tuple[bool, str]: """ Send a command to MPV via IPC socket Args: command: List containing the command and arguments expect_response: Whether to wait for a response Returns: Tuple of (success: bool, message: str) """ if not MPV_SOCKET.exists(): return False, "MPV is not running or IPC socket not found" try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(str(MPV_SOCKET)) # Send command as JSON-RPC request = {"command": command} sock.sendall((json.dumps(request) + "\n").encode('utf-8')) if expect_response: # Read response response = b"" while True: chunk = sock.recv(4096) if not chunk: break response += chunk if b'\n' in response: break sock.close() if response: result = json.loads(response.decode('utf-8')) if result.get('error') == 'success': return True, f"Command executed: {' '.join(map(str, command))}" else: return False, f"MPV error: {result.get('error', 'unknown')}" else: sock.close() return True, f"Command sent: {' '.join(map(str, command))}" except FileNotFoundError: return False, "MPV IPC socket not found" except ConnectionRefusedError: return False, "Could not connect to MPV" except Exception as e: return False, f"Error communicating with MPV: {str(e)}" def get_media_files(): """Get list of media files in the MOVIES directory""" if not MEDIA_DIR.exists(): return [] media_files = [] try: for item in MEDIA_DIR.iterdir(): if item.is_file() and item.suffix.lower() in MEDIA_EXTENSIONS: media_files.append({ 'name': item.name, 'path': str(item), 'size': item.stat().st_size, 'extension': item.suffix }) except Exception as e: print(f"Error scanning directory: {e}", file=sys.stderr) return sorted(media_files, key=lambda x: x['name']) def play_media_file(filename: str, loop: bool = False) -> tuple[bool, str]: """ Play a media file using MPV with IPC control Args: filename: Name of the file to play loop: Whether to loop the video Returns: Tuple of (success: bool, message: str) """ global current_mpv_process file_path = MEDIA_DIR / filename if not file_path.exists(): return False, f"File not found: {filename}" if file_path.suffix.lower() not in MEDIA_EXTENSIONS: return False, f"Not a supported media file: {filename}" try: # Remove old socket if it exists if MPV_SOCKET.exists(): MPV_SOCKET.unlink() # Detect platform and use appropriate player if sys.platform == 'linux': # Try MPV first with IPC support if subprocess.run(['which', 'mpv'], capture_output=True).returncode == 0: mpv_args = [ 'mpv', f'--input-ipc-server={MPV_SOCKET}', str(file_path) ] if loop: mpv_args.insert(1, '--loop=inf') current_mpv_process = subprocess.Popen( mpv_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) return True, f"Playing {filename} with MPV (IPC enabled)" # Fallback to other players players = ['vlc', 'mplayer', 'xdg-open'] for player in players: if subprocess.run(['which', player], capture_output=True).returncode == 0: subprocess.Popen([player, str(file_path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return True, f"Playing {filename} with {player} (no IPC control)" return False, "No media player found (tried: mpv, vlc, mplayer, xdg-open)" elif sys.platform == 'darwin': # macOS # Try MPV on macOS if subprocess.run(['which', 'mpv'], capture_output=True).returncode == 0: mpv_args = [ 'mpv', f'--input-ipc-server={MPV_SOCKET}', str(file_path) ] if loop: mpv_args.insert(1, '--loop=inf') current_mpv_process = subprocess.Popen( mpv_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) return True, f"Playing {filename} with MPV (IPC enabled)" else: subprocess.Popen(['open', str(file_path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return True, f"Playing {filename} (no IPC control)" elif sys.platform == 'win32': # Windows # Try MPV on Windows if subprocess.run(['where', 'mpv'], capture_output=True).returncode == 0: mpv_args = [ 'mpv', f'--input-ipc-server={MPV_SOCKET}', str(file_path) ] if loop: mpv_args.insert(1, '--loop=inf') current_mpv_process = subprocess.Popen( mpv_args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) return True, f"Playing {filename} with MPV (IPC enabled)" else: os.startfile(str(file_path)) return True, f"Playing {filename} (no IPC control)" else: return False, f"Unsupported platform: {sys.platform}" except Exception as e: return False, f"Error playing file: {str(e)}" @app.list_tools() async def list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="list_movies", description=f"List all media files available in {MEDIA_DIR}. Returns name, path, size, and extension for each file.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="play_movie", description="Play a specific movie file using MPV with IPC control. Supports optional loop parameter.", inputSchema={ "type": "object", "properties": { "filename": { "type": "string", "description": "The name of the movie file to play (e.g., 'movie.mp4')" }, "loop": { "type": "boolean", "description": "Whether to loop the video infinitely (default: false)" } }, "required": ["filename"] } ), Tool( name="pause_playback", description="Pause or unpause the currently playing video in MPV. Toggles between pause and play.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="stop_playback", description="Stop playback and quit MPV.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="seek_forward", description="Skip forward in the video by a specified number of seconds.", inputSchema={ "type": "object", "properties": { "seconds": { "type": "number", "description": "Number of seconds to skip forward (default: 10)" } }, "required": [] } ), Tool( name="seek_backward", description="Skip backward in the video by a specified number of seconds.", inputSchema={ "type": "object", "properties": { "seconds": { "type": "number", "description": "Number of seconds to skip backward (default: 10)" } }, "required": [] } ), Tool( name="next_chapter", description="Jump to the next chapter or scene in the video.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="previous_chapter", description="Jump to the previous chapter or scene in the video.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="toggle_loop", description="Toggle loop mode on/off for the current video.", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="restart_playback", description="Restart the video from the beginning.", inputSchema={ "type": "object", "properties": {}, "required": [] } ) ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" if name == "list_movies": media_files = get_media_files() if not media_files: if not MEDIA_DIR.exists(): return [TextContent( type="text", text=f"Media directory does not exist: {MEDIA_DIR}" )] else: return [TextContent( type="text", text=f"No media files found in {MEDIA_DIR}" )] # Format the response response = f"Found {len(media_files)} media file(s) in {MEDIA_DIR}:\n\n" for i, file in enumerate(media_files, 1): size_mb = file['size'] / (1024 * 1024) response += f"{i}. {file['name']}\n" response += f" Path: {file['path']}\n" response += f" Size: {size_mb:.2f} MB\n" response += f" Type: {file['extension']}\n\n" return [TextContent(type="text", text=response)] elif name == "play_movie": filename = arguments.get("filename") loop = arguments.get("loop", False) if not filename: return [TextContent( type="text", text="Error: filename parameter is required" )] success, message = play_media_file(filename, loop) return [TextContent( type="text", text=message )] elif name == "pause_playback": success, message = send_mpv_command(["cycle", "pause"]) return [TextContent(type="text", text=message)] elif name == "stop_playback": success, message = send_mpv_command(["quit"]) return [TextContent(type="text", text=message)] elif name == "seek_forward": seconds = arguments.get("seconds", 10) success, message = send_mpv_command(["seek", seconds, "relative"]) return [TextContent(type="text", text=message)] elif name == "seek_backward": seconds = arguments.get("seconds", 10) success, message = send_mpv_command(["seek", -seconds, "relative"]) return [TextContent(type="text", text=message)] elif name == "next_chapter": success, message = send_mpv_command(["add", "chapter", 1]) return [TextContent(type="text", text=message)] elif name == "previous_chapter": success, message = send_mpv_command(["add", "chapter", -1]) return [TextContent(type="text", text=message)] elif name == "toggle_loop": success, message = send_mpv_command(["cycle", "loop-file"]) return [TextContent(type="text", text=message)] elif name == "restart_playback": success, message = send_mpv_command(["seek", 0, "absolute"]) return [TextContent(type="text", text=message)] else: return [TextContent( type="text", text=f"Unknown tool: {name}" )] async def run_stdio(): """Run the server using stdio transport""" async with stdio_server() as streams: await app.run( streams[0], streams[1], app.create_initialization_options() ) def create_sse_app(): """Create Starlette app for SSE transport (HTTP/HTTPS)""" if not HTTP_AVAILABLE: raise RuntimeError("HTTP transport not available. Install: pip install starlette uvicorn sse-starlette") sse = SseServerTransport("/messages/") async def handle_sse(request): async with sse.connect_sse( request.scope, request.receive, request._send ) as streams: await app.run( streams[0], streams[1], app.create_initialization_options() ) return Response() starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse, methods=["GET"]), Mount("/messages/", app=sse.handle_post_message), ] ) return starlette_app def run_http_server(host: str, port: int, use_ssl: bool = False, certfile: str = None, keyfile: str = None): """Run the server using HTTP/HTTPS with SSE transport""" if not HTTP_AVAILABLE: print("Error: HTTP transport not available. Install: pip install starlette uvicorn sse-starlette", file=sys.stderr) sys.exit(1) starlette_app = create_sse_app() config = { "app": starlette_app, "host": host, "port": port, } if use_ssl: if not certfile or not keyfile: print("Error: SSL enabled but certificate/key files not provided", file=sys.stderr) sys.exit(1) config["ssl_certfile"] = certfile config["ssl_keyfile"] = keyfile protocol = "https" else: protocol = "http" print(f"Starting MCP Media Server on {protocol}://{host}:{port}", file=sys.stderr) print(f"SSE endpoint: {protocol}://{host}:{port}/sse", file=sys.stderr) print(f"Media directory: {MEDIA_DIR}", file=sys.stderr) uvicorn.run(**config) def main(): """Main entry point with argument parsing""" parser = argparse.ArgumentParser( description="MCP Media Server - List and play media files", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Run with stdio (for Claude Desktop, etc.) python media_server.py --transport stdio # Run with HTTP python media_server.py --transport http --host 0.0.0.0 --port 8000 # Run with HTTPS python media_server.py --transport https --host 0.0.0.0 --port 8443 \\ --certfile cert.pem --keyfile key.pem """ ) parser.add_argument( "--transport", choices=["stdio", "http", "https"], default="http", help="Transport protocol to use (default: stdio)" ) parser.add_argument( "--host", default="0.0.0.0", help="Host to bind to for HTTP/HTTPS (default: 127.0.0.1)" ) parser.add_argument( "--port", type=int, default=3000, help="Port to bind to for HTTP/HTTPS (default: 8000)" ) parser.add_argument( "--certfile", help="SSL certificate file for HTTPS" ) parser.add_argument( "--keyfile", help="SSL key file for HTTPS" ) args = parser.parse_args() if args.transport == "stdio": asyncio.run(run_stdio()) elif args.transport in ["http", "https"]: use_ssl = args.transport == "https" run_http_server(args.host, args.port, use_ssl, args.certfile, args.keyfile) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/neal3000/mcp_media_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server