media_server.py•18.1 kB
#!/usr/bin/env python3
"""
MCP Media Server - Lists and plays media files from ~/Media/MOVIES
Supports stdio, HTTP, and HTTPS transports
Compatible with Claude and n8n
"""
import os
import sys
import asyncio
import argparse
import subprocess
import socket
import json
import tempfile
from pathlib import Path
from typing import Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import mcp.server.stdio
# For HTTP/HTTPS support
try:
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.responses import Response
from mcp.server.sse import SseServerTransport
import uvicorn
HTTP_AVAILABLE = True
except ImportError:
HTTP_AVAILABLE = False
print("Warning: starlette/uvicorn not available. HTTP/HTTPS transport disabled.", file=sys.stderr)
# Initialize the MCP server
app = Server("media-server")
# Media directory path
MEDIA_DIR = Path.home() / "Media" / "MOVIES"
# Supported media file extensions
MEDIA_EXTENSIONS = {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.mpg', '.mpeg'}
# MPV IPC socket path
MPV_SOCKET = Path(tempfile.gettempdir()) / "mpv-ipc-socket"
# Track current MPV process
current_mpv_process = None
def send_mpv_command(command: list, expect_response: bool = True) -> tuple[bool, str]:
"""
Send a command to MPV via IPC socket
Args:
command: List containing the command and arguments
expect_response: Whether to wait for a response
Returns:
Tuple of (success: bool, message: str)
"""
if not MPV_SOCKET.exists():
return False, "MPV is not running or IPC socket not found"
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(str(MPV_SOCKET))
# Send command as JSON-RPC
request = {"command": command}
sock.sendall((json.dumps(request) + "\n").encode('utf-8'))
if expect_response:
# Read response
response = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
if b'\n' in response:
break
sock.close()
if response:
result = json.loads(response.decode('utf-8'))
if result.get('error') == 'success':
return True, f"Command executed: {' '.join(map(str, command))}"
else:
return False, f"MPV error: {result.get('error', 'unknown')}"
else:
sock.close()
return True, f"Command sent: {' '.join(map(str, command))}"
except FileNotFoundError:
return False, "MPV IPC socket not found"
except ConnectionRefusedError:
return False, "Could not connect to MPV"
except Exception as e:
return False, f"Error communicating with MPV: {str(e)}"
def get_media_files():
"""Get list of media files in the MOVIES directory"""
if not MEDIA_DIR.exists():
return []
media_files = []
try:
for item in MEDIA_DIR.iterdir():
if item.is_file() and item.suffix.lower() in MEDIA_EXTENSIONS:
media_files.append({
'name': item.name,
'path': str(item),
'size': item.stat().st_size,
'extension': item.suffix
})
except Exception as e:
print(f"Error scanning directory: {e}", file=sys.stderr)
return sorted(media_files, key=lambda x: x['name'])
def play_media_file(filename: str, loop: bool = False) -> tuple[bool, str]:
"""
Play a media file using MPV with IPC control
Args:
filename: Name of the file to play
loop: Whether to loop the video
Returns:
Tuple of (success: bool, message: str)
"""
global current_mpv_process
file_path = MEDIA_DIR / filename
if not file_path.exists():
return False, f"File not found: {filename}"
if file_path.suffix.lower() not in MEDIA_EXTENSIONS:
return False, f"Not a supported media file: {filename}"
try:
# Remove old socket if it exists
if MPV_SOCKET.exists():
MPV_SOCKET.unlink()
# Detect platform and use appropriate player
if sys.platform == 'linux':
# Try MPV first with IPC support
if subprocess.run(['which', 'mpv'], capture_output=True).returncode == 0:
mpv_args = [
'mpv',
f'--input-ipc-server={MPV_SOCKET}',
str(file_path)
]
if loop:
mpv_args.insert(1, '--loop=inf')
current_mpv_process = subprocess.Popen(
mpv_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return True, f"Playing {filename} with MPV (IPC enabled)"
# Fallback to other players
players = ['vlc', 'mplayer', 'xdg-open']
for player in players:
if subprocess.run(['which', player], capture_output=True).returncode == 0:
subprocess.Popen([player, str(file_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return True, f"Playing {filename} with {player} (no IPC control)"
return False, "No media player found (tried: mpv, vlc, mplayer, xdg-open)"
elif sys.platform == 'darwin': # macOS
# Try MPV on macOS
if subprocess.run(['which', 'mpv'], capture_output=True).returncode == 0:
mpv_args = [
'mpv',
f'--input-ipc-server={MPV_SOCKET}',
str(file_path)
]
if loop:
mpv_args.insert(1, '--loop=inf')
current_mpv_process = subprocess.Popen(
mpv_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return True, f"Playing {filename} with MPV (IPC enabled)"
else:
subprocess.Popen(['open', str(file_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return True, f"Playing {filename} (no IPC control)"
elif sys.platform == 'win32': # Windows
# Try MPV on Windows
if subprocess.run(['where', 'mpv'], capture_output=True).returncode == 0:
mpv_args = [
'mpv',
f'--input-ipc-server={MPV_SOCKET}',
str(file_path)
]
if loop:
mpv_args.insert(1, '--loop=inf')
current_mpv_process = subprocess.Popen(
mpv_args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return True, f"Playing {filename} with MPV (IPC enabled)"
else:
os.startfile(str(file_path))
return True, f"Playing {filename} (no IPC control)"
else:
return False, f"Unsupported platform: {sys.platform}"
except Exception as e:
return False, f"Error playing file: {str(e)}"
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools"""
return [
Tool(
name="list_movies",
description=f"List all media files available in {MEDIA_DIR}. Returns name, path, size, and extension for each file.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="play_movie",
description="Play a specific movie file using MPV with IPC control. Supports optional loop parameter.",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The name of the movie file to play (e.g., 'movie.mp4')"
},
"loop": {
"type": "boolean",
"description": "Whether to loop the video infinitely (default: false)"
}
},
"required": ["filename"]
}
),
Tool(
name="pause_playback",
description="Pause or unpause the currently playing video in MPV. Toggles between pause and play.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="stop_playback",
description="Stop playback and quit MPV.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="seek_forward",
description="Skip forward in the video by a specified number of seconds.",
inputSchema={
"type": "object",
"properties": {
"seconds": {
"type": "number",
"description": "Number of seconds to skip forward (default: 10)"
}
},
"required": []
}
),
Tool(
name="seek_backward",
description="Skip backward in the video by a specified number of seconds.",
inputSchema={
"type": "object",
"properties": {
"seconds": {
"type": "number",
"description": "Number of seconds to skip backward (default: 10)"
}
},
"required": []
}
),
Tool(
name="next_chapter",
description="Jump to the next chapter or scene in the video.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="previous_chapter",
description="Jump to the previous chapter or scene in the video.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="toggle_loop",
description="Toggle loop mode on/off for the current video.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
Tool(
name="restart_playback",
description="Restart the video from the beginning.",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool calls"""
if name == "list_movies":
media_files = get_media_files()
if not media_files:
if not MEDIA_DIR.exists():
return [TextContent(
type="text",
text=f"Media directory does not exist: {MEDIA_DIR}"
)]
else:
return [TextContent(
type="text",
text=f"No media files found in {MEDIA_DIR}"
)]
# Format the response
response = f"Found {len(media_files)} media file(s) in {MEDIA_DIR}:\n\n"
for i, file in enumerate(media_files, 1):
size_mb = file['size'] / (1024 * 1024)
response += f"{i}. {file['name']}\n"
response += f" Path: {file['path']}\n"
response += f" Size: {size_mb:.2f} MB\n"
response += f" Type: {file['extension']}\n\n"
return [TextContent(type="text", text=response)]
elif name == "play_movie":
filename = arguments.get("filename")
loop = arguments.get("loop", False)
if not filename:
return [TextContent(
type="text",
text="Error: filename parameter is required"
)]
success, message = play_media_file(filename, loop)
return [TextContent(
type="text",
text=message
)]
elif name == "pause_playback":
success, message = send_mpv_command(["cycle", "pause"])
return [TextContent(type="text", text=message)]
elif name == "stop_playback":
success, message = send_mpv_command(["quit"])
return [TextContent(type="text", text=message)]
elif name == "seek_forward":
seconds = arguments.get("seconds", 10)
success, message = send_mpv_command(["seek", seconds, "relative"])
return [TextContent(type="text", text=message)]
elif name == "seek_backward":
seconds = arguments.get("seconds", 10)
success, message = send_mpv_command(["seek", -seconds, "relative"])
return [TextContent(type="text", text=message)]
elif name == "next_chapter":
success, message = send_mpv_command(["add", "chapter", 1])
return [TextContent(type="text", text=message)]
elif name == "previous_chapter":
success, message = send_mpv_command(["add", "chapter", -1])
return [TextContent(type="text", text=message)]
elif name == "toggle_loop":
success, message = send_mpv_command(["cycle", "loop-file"])
return [TextContent(type="text", text=message)]
elif name == "restart_playback":
success, message = send_mpv_command(["seek", 0, "absolute"])
return [TextContent(type="text", text=message)]
else:
return [TextContent(
type="text",
text=f"Unknown tool: {name}"
)]
async def run_stdio():
"""Run the server using stdio transport"""
async with stdio_server() as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
def create_sse_app():
"""Create Starlette app for SSE transport (HTTP/HTTPS)"""
if not HTTP_AVAILABLE:
raise RuntimeError("HTTP transport not available. Install: pip install starlette uvicorn sse-starlette")
sse = SseServerTransport("/messages/")
async def handle_sse(request):
async with sse.connect_sse(
request.scope,
request.receive,
request._send
) as streams:
await app.run(
streams[0],
streams[1],
app.create_initialization_options()
)
return Response()
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse, methods=["GET"]),
Mount("/messages/", app=sse.handle_post_message),
]
)
return starlette_app
def run_http_server(host: str, port: int, use_ssl: bool = False, certfile: str = None, keyfile: str = None):
"""Run the server using HTTP/HTTPS with SSE transport"""
if not HTTP_AVAILABLE:
print("Error: HTTP transport not available. Install: pip install starlette uvicorn sse-starlette", file=sys.stderr)
sys.exit(1)
starlette_app = create_sse_app()
config = {
"app": starlette_app,
"host": host,
"port": port,
}
if use_ssl:
if not certfile or not keyfile:
print("Error: SSL enabled but certificate/key files not provided", file=sys.stderr)
sys.exit(1)
config["ssl_certfile"] = certfile
config["ssl_keyfile"] = keyfile
protocol = "https"
else:
protocol = "http"
print(f"Starting MCP Media Server on {protocol}://{host}:{port}", file=sys.stderr)
print(f"SSE endpoint: {protocol}://{host}:{port}/sse", file=sys.stderr)
print(f"Media directory: {MEDIA_DIR}", file=sys.stderr)
uvicorn.run(**config)
def main():
"""Main entry point with argument parsing"""
parser = argparse.ArgumentParser(
description="MCP Media Server - List and play media files",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Run with stdio (for Claude Desktop, etc.)
python media_server.py --transport stdio
# Run with HTTP
python media_server.py --transport http --host 0.0.0.0 --port 8000
# Run with HTTPS
python media_server.py --transport https --host 0.0.0.0 --port 8443 \\
--certfile cert.pem --keyfile key.pem
"""
)
parser.add_argument(
"--transport",
choices=["stdio", "http", "https"],
default="http",
help="Transport protocol to use (default: stdio)"
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="Host to bind to for HTTP/HTTPS (default: 127.0.0.1)"
)
parser.add_argument(
"--port",
type=int,
default=3000,
help="Port to bind to for HTTP/HTTPS (default: 8000)"
)
parser.add_argument(
"--certfile",
help="SSL certificate file for HTTPS"
)
parser.add_argument(
"--keyfile",
help="SSL key file for HTTPS"
)
args = parser.parse_args()
if args.transport == "stdio":
asyncio.run(run_stdio())
elif args.transport in ["http", "https"]:
use_ssl = args.transport == "https"
run_http_server(args.host, args.port, use_ssl, args.certfile, args.keyfile)
if __name__ == "__main__":
main()