Skip to main content
Glama

Plex MCP Server

by djbriane
plex_mcp.py (21.8 kB)
""" Module: plex_mcp This module provides tools for interacting with a Plex server via FastMCP. It includes functions to search for movies, retrieve movie details, manage playlists, and obtain recent movies and movie genres. Logging and asynchronous execution are used to handle non-blocking I/O and to provide informative error messages. """ # --- Import Statements --- from typing import Any, Dict, List, Optional from dataclasses import dataclass, asdict import os import asyncio import logging from plexapi.server import PlexServer from plexapi.exceptions import NotFound, Unauthorized from mcp.server.fastmcp import FastMCP # --- Logging Setup --- logging.basicConfig( level=logging.INFO, # Use DEBUG for more verbosity during development format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # --- FastMCP Server Initialization --- mcp = FastMCP("plex") # --- Utility Formatting Functions --- def format_movie(movie) -> str: """ Format a movie object into a human-readable string. Parameters: movie: A Plex movie object. Returns: A formatted string containing movie details. """ title = getattr(movie, 'title', 'Unknown Title') year = getattr(movie, 'year', 'Unknown Year') summary = getattr(movie, 'summary', 'No summary available') duration = getattr(movie, 'duration', 0) // 60000 if hasattr(movie, 'duration') else 0 rating = getattr(movie, 'rating', 'Unrated') studio = getattr(movie, 'studio', 'Unknown Studio') directors = [director.tag for director in getattr(movie, 'directors', [])[:3]] actors = [role.tag for role in getattr(movie, 'roles', [])[:5]] return ( f"Title: {title} ({year})\n" f"Rating: {rating}\n" f"Duration: {duration} minutes\n" f"Studio: {studio}\n" f"Directors: {', '.join(directors) if directors else 'Unknown'}\n" f"Starring: {', '.join(actors) if actors else 'Unknown'}\n" f"Summary: {summary}\n" ) def format_playlist(playlist) -> str: """ Format a playlist into a human-readable string. 
Parameters: playlist: A Plex playlist object. Returns: A formatted string containing playlist details. """ duration_mins = sum(item.duration for item in playlist.items()) // 60000 if playlist.items() else 0 updated = ( playlist.updatedAt.strftime('%Y-%m-%d %H:%M:%S') if hasattr(playlist, 'updatedAt') else 'Unknown' ) return ( f"Playlist: {playlist.title}\n" f"Items: {len(playlist.items())}\n" f"Duration: {duration_mins} minutes\n" f"Last Updated: {updated}\n" ) # --- Plex Client Class --- class PlexClient: """ Encapsulate the Plex connection logic. This class handles initialization and caching of the PlexServer instance. """ def __init__(self, server_url: str = None, token: str = None): self.server_url = server_url or os.environ.get("PLEX_SERVER_URL", "").rstrip("/") self.token = token or os.environ.get("PLEX_TOKEN") if not self.server_url or not self.token: raise ValueError("Missing required configuration: Ensure PLEX_SERVER_URL and PLEX_TOKEN are set.") self._server = None def get_server(self) -> PlexServer: """ Return a cached PlexServer instance or initialize one if not already available. Returns: A connected PlexServer instance. Raises: Exception: If connection initialization fails. 
""" if self._server is None: try: logger.info("Initializing PlexServer with URL: %s", self.server_url) self._server = PlexServer(self.server_url, self.token) logger.info("Successfully initialized PlexServer.") # Validate the connection self._server.library.sections() # Attempt to fetch library sections logger.info("Plex server connection validated.") except Unauthorized as exc: logger.error("Unauthorized: Invalid Plex token provided.") raise Exception("Unauthorized: Invalid Plex token provided.") from exc except Exception as exc: logger.exception("Error initializing Plex server: %s", exc) raise Exception(f"Error initializing Plex server: {exc}") return self._server # --- Data Classes --- @dataclass class MovieSearchParams: title: Optional[str] = None year: Optional[int] = None director: Optional[str] = None studio: Optional[str] = None genre: Optional[str] = None actor: Optional[str] = None rating: Optional[str] = None country: Optional[str] = None language: Optional[str] = None watched: Optional[bool] = None # True=only watched, False=only unwatched min_duration: Optional[int] = None # in minutes max_duration: Optional[int] = None # in minutes def to_filters(self) -> Dict[str, Any]: FIELD_MAP = { "title": "title", "year": "year", "director": "director", "studio": "studio", "genre": "genre", "actor": "actor", "rating": "rating", "country": "country", "language": "language", "watched": "unwatched", "min_duration": "minDuration", "max_duration": "maxDuration", } filters: Dict[str, Any] = {"libtype": "movie"} for field_name, plex_arg in FIELD_MAP.items(): value = getattr(self, field_name) if value is None: continue if field_name == "watched": # invert for Plex 'unwatched' flag filters["unwatched"] = not value continue if field_name in ("min_duration", "max_duration"): # convert minutes to milliseconds filters[plex_arg] = value * 60_000 continue filters[plex_arg] = value return filters # --- Global Singleton and Access Functions --- _plex_client_instance: PlexClient = 
None def get_plex_client() -> PlexClient: """ Return the singleton PlexClient instance, initializing it if necessary. Returns: A PlexClient instance. """ global _plex_client_instance if _plex_client_instance is None: _plex_client_instance = PlexClient() return _plex_client_instance async def get_plex_server() -> PlexServer: """ Asynchronously get a PlexServer instance via the singleton PlexClient. Returns: A PlexServer instance. Raises: Exception: When the Plex server connection fails. """ try: plex_client = get_plex_client() # Singleton accessor plex = await asyncio.to_thread(plex_client.get_server) return plex except Exception as e: logger.exception("Failed to get Plex server instance") raise e # --- Tool Methods --- @mcp.tool() async def search_movies( title: Optional[str] = None, year: Optional[int] = None, director: Optional[str] = None, studio: Optional[str] = None, genre: Optional[str] = None, actor: Optional[str] = None, rating: Optional[str] = None, country: Optional[str] = None, language: Optional[str] = None, watched: Optional[bool] = None, min_duration: Optional[int] = None, max_duration: Optional[int] = None, limit: Optional[int] = 5, ) -> str: """ Search for movies in your Plex library using optional filters. Parameters: title: Optional title or substring to match. year: Optional release year to filter by. director: Optional director name to filter by. studio: Optional studio name to filter by. genre: Optional genre tag to filter by. actor: Optional actor name to filter by. rating: Optional rating (e.g., "PG-13") to filter by. country: Optional country of origin to filter by. language: Optional audio or subtitle language to filter by. watched: Optional boolean; True returns only watched movies, False only unwatched. min_duration: Optional minimum duration in minutes. max_duration: Optional maximum duration in minutes. 
Returns: A formatted string of up to 5 matching movies (with a count of any additional results), or an error message if the search fails or no movies are found. """ # Validate the limit parameter limit = max(1, limit) if limit else 5 # Default to 5 if limit is 0 or negative params = MovieSearchParams( title, year, director, studio, genre, actor, rating, country, language, watched, min_duration, max_duration ) filters = params.to_filters() logger.info("Searching Plex with filters: %r", filters) try: plex = await get_plex_server() movies = await asyncio.to_thread(plex.library.search, **filters) except Exception as e: logger.exception("search_movies failed connecting to Plex") return f"ERROR: Could not search Plex. {e}" if not movies: return f"No movies found matching filters {filters!r}." logger.info("Found %d movies matching filters: %r", len(movies), filters) results: List[str] = [] for i, m in enumerate(movies[:limit], start=1): results.append(f"Result #{i}:\nKey: {m.ratingKey}\n{format_movie(m)}") if len(movies) > limit: results.append(f"\n... and {len(movies)-limit} more results.") return "\n---\n".join(results) @mcp.tool() async def get_movie_details(movie_key: str) -> str: """ Get detailed information about a specific movie. Parameters: movie_key: The key identifying the movie. Returns: A formatted string with movie details or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: key = int(movie_key) all_movies = await asyncio.to_thread(lambda: plex.library.search(libtype="movie")) movie = next((m for m in all_movies if m.ratingKey == key), None) if not movie: return f"No movie found with key {movie_key}." return format_movie(movie) except NotFound: return f"ERROR: Movie with key {movie_key} not found." except Exception as e: logger.exception("Failed to fetch movie details for key '%s'", movie_key) return f"ERROR: Failed to fetch movie details. 
{str(e)}" @mcp.tool() async def list_playlists() -> str: """ List all playlists in the Plex server. Returns: A formatted string of playlists or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: playlists = await asyncio.to_thread(plex.playlists) if not playlists: return "No playlists found in your Plex server." formatted_playlists = [] for i, playlist in enumerate(playlists, 1): formatted_playlists.append( f"Playlist #{i}:\nKey: {playlist.ratingKey}\n{format_playlist(playlist)}" ) return "\n---\n".join(formatted_playlists) except Exception as e: logger.exception("Failed to fetch playlists") return f"ERROR: Failed to fetch playlists. {str(e)}" @mcp.tool() async def get_playlist_items(playlist_key: str) -> str: """ Get the items in a specific playlist. Parameters: playlist_key: The key of the playlist to retrieve items from. Returns: A formatted string of playlist items or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: key = int(playlist_key) all_playlists = await asyncio.to_thread(plex.playlists) playlist = next((p for p in all_playlists if p.ratingKey == key), None) if not playlist: return f"No playlist found with key {playlist_key}." items = playlist.items() if not items: return "No items found in this playlist." formatted_items = [] for i, item in enumerate(items, 1): title = item.title year = getattr(item, 'year', '') type_str = item.type.capitalize() formatted_items.append(f"{i}. {title} ({year}) - {type_str}") return "\n".join(formatted_items) except NotFound: return f"ERROR: Playlist with key {playlist_key} not found." except Exception as e: logger.exception("Failed to fetch items for playlist key '%s'", playlist_key) return f"ERROR: Failed to fetch playlist items. 
{str(e)}" @mcp.tool() async def create_playlist(name: str, movie_keys: str) -> str: """ Create a new playlist with specified movies. Parameters: name: The desired name for the new playlist. movie_keys: A comma-separated string of movie keys to include. Returns: A success message with playlist details or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: movie_key_list = [int(key.strip()) for key in movie_keys.split(",") if key.strip()] if not movie_key_list: return "ERROR: No valid movie keys provided." logger.info("Creating playlist '%s' with movie keys: %s", name, movie_keys) all_movies = await asyncio.to_thread(lambda: plex.library.search(libtype="movie")) logger.info("Found %d total movies in library", len(all_movies)) movie_map = {movie.ratingKey: movie for movie in all_movies} movies = [] not_found_keys = [] for key in movie_key_list: if key in movie_map: movies.append(movie_map[key]) logger.info("Found movie: %s (Key: %d)", movie_map[key].title, key) else: not_found_keys.append(key) logger.warning("Could not find movie with key: %d", key) if not_found_keys: return f"ERROR: Some movie keys were not found: {', '.join(str(k) for k in not_found_keys)}" if not movies: return "ERROR: No valid movies found with the provided keys." try: playlist_future = asyncio.create_task( asyncio.to_thread(lambda: plex.createPlaylist(name, items=movies)) ) playlist = await asyncio.wait_for(playlist_future, timeout=15.0) logger.info("Playlist created successfully: %s", playlist.title) return f"Successfully created playlist '{name}' with {len(movies)} movie(s).\nPlaylist Key: {playlist.ratingKey}" except asyncio.TimeoutError: logger.warning("Playlist creation is taking longer than expected for '%s'", name) return ("PENDING: Playlist creation is taking longer than expected. " "The operation might still complete in the background. 
" "Please check your Plex server to confirm.") except ValueError as e: logger.error("Invalid input format for movie keys: %s", e) return f"ERROR: Invalid input format. Please check movie keys are valid numbers. {str(e)}" except Exception as e: logger.exception("Error creating playlist") return f"ERROR: Failed to create playlist. {str(e)}" @mcp.tool() async def delete_playlist(playlist_key: str) -> str: """ Delete a playlist from the Plex server. Parameters: playlist_key: The key of the playlist to delete. Returns: A success message if deletion is successful, or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: key = int(playlist_key) all_playlists = await asyncio.to_thread(plex.playlists) playlist = next((p for p in all_playlists if p.ratingKey == key), None) if not playlist: return f"No playlist found with key {playlist_key}." await asyncio.to_thread(playlist.delete) logger.info("Playlist '%s' with key %s successfully deleted.", playlist.title, playlist_key) return f"Successfully deleted playlist '{playlist.title}' with key {playlist_key}." except NotFound: return f"ERROR: Playlist with key {playlist_key} not found." except Exception as e: logger.exception("Failed to delete playlist with key '%s'", playlist_key) return f"ERROR: Failed to delete playlist. {str(e)}" @mcp.tool() async def add_to_playlist(playlist_key: str, movie_key: str) -> str: """ Add a movie to an existing playlist. Parameters: playlist_key: The key of the playlist. movie_key: The key of the movie to add. Returns: A success message if the movie is added, or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. 
{str(e)}" try: p_key = int(playlist_key) m_key = int(movie_key) # Find the playlist all_playlists = await asyncio.to_thread(plex.playlists) playlist = next((p for p in all_playlists if p.ratingKey == p_key), None) if not playlist: return f"No playlist found with key {playlist_key}." # Perform a global search for the movie movies = await asyncio.to_thread(lambda: plex.library.search(libtype="movie", ratingKey=m_key)) if not movies: return f"No movie found with key {movie_key}." movie = movies[0] # Since the search is scoped to the ratingKey, there should be at most one result # Add the movie to the playlist await asyncio.to_thread(lambda p=playlist, m=movie: p.addItems([m])) logger.info("Added movie '%s' to playlist '%s'", movie.title, playlist.title) return f"Successfully added '{movie.title}' to playlist '{playlist.title}'." except ValueError: return "ERROR: Invalid playlist or movie key. Please provide valid numbers." except Exception as e: logger.exception("Failed to add movie to playlist") return f"ERROR: Failed to add movie to playlist. {str(e)}" @mcp.tool() async def recent_movies(count: int = 5) -> str: """ Get recently added movies from the Plex library. Parameters: count: The maximum number of recent movies to return. Returns: A formatted string of recent movies or an error message. """ if count <= 0: return "ERROR: Count must be a positive integer." try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: # Perform a global search for recently added movies all_recent = await asyncio.to_thread(lambda: plex.library.search(libtype="movie", sort="addedAt:desc")) recent_movies_list = all_recent[:count] if not recent_movies_list: return "No recent movies found in your Plex library." 
formatted_movies = [] for i, movie in enumerate(recent_movies_list, 1): formatted_movies.append( f"Recent Movie #{i}:\nKey: {movie.ratingKey}\nAdded: {movie.addedAt.strftime('%Y-%m-%d')}\n{format_movie(movie)}" ) return "\n---\n".join(formatted_movies) except Exception as e: logger.exception("Failed to fetch recent movies") return f"ERROR: Failed to fetch recent movies. {str(e)}" @mcp.tool() async def get_movie_genres(movie_key: str) -> str: """ Get genres for a specific movie. Parameters: movie_key: The key of the movie. Returns: A formatted string of movie genres or an error message. """ try: plex = await get_plex_server() except Exception as e: return f"ERROR: Could not connect to Plex server. {str(e)}" try: key = int(movie_key) # Perform a global search for the movie all_movies = await asyncio.to_thread(lambda: plex.library.search(libtype="movie")) movie = next((m for m in all_movies if m.ratingKey == key), None) if not movie: return f"No movie found with key {movie_key}." # Extract genres genres = [genre.tag for genre in movie.genres] if hasattr(movie, 'genres') else [] if not genres: return f"No genres found for movie '{movie.title}'." return f"Genres for '{movie.title}':\n{', '.join(genres)}" except ValueError: return f"ERROR: Invalid movie key '{movie_key}'. Please provide a valid number." except Exception as e: logger.exception("Failed to fetch genres for movie with key '%s'", movie_key) return f"ERROR: Failed to fetch movie genres. {str(e)}" # --- Main Execution --- if __name__ == "__main__": mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/djbriane/plex-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server