Encoding DevOps MCP Server

  • src
  • encoding_devops
import webbrowser
from typing import Any
from urllib.parse import quote

from mcp.server.fastmcp import Context

from encoding_devops.mcp_instance import AppContext, mcp


@mcp.tool()
async def get_job_by_name(name: str, ctx: Context) -> str:
    """Get details of an encoding job by its name"""
    app_ctx: AppContext = ctx.request_context.lifespan_context
    # NOTE(review): the client's result is returned unchanged; confirm it
    # really is a str, as the return annotation declares.
    return await app_ctx.client.get_job_by_name(name)


@mcp.tool()
async def get_job_tasks_by_id(job_id: str, ctx: Context) -> str:
    """Get tasks for a specific job by its ID"""
    app_ctx: AppContext = ctx.request_context.lifespan_context
    # NOTE(review): the client's result is returned unchanged; confirm it
    # really is a str, as the return annotation declares.
    return await app_ctx.client.get_job_tasks_by_id(job_id)


@mcp.tool()
async def get_clients(ctx: Context) -> str:
    """Get list of all clients"""
    app_ctx: AppContext = ctx.request_context.lifespan_context
    # NOTE(review): the client's result is returned unchanged; confirm it
    # really is a str, as the return annotation declares.
    return await app_ctx.client.get_clients()


@mcp.tool()
async def is_cluster_busy(ctx: Context) -> dict[str, Any]:
    """
    Check if the encoding cluster is busy (has jobs in progress)

    Returns:
        Dict with "is_busy" (bool), "jobs_count" (the in-progress job count
        reported by the client) and "status" ("busy" or "not busy")
    """
    app_ctx: AppContext = ctx.request_context.lifespan_context
    jobs_count = await app_ctx.client.get_inprogress_jobs_count()
    is_busy = jobs_count > 0
    return {
        "is_busy": is_busy,
        "jobs_count": jobs_count,
        "status": "busy" if is_busy else "not busy",
    }


@mcp.tool()
async def get_latest_jobs(limit: int, ctx: Context) -> dict[str, Any] | str:
    """
    Get the most recent encoding jobs

    Args:
        limit: Number of jobs to return (min: 1, max: 10)

    Returns:
        Dict with "count", "limit" and "jobs", or an error message string
        when limit is outside the allowed range
    """
    # Reject out-of-range limits before touching the backend.
    if not 1 <= limit <= 10:
        return "Error: Limit must be between 1 and 10 jobs"

    app_ctx: AppContext = ctx.request_context.lifespan_context
    # A falsy result (None or an empty collection) is normalised to [].
    jobs = await app_ctx.client.get_latest_jobs(limit) or []
    return {"count": len(jobs), "limit": limit, "jobs": jobs}


@mcp.tool()
async def search_movie(title: str, ctx: Context) -> dict[str, Any]:
    """
    Search for a movie by title

    Args:
        title: Movie title to search for

    Returns:
        Dict with "query" (the title searched for), "total_results" (int)
        and "movies" (the list of matches, empty when there are none)
    """
    app_ctx: AppContext = ctx.request_context.lifespan_context
    # Treat an empty/None response as "no results" instead of failing on .get().
    results = await app_ctx.omdb_client.search_movie(title) or {}
    return {
        "query": title,
        "total_results": int(results.get("totalResults", 0)),
        "movies": results.get("Search", []),
    }


@mcp.tool()
async def get_movie_details(imdb_id: str, ctx: Context) -> dict[str, Any]:
    """
    Get detailed information about a movie by its IMDB ID

    Args:
        imdb_id: IMDB ID of the movie (e.g., 'tt0111161')

    Returns:
        The movie details, or a dict with an "error" key when nothing was found
    """
    app_ctx: AppContext = ctx.request_context.lifespan_context
    movie_data = await app_ctx.omdb_client.get_movie_details(imdb_id)
    if not movie_data:
        return {"error": f"No movie found with IMDB ID: {imdb_id}"}
    # NOTE(review): presumably a dict, like the search response; confirm
    # against the OMDB client.
    return movie_data


def _build_mailto_url(body: str, to: str = "", subject: str = "", cc: str = "", bcc: str = "") -> str:
    """Build a mailto: URL from the given email fields."""
    # Header field values are fully percent-encoded (special characters, spaces).
    params = [f"body={quote(body)}"]
    for field, value in (("subject", subject), ("cc", cc), ("bcc", bcc)):
        if value:
            params.append(f"{field}={quote(value)}")

    # In the recipient part, "@" and the "," between addresses are literal
    # mailto syntax: encoding the comma would merge all recipients into one
    # invalid address, so encode each address separately and keep "@".
    recipients = ",".join(
        quote(address.strip(), safe="@") for address in to.split(",") if address.strip()
    )
    return f"mailto:{recipients}?" + "&".join(params)


@mcp.tool()
async def open_email(
    body: str, to: str = "", subject: str = "", cc: str = "", bcc: str = "", ctx: Context = None
) -> str:
    """
    Opens the default email client with your message ready to go

    Args:
        body: What you want to say in the email
        to: Who to send it to; separate multiple addresses with commas (optional)
        subject: Email subject line (optional)
        cc: CC recipients (optional)
        bcc: BCC recipients (optional)
        ctx: MCP context

    Returns:
        str: Quick status message telling you whether the email client was launched
    """
    mailto_url = _build_mailto_url(body, to=to, subject=subject, cc=cc, bcc=bcc)

    # webbrowser.open() reports failure through its return value rather than
    # by raising, so check it instead of always claiming success.
    if not webbrowser.open(mailto_url, new=1):
        return "Error: Could not open the default email client."
    return "Ready to go! Your email client should be open with the template loaded."