Skip to main content
Glama
tools.py12.3 kB
from collections.abc import Sequence from mcp.types import ( Tool, TextContent, ImageContent, EmbeddedResource, ) import json import os from . import overseerr # Constants for tool names TOOL_GET_STATUS = "overseerr_status" TOOL_GET_MOVIE_REQUESTS = "overseerr_movie_requests" TOOL_GET_TV_REQUESTS = "overseerr_tv_requests" # Environment variables api_key = os.getenv("OVERSEERR_API_KEY", "") url = os.getenv("OVERSEERR_URL", "") if not api_key or not url: raise ValueError("OVERSEERR_API_KEY and OVERSEERR_URL environment variables are required") # Media status mapping MEDIA_STATUS_MAPPING = { 1: "UNKNOWN", 2: "PENDING", 3: "PROCESSING", 4: "PARTIALLY_AVAILABLE", 5: "AVAILABLE" } class ToolHandler(): def __init__(self, tool_name: str): self.name = tool_name def get_tool_description(self) -> Tool: raise NotImplementedError() async def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: raise NotImplementedError() class StatusToolHandler(ToolHandler): def __init__(self): super().__init__(TOOL_GET_STATUS) def get_tool_description(self): return Tool( name=self.name, description="Get the status of the Overseerr server.", inputSchema={ "type": "object", "properties": {}, "required": [] }, ) async def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: # Create asynchronous client client = overseerr.Overseerr(api_key=api_key, url=url) data = await client.get_status() if "version" in data: status_response = f"\n---\nOverseerr is available and these are the status data:\n" status_response += "\n- " + "\n- ".join([f"{key}: {val}" for key, val in data.items()]) else: status_response = f"\n---\nOverseerr is not available and below is the request error: \n" status_response += "\n- " + "\n- ".join([f"{key}: {val}" for key, val in data.items()]) return [ TextContent( type="text", text=status_response ) ] class MovieRequestsToolHandler(ToolHandler): def __init__(self): super().__init__(TOOL_GET_MOVIE_REQUESTS) def get_tool_description(self): return Tool( name=self.name, description="Get the list of all movie requests that satisfies the filter arguments.", inputSchema={ "type": "object", "properties": { "status": { "type": "string", "description": "Filter by media availability status.", "enum": ["all", "approved", "available", "pending", "processing", "unavailable", "failed"] }, "start_date": { "type": "string", "description": "Filter for the date of request, formatted as '2020-09-12T10:00:27.000Z'" } } }, ) async def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: # Extract arguments status = args.get("status") start_date = args.get("start_date") # Now using asynchronous approach results = await self.get_movie_requests(status, start_date) return [ TextContent( type="text", text=json.dumps(results, indent=2) ) ] async def get_movie_requests(self, status=None, start_date=None): client = overseerr.Overseerr(api_key=api_key, url=url) # Parameter validation valid_statuses = ["all", "approved", "available", "pending", "processing", "unavailable", "failed"] if status and status not in valid_statuses: status = None # Initialize pagination parameters take = 20 # Number of items per page skip = 0 # Starting offset has_more = True all_results = [] # Process all pages while has_more: # Prepare params params = { "take": take, "skip": skip } # Add filter if specified if status: params["filter"] = status # Call API response = await client.get_requests(params) # Process results results = response.get("results", []) for result in results: # Only process if it's a movie (no tvdbId) media_info = result.get("media", {}) if media_info and not media_info.get("tvdbId"): # Check if request date matches the filter if provided created_at = result.get("createdAt", "") if start_date and start_date > created_at: continue # Get movie details to get the title movie_id = media_info.get("tmdbId") movie_details = await client.get_movie_details(movie_id) # Map media availability to string value media_status_code = media_info.get("status", 1) media_availability = MEDIA_STATUS_MAPPING.get(media_status_code, "UNKNOWN") # Create formatted result formatted_result = { "title": movie_details.get("title", "Unknown Movie"), "media_availability": media_availability, "request_date": created_at } all_results.append(formatted_result) # Check if there are more pages page_info = response.get("pageInfo", {}) if page_info.get("pages", 0) <= (skip // take) + 1: has_more = False else: skip += take return all_results class TvRequestsToolHandler(ToolHandler): def __init__(self): super().__init__(TOOL_GET_TV_REQUESTS) def get_tool_description(self): return Tool( name=self.name, description="Get the list of all TV requests that satisfies the filter arguments.", inputSchema={ "type": "object", "properties": { "status": { "type": "string", "description": "Filter by media availability status.", "enum": ["all", "approved", "available", "pending", "processing", "unavailable", "failed"] }, "start_date": { "type": "string", "description": "Filter for the date of request, formatted as '2020-09-12T10:00:27.000Z'" } } }, ) async def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]: # Extract arguments status = args.get("status") start_date = args.get("start_date") # Now using asynchronous approach results = await self.get_tv_requests(status, start_date) return [ TextContent( type="text", text=json.dumps(results, indent=2) ) ] async def get_tv_requests(self, status=None, start_date=None): client = overseerr.Overseerr(api_key=api_key, url=url) # Parameter validation valid_statuses = ["all", "approved", "available", "pending", "processing", "unavailable", "failed"] if status and status not in valid_statuses: status = None # Initialize pagination parameters take = 20 # Number of items per page skip = 0 # Starting offset has_more = True all_results = [] # Process all pages while has_more: # Prepare params params = { "take": take, "skip": skip } # Add filter if specified if status: params["filter"] = status # Call API response = await client.get_requests(params) # Process results results = response.get("results", []) for result in results: # Only process if it's a TV show (has tvdbId) media_info = result.get("media", {}) if media_info and media_info.get("tvdbId"): # Check if request date matches the filter if provided created_at = result.get("createdAt", "") if start_date and start_date > created_at: continue # Get TV details to get the title and seasons tv_id = media_info.get("tmdbId") tv_details = await client.get_tv_details(tv_id) # Map media availability to string value media_status_code = media_info.get("status", 1) tv_title_availability = MEDIA_STATUS_MAPPING.get(media_status_code, "UNKNOWN") # Get seasons information seasons = tv_details.get("seasons", []) # For each season, get more detailed info including episodes for season in seasons: season_number = season.get("seasonNumber", 0) # Skip if it's a special season (season 0) if season_number == 0: continue # Format season string (e.g., S01) season_str = f"S{season_number:02d}" # Get detailed season info including episodes season_details = await client.get_season_details(tv_id, season_number) # Season availability is assumed to be the same as the show tv_season_availability = tv_title_availability # Process episodes episodes = season_details.get("episodes", []) episode_details = [] for episode in episodes: episode_number = episode.get("episodeNumber", 0) episode_details.append({ "episode_number": f"{episode_number:02d}", "episode_name": episode.get("name", f"Episode {episode_number}") }) # Create formatted result for this season formatted_result = { "tv_title": tv_details.get("name", "Unknown TV Show"), "tv_title_availability": tv_title_availability, "tv_season": season_str, "tv_season_availability": tv_season_availability, "tv_episodes": episode_details, "request_date": created_at } all_results.append(formatted_result) # Check if there are more pages page_info = response.get("pageInfo", {}) if page_info.get("pages", 0) <= (skip // take) + 1: has_more = False else: skip += take return all_results

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/reallycleansauce/overseerr-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server