Skip to main content
Glama
server.py39.4 kB
import argparse import asyncio import logging from typing import Any import httpx from mcp.server.models import InitializationOptions from mcp.server import NotificationOptions, Server from mcp.types import Tool, TextContent logger = logging.getLogger("sonarr-mcp") server = Server("sonarr-mcp") # Configuration - will be set from command line args SONARR_URL = "" SONARR_API_KEY = "" @server.list_tools() async def handle_list_tools() -> list[Tool]: """List available Sonarr tools.""" return [ Tool( name="search_series", description="Search for TV series in Sonarr", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "TV series title to search for" } }, "required": ["query"] } ), Tool( name="add_series", description="Add a TV series to Sonarr", inputSchema={ "type": "object", "properties": { "tvdb_id": { "type": "integer", "description": "TVDB ID of the series" }, "quality_profile_id": { "type": "integer", "description": "Quality profile ID (use get_quality_profiles to see available options)" }, "root_folder_path": { "type": "string", "description": "Root folder path for the series", "default": "/tv" }, "season_folder": { "type": "boolean", "description": "Whether to use season folders", "default": True }, "monitor_type": { "type": "string", "description": "Which episodes to monitor: 'all' (all episodes), 'future' (future episodes only), 'missing' (missing episodes), 'existing' (existing episodes), 'recent' (recent episodes), 'first' (first season), 'latest' (latest season), 'none' (no episodes), 'season_specific' (monitor specific seasons only - requires monitor_seasons)", "enum": ["all", "future", "missing", "existing", "recent", "first", "latest", "none", "season_specific"] }, "monitor_seasons": { "type": "array", "items": {"type": "integer"}, "description": "Specific season numbers to monitor (only used with monitor_type: 'season_specific'). Example: [1, 3, 5]" }, "search_for_missing_episodes": { "type": "boolean", "description": "Whether to automatically search for missing episodes after adding", "default": False } }, "required": ["tvdb_id", "quality_profile_id", "monitor_type"] } ), Tool( name="list_series", description="Get all TV series from Sonarr", inputSchema={ "type": "object", "properties": {} } ), Tool( name="get_series", description="Get detailed information about a specific TV series", inputSchema={ "type": "object", "properties": { "series_id": { "type": "integer", "description": "Sonarr series ID (use the ID from list_series output)" } }, "required": ["series_id"] } ), Tool( name="get_quality_profiles", description="Get all quality profiles from Sonarr", inputSchema={ "type": "object", "properties": {} } ), Tool( name="interactive_search", description="Search for available releases for a TV series episode", inputSchema={ "type": "object", "properties": { "series_id": { "type": "integer", "description": "Sonarr series ID (use the ID from list_series output)" }, "season_number": { "type": "integer", "description": "Season number (optional, searches all seasons if not provided)" }, "episode_number": { "type": "integer", "description": "Episode number (optional, searches all episodes if not provided)" } }, "required": ["series_id"] } ), Tool( name="update_series", description="Update series settings (quality profile, monitoring, search)", inputSchema={ "type": "object", "properties": { "series_id": { "type": "integer", "description": "Sonarr series ID (use the ID from list_series output)" }, "quality_profile_id": { "type": "integer", "description": "New quality profile ID (optional, use get_quality_profiles to see options)" }, "monitor_type": { "type": "string", "description": "Which episodes to monitor: 'all', 'future', 'missing', 'existing', 'recent', 'first', 'latest', 'none'", "enum": ["all", "future", "missing", "existing", "recent", "first", "latest", "none"] }, "monitor_seasons": { "type": "array", "items": {"type": "integer"}, "description": "Specific season numbers to monitor (overrides monitor_type if provided). Example: [1, 3, 5]" }, "start_search": { "type": "boolean", "description": "Whether to search for missing episodes in monitored seasons after update", "default": False } }, "required": ["series_id"] } ), Tool( name="delete_series", description="Delete a TV series or specific seasons from Sonarr", inputSchema={ "type": "object", "properties": { "series_id": { "type": "integer", "description": "Sonarr series ID (use the ID from list_series output)" }, "delete_seasons": { "type": "array", "items": {"type": "integer"}, "description": "Specific season numbers to delete (if not provided, deletes entire series). Example: [2, 4]" }, "delete_files": { "type": "boolean", "description": "Whether to delete files from disk (applies to entire series or specific seasons)", "default": False } }, "required": ["series_id"] } ), Tool( name="download_release", description="Download a specific release for an episode", inputSchema={ "type": "object", "properties": { "release_guid": { "type": "string", "description": "Release GUID from interactive_search results" }, "series_id": { "type": "integer", "description": "Sonarr series ID" } }, "required": ["release_guid", "series_id"] } ) ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[TextContent]: """Handle tool calls.""" if not SONARR_URL or not SONARR_API_KEY: return [TextContent(type="text", text="Error: Sonarr URL and API key must be configured")] if arguments is None: arguments = {} async with httpx.AsyncClient() as client: headers = {"X-Api-Key": SONARR_API_KEY} if name == "search_series": query = arguments.get("query", "") url = f"{SONARR_URL}/api/v3/series/lookup" params = {"term": query} try: response = await client.get(url, headers=headers, params=params) response.raise_for_status() series_list = response.json() if not series_list: return [TextContent(type="text", text=f"No TV series found for '{query}'")] result = "Found TV series:\n" for series in series_list[:5]: # Limit to first 5 results title = series.get("title", "Unknown") year = series.get("year", "Unknown") tvdb_id = series.get("tvdbId", "Unknown") overview = series.get("overview", "No overview available")[:100] + "..." result += f"\n• {title} ({year}) [TVDB: {tvdb_id}]\n {overview}\n" return [TextContent(type="text", text=result)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error searching TV series: {e}")] elif name == "list_series": url = f"{SONARR_URL}/api/v3/series" try: response = await client.get(url, headers=headers) response.raise_for_status() series_list = response.json() if not series_list: return [TextContent(type="text", text="No TV series in Sonarr library")] result = f"Sonarr library ({len(series_list)} TV series):\n" for series in series_list: title = series.get("title", "Unknown") year = series.get("year", "Unknown") series_id = series.get("id", "Unknown") # For TV series, we check episode statistics instead of hasFile statistics = series.get("statistics", {}) episode_file_count = statistics.get("episodeFileCount", 0) episode_count = statistics.get("episodeCount", 0) status = f"{episode_file_count}/{episode_count} episodes" result += f"• ID {series_id} - {title} ({year}) - {status}\n" return [TextContent(type="text", text=result)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error getting TV series: {e}")] elif name == "get_series": series_id = arguments.get("series_id") try: # Get series details series_url = f"{SONARR_URL}/api/v3/series/{series_id}" response = await client.get(series_url, headers=headers) response.raise_for_status() series = response.json() # Get quality profile name quality_profile_id = series.get("qualityProfileId") quality_profiles_url = f"{SONARR_URL}/api/v3/qualityprofile" profiles_response = await client.get(quality_profiles_url, headers=headers) profiles_response.raise_for_status() profiles = profiles_response.json() quality_profile_name = "Unknown" for profile in profiles: if profile.get("id") == quality_profile_id: quality_profile_name = profile.get("name", "Unknown") break # Check for active downloads/queue items queue_url = f"{SONARR_URL}/api/v3/queue" queue_response = await client.get(queue_url, headers=headers) queue_response.raise_for_status() queue_items = queue_response.json().get("records", []) # Find any downloads for this series active_downloads = [] for item in queue_items: if item.get("seriesId") == series_id: active_downloads.append({ "episode": item.get("episode", {}).get("title", "Unknown Episode"), "status": item.get("status", "Unknown"), "progress": item.get("sizeleft", 0), "total_size": item.get("size", 0), "eta": item.get("timeleft", "Unknown") }) # Build detailed result title = series.get("title", "Unknown") year = series.get("year", "Unknown") overview = series.get("overview", "No overview available") status = series.get("status", "Unknown") monitored = series.get("monitored", False) # Episode statistics statistics = series.get("statistics", {}) episode_file_count = statistics.get("episodeFileCount", 0) episode_count = statistics.get("episodeCount", 0) total_episode_count = statistics.get("totalEpisodeCount", 0) size_on_disk = statistics.get("sizeOnDisk", 0) size_gb = round(size_on_disk / (1024**3), 2) if size_on_disk else 0 # External IDs tvdb_id = series.get("tvdbId", "Unknown") imdb_id = series.get("imdbId", "Unknown") result = f"📺 {title} ({year})\n" result += f"ID: {series_id} | TVDB: {tvdb_id} | IMDb: {imdb_id}\n" result += f"Status: {status} | Monitored: {'Yes' if monitored else 'No'}\n" result += f"Quality Profile: {quality_profile_name} (ID: {quality_profile_id})\n" result += f"Episodes: {episode_file_count}/{episode_count} downloaded" if total_episode_count != episode_count: result += f" ({total_episode_count} total)\n" else: result += "\n" result += f"Size on disk: {size_gb} GB\n\n" if active_downloads: result += "⬇️ Active Downloads:\n" for download in active_downloads: progress_percent = 0 if download["total_size"] > 0: progress_percent = round(((download["total_size"] - download["progress"]) / download["total_size"]) * 100, 1) result += f" • {download['episode']}\n" result += f" Status: {download['status']} | Progress: {progress_percent}% | ETA: {download['eta']}\n" result += "\n" result += f"📝 Overview:\n{overview}" return [TextContent(type="text", text=result)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error getting series details: {e}")] elif name == "add_series": tvdb_id = arguments.get("tvdb_id") quality_profile_id = arguments.get("quality_profile_id") root_folder_path = arguments.get("root_folder_path", "/tv") season_folder = arguments.get("season_folder", True) monitor_type = arguments.get("monitor_type") # Now required, no default monitor_seasons = arguments.get("monitor_seasons") search_for_missing_episodes = arguments.get("search_for_missing_episodes", False) # Validation rules if monitor_type == "season_specific": if not monitor_seasons: return [TextContent(type="text", text="Error: monitor_seasons array required when monitor_type is 'season_specific'")] else: if monitor_seasons: return [TextContent(type="text", text="Error: monitor_seasons can only be used with monitor_type: 'season_specific'")] # First lookup the series details lookup_url = f"{SONARR_URL}/api/v3/series/lookup" params = {"term": f"tvdb:{tvdb_id}"} try: response = await client.get(lookup_url, headers=headers, params=params) response.raise_for_status() lookup_results = response.json() if not lookup_results: return [TextContent(type="text", text=f"No series found with TVDB ID {tvdb_id}")] series_data = lookup_results[0] # Take the first result # Handle season monitoring if monitor_type == "season_specific": # Set specific seasons to monitor (monitor_seasons is guaranteed to exist due to validation) for season in series_data.get("seasons", []): season["monitored"] = season.get("seasonNumber") in monitor_seasons elif monitor_type == "first": # Monitor only first season for season in series_data.get("seasons", []): season["monitored"] = season.get("seasonNumber") == 1 elif monitor_type == "latest": # Monitor only latest season seasons = series_data.get("seasons", []) if seasons: latest_season_num = max(s.get("seasonNumber", 0) for s in seasons if s.get("seasonNumber", 0) > 0) for season in seasons: season["monitored"] = season.get("seasonNumber") == latest_season_num elif monitor_type == "none": # Monitor no seasons for season in series_data.get("seasons", []): season["monitored"] = False else: # Monitor all seasons for other types (all, future, missing, existing, recent) for season in series_data.get("seasons", []): season["monitored"] = True # Prepare series for adding series_data.update({ "qualityProfileId": quality_profile_id, "rootFolderPath": root_folder_path, "seasonFolder": season_folder, "monitored": True, "addOptions": { "searchForMissingEpisodes": search_for_missing_episodes, "monitor": "none" if monitor_type == "season_specific" else monitor_type } }) # Add the series add_url = f"{SONARR_URL}/api/v3/series" response = await client.post(add_url, headers=headers, json=series_data) response.raise_for_status() added_series = response.json() title = added_series.get("title", "Unknown") year = added_series.get("year", "Unknown") series_id = added_series.get("id") # If using season_specific monitoring, update the series after a brief delay if monitor_type == "season_specific" and series_id: # Wait for Sonarr to fully process the series and populate seasons await asyncio.sleep(2) # Get the added series data to ensure we have the correct season structure get_url = f"{SONARR_URL}/api/v3/series/{series_id}" get_response = await client.get(get_url, headers=headers) get_response.raise_for_status() current_series_data = get_response.json() # Set season monitoring on the actual series data from Sonarr for season in current_series_data.get("seasons", []): season["monitored"] = season.get("seasonNumber") in monitor_seasons # Update the series with correct season monitoring put_url = f"{SONARR_URL}/api/v3/series/{series_id}" put_response = await client.put(put_url, headers=headers, json=current_series_data) put_response.raise_for_status() monitor_msg = "" if monitor_type == "season_specific": monitor_msg = f" (monitoring seasons: {', '.join(map(str, monitor_seasons))})" else: monitor_msg = f" (monitor type: {monitor_type})" return [TextContent(type="text", text=f"Successfully added '{title} ({year})' to Sonarr{monitor_msg}")] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error adding series: {e}")] elif name == "get_quality_profiles": url = f"{SONARR_URL}/api/v3/qualityprofile" try: response = await client.get(url, headers=headers) response.raise_for_status() profiles = response.json() if not profiles: return [TextContent(type="text", text="No quality profiles found")] result = "Available quality profiles:\n" for profile in profiles: profile_id = profile.get("id", "Unknown") name = profile.get("name", "Unknown") result += f"• {name} (ID: {profile_id})\n" return [TextContent(type="text", text=result)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error getting quality profiles: {e}")] elif name == "interactive_search": series_id = arguments.get("series_id") season_number = arguments.get("season_number") episode_number = arguments.get("episode_number") try: # If we have specific season/episode, search for that episode if season_number is not None and episode_number is not None: # Get episode ID first episodes_url = f"{SONARR_URL}/api/v3/episode" episodes_params = {"seriesId": series_id} episodes_response = await client.get(episodes_url, headers=headers, params=episodes_params) episodes_response.raise_for_status() episodes = episodes_response.json() target_episode = None for episode in episodes: if episode.get("seasonNumber") == season_number and episode.get("episodeNumber") == episode_number: target_episode = episode break if not target_episode: return [TextContent(type="text", text=f"Episode S{season_number:02d}E{episode_number:02d} not found for series ID {series_id}")] url = f"{SONARR_URL}/api/v3/release" params = {"episodeId": target_episode["id"]} search_description = f"episode S{season_number:02d}E{episode_number:02d}" else: # Search for all missing episodes in the series url = f"{SONARR_URL}/api/v3/release" params = {"seriesId": series_id} search_description = f"series ID {series_id}" response = await client.get(url, headers=headers, params=params) response.raise_for_status() releases = response.json() if not releases: return [TextContent(type="text", text=f"No releases found for {search_description}")] # Sort by seeders (highest first), treating None/missing as 0 releases.sort(key=lambda x: x.get("seeders", 0) or 0, reverse=True) result = f"Available releases for {search_description} (sorted by seeders):\n\n" for release in releases[:20]: # Show top 20 releases title = release.get("title", "Unknown") indexer = release.get("indexer", "Unknown") size_bytes = release.get("size", 0) size_gb = round(size_bytes / (1024**3), 2) if size_bytes else 0 # Get protocol (torrent vs usenet) protocol = release.get("protocol", "Unknown") # Torrent-specific info seeders = release.get("seeders") leechers = release.get("leechers") # Release identification guid = release.get("guid", "Unknown") # Quality info quality = release.get("quality", {}).get("quality", {}).get("name", "Unknown") # Age age_hours = release.get("ageHours", 0) age_days = round(age_hours / 24, 1) if age_hours else 0 result += f"📁 {title}\n" result += f" Quality: {quality} | Size: {size_gb} GB | Age: {age_days} days\n" result += f" Indexer: {indexer} ({protocol})\n" if protocol.lower() == "torrent" and (seeders is not None or leechers is not None): result += f" Seeds: {seeders or 0} | Leechers: {leechers or 0}\n" # Add GUID for download_release tool result += f" Release ID: {guid}\n" result += "\n" if len(releases) > 20: result += f"... and {len(releases) - 20} more releases available" return [TextContent(type="text", text=result)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error searching releases: {e}")] elif name == "update_series": series_id = arguments.get("series_id") quality_profile_id = arguments.get("quality_profile_id") monitor_type = arguments.get("monitor_type") monitor_seasons = arguments.get("monitor_seasons") start_search = arguments.get("start_search", False) try: # First get the current series data get_url = f"{SONARR_URL}/api/v3/series/{series_id}" response = await client.get(get_url, headers=headers) response.raise_for_status() series_data = response.json() # Update quality profile if provided if quality_profile_id: series_data["qualityProfileId"] = quality_profile_id # Update monitoring if provided if monitor_seasons: # Set specific seasons to monitor for season in series_data.get("seasons", []): season["monitored"] = season.get("seasonNumber") in monitor_seasons elif monitor_type: # Use monitor_type for all seasons if monitor_type == "first": # Monitor only first season for season in series_data.get("seasons", []): season["monitored"] = season.get("seasonNumber") == 1 elif monitor_type == "latest": # Monitor only latest season seasons = series_data.get("seasons", []) if seasons: latest_season_num = max(s.get("seasonNumber", 0) for s in seasons if s.get("seasonNumber", 0) > 0) for season in seasons: season["monitored"] = season.get("seasonNumber") == latest_season_num elif monitor_type == "none": # Monitor no seasons for season in series_data.get("seasons", []): season["monitored"] = False else: # Monitor all seasons for other types (all, future, missing, existing, recent) for season in series_data.get("seasons", []): season["monitored"] = True # Update the series put_url = f"{SONARR_URL}/api/v3/series/{series_id}" response = await client.put(put_url, headers=headers, json=series_data) response.raise_for_status() updated_series = response.json() title = updated_series.get("title", "Unknown") result_msg = f"Successfully updated '{title}'" # Add monitoring info to result if monitor_seasons: result_msg += f" (now monitoring seasons: {', '.join(map(str, monitor_seasons))})" elif monitor_type: result_msg += f" (monitor type: {monitor_type})" # Start search if requested if start_search: search_url = f"{SONARR_URL}/api/v3/command" search_command = { "name": "SeriesSearch", "seriesId": series_id } search_response = await client.post(search_url, headers=headers, json=search_command) search_response.raise_for_status() result_msg += " and started search for missing episodes in monitored seasons" return [TextContent(type="text", text=result_msg)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error updating series: {e}")] elif name == "delete_series": series_id = arguments.get("series_id") delete_seasons = arguments.get("delete_seasons") delete_files = arguments.get("delete_files", False) try: # First get series info for confirmation message get_url = f"{SONARR_URL}/api/v3/series/{series_id}" response = await client.get(get_url, headers=headers) response.raise_for_status() series_data = response.json() title = series_data.get("title", "Unknown") if delete_seasons: # Delete specific seasons by unmonitoring them and optionally deleting files for season in series_data.get("seasons", []): if season.get("seasonNumber") in delete_seasons: season["monitored"] = False # Update the series with unmonitored seasons put_url = f"{SONARR_URL}/api/v3/series/{series_id}" response = await client.put(put_url, headers=headers, json=series_data) response.raise_for_status() result_msg = f"Successfully unmonitored seasons {', '.join(map(str, delete_seasons))} for '{title}'" # If delete_files is True, delete episode files for those seasons if delete_files: episodes_url = f"{SONARR_URL}/api/v3/episode" episodes_params = {"seriesId": series_id} episodes_response = await client.get(episodes_url, headers=headers, params=episodes_params) episodes_response.raise_for_status() episodes = episodes_response.json() # Find episode files for the seasons to delete episode_file_ids = [] for episode in episodes: if (episode.get("seasonNumber") in delete_seasons and episode.get("hasFile", False) and episode.get("episodeFileId")): episode_file_ids.append(episode["episodeFileId"]) # Delete the episode files if episode_file_ids: for file_id in set(episode_file_ids): # Remove duplicates file_delete_url = f"{SONARR_URL}/api/v3/episodefile/{file_id}" await client.delete(file_delete_url, headers=headers) result_msg += f" and deleted {len(set(episode_file_ids))} episode files from disk" else: # Delete the entire series delete_url = f"{SONARR_URL}/api/v3/series/{series_id}" params = {"deleteFiles": str(delete_files).lower()} response = await client.delete(delete_url, headers=headers, params=params) response.raise_for_status() result_msg = f"Successfully deleted '{title}' from Sonarr" if delete_files: result_msg += " (including files from disk)" return [TextContent(type="text", text=result_msg)] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error deleting series/seasons: {e}")] elif name == "download_torrent": release_guid = arguments.get("release_guid") movie_id = arguments.get("movie_id") try: # First, get the original release data by searching again search_url = f"{SONARR_URL}/api/v3/release" search_params = {"movieId": movie_id} search_response = await client.get(search_url, headers=headers, params=search_params) search_response.raise_for_status() releases = search_response.json() # Find the specific release by GUID target_release = None for release in releases: if release.get("guid") == release_guid: target_release = release break if not target_release: return [TextContent(type="text", text=f"Release with GUID {release_guid} not found")] # Send the complete release data to download download_url = f"{SONARR_URL}/api/v3/release" response = await client.post(download_url, headers=headers, json=target_release) response.raise_for_status() # Get movie title for response movie_url = f"{SONARR_URL}/api/v3/movie/{movie_id}" movie_response = await client.get(movie_url, headers=headers) movie_response.raise_for_status() movie_data = movie_response.json() movie_title = movie_data.get("title", "Unknown") return [TextContent(type="text", text=f"Successfully started download for '{movie_title}' (Release ID: {release_guid})")] except httpx.HTTPError as e: return [TextContent(type="text", text=f"Error downloading torrent: {e}")] else: return [TextContent(type="text", text=f"Unknown tool: {name}")] async def run_server(): """Run the MCP server.""" global SONARR_URL, SONARR_API_KEY logger.debug(f"Starting Sonarr MCP server for {SONARR_URL}") # Run the server using stdin/stdout streams from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="sonarr-mcp", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) def main(): """Main entry point for the server.""" parser = argparse.ArgumentParser(description="Sonarr MCP Server") parser.add_argument("--url", required=True, help="Sonarr base URL (e.g., http://localhost:8989)") parser.add_argument("--api-token", required=True, help="Sonarr API token") parser.add_argument("--debug", action="store_true", help="Enable debug logging") args = parser.parse_args() # Configure logging based on debug flag if args.debug: logging.basicConfig(level=logging.DEBUG) logging.getLogger("httpx").setLevel(logging.DEBUG) else: logging.basicConfig(level=logging.ERROR) logging.getLogger("httpx").setLevel(logging.ERROR) global SONARR_URL, SONARR_API_KEY SONARR_URL = args.url.rstrip('/') # Remove trailing slash if present SONARR_API_KEY = args.api_token asyncio.run(run_server()) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MichaelReubenDev/sonarr-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server