Skip to main content
Glama

YouTube Translate MCP

by brianshin22
server.py23.7 kB
import asyncio import httpx import json import re import os import time import logging import argparse from typing import Any, Dict, List, Optional, Tuple from mcp.server.fastmcp import FastMCP, Context from mcp.server.fastmcp.resources import TextResource, ResourceTemplate from mcp.types import Resource as MCPResource, ResourceTemplate as MCPResourceTemplate # Initialize FastMCP server mcp_server = FastMCP("youtube-translate") # Set up basic logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('youtube-translate-mcp') # Global constants YT_TRANSLATE_API_BASE = "https://api.youtubetranslate.com" YOUTUBE_TRANSLATE_API_KEY = os.environ.get("YOUTUBE_TRANSLATE_API_KEY", "") if not YOUTUBE_TRANSLATE_API_KEY: logger.warning("YOUTUBE_TRANSLATE_API_KEY environment variable not set!") # Add a health check endpoint for SSE transport mode async def add_health_check(app): @app.get("/health") async def health_check(): from fastapi import Response return Response(content=json.dumps({"status": "ok"}), media_type="application/json") # Register this function to be called when the FastAPI app is created try: # Try the newer method first mcp_server.on_startup(add_health_check) except AttributeError: try: # Fall back to the older method mcp_server.on_app_init(add_health_check) except AttributeError: logger.warning("Could not register health check endpoint - MCP version incompatibility") # Constants USER_AGENT = "YouTubeTranslateMCP/1.0" # Regular expression to extract YouTube video ID from various URL formats YOUTUBE_ID_REGEX = r'(?:youtube\.com\/(?:[^\/\n\s]+\/\S+\/|(?:v|e(?:mbed)?)\/|\S*?[?&]v=)|youtu\.be\/)([a-zA-Z0-9_-]{11})' def extract_youtube_id(url: str) -> str: """Extract YouTube video ID from a URL. Args: url: YouTube URL Returns: YouTube video ID or empty string if no match """ match = re.search(YOUTUBE_ID_REGEX, url) if match: return match.group(1) return "" async def make_yt_api_request(endpoint: str, method: str = "GET", params: dict = None, json_data: dict = None) -> dict[str, Any] | str | None: """Make a request to the YouTube Translate API with proper error handling.""" headers = { "X-API-Key": YOUTUBE_TRANSLATE_API_KEY, "Content-Type": "application/json" } url = f"{YT_TRANSLATE_API_BASE}{endpoint}" logger.info(f"Making API request: {method} {url}") if params: logger.info(f"Request params: {params}") if json_data: logger.info(f"Request data: {json_data}") async with httpx.AsyncClient() as client: try: if method.upper() == "GET": response = await client.get(url, headers=headers, params=params, timeout=30.0) elif method.upper() == "POST": response = await client.post(url, headers=headers, params=params, json=json_data, timeout=30.0) else: logger.error(f"ERROR: Invalid HTTP method: {method}") return None response.raise_for_status() logger.info(f"API response status: {response.status_code}") # If the endpoint is for subtitles, directly return the text content if "/subtitles" in endpoint: return response.text # For all other endpoints, return the JSON response return response.json() except Exception as e: logger.error(f"API request error: {str(e)}") return None async def process_video(url: str) -> tuple[bool, str, str]: """Helper function to submit a video for processing and wait for completion. This function now tries to optimize API calls by: 1. Extracting YouTube ID from URL when possible 2. Checking if video is already processed using YouTube ID directly 3. Only submitting for processing if needed Args: url: The YouTube video URL Returns: A tuple of (success, video_id, error_message) """ try: # Step 1: Try to extract YouTube ID from URL youtube_id = extract_youtube_id(url) video_id = "" if youtube_id: logger.info(f"Extracted YouTube ID: {youtube_id} from URL: {url}") # Step 2: Check if video has already been processed using YouTube ID directly status_response = await make_yt_api_request(f"/api/videos/{youtube_id}") if status_response and "status" in status_response: video_id = youtube_id logger.info(f"Found existing video with YouTube ID: {youtube_id}, status: {status_response.get('status')}") # If video is already processed or processing, we can use this ID if status_response.get("status") == "completed": logger.info(f"Video already processed, using YouTube ID: {youtube_id}") return True, youtube_id, "" elif status_response.get("status") == "processing": # Need to wait for processing to complete logger.info(f"Video already processing, waiting for completion: {youtube_id}") # Continue to polling step below with the YouTube ID video_id = youtube_id elif status_response.get("status") == "error": error_message = status_response.get("message", "Unknown error occurred") logger.error(f"Error with video: {error_message}") return False, youtube_id, f"Error processing video: {error_message}" # Step 3: Submit video for processing if needed (if we don't have a video_id yet) if not video_id: logger.info(f"Submitting video for processing: {url}") submit_response = await make_yt_api_request("/api/videos", method="POST", json_data={"url": url}) if not submit_response or "id" not in submit_response: logger.error("Failed to submit video for processing") return False, "", "Failed to submit video for processing." video_id = submit_response["id"] logger.info(f"Video submitted, received ID: {video_id}") await asyncio.sleep(1) # wait for 1 second before polling # Step 4: Poll for video processing status until it's complete max_attempts = 10 attempts = 0 while attempts < max_attempts: logger.info(f"Checking video status, attempt {attempts+1}/{max_attempts}") status_response = await make_yt_api_request(f"/api/videos/{video_id}") if not status_response: logger.error("Failed to retrieve video status") return False, video_id, "Failed to retrieve video status." status = status_response.get("status") logger.info(f"Video status: {status}") if status == "completed": logger.info(f"Video processing completed for ID: {video_id}") return True, video_id, "" if status == "error": error_message = status_response.get("message", "Unknown error occurred") logger.error(f"Error processing video: {error_message}") return False, video_id, f"Error processing video: {error_message}" # Calculate backoff delay delay = await calculate_backoff_delay(attempts) logger.info(f"Waiting {delay:.1f}s before checking video status again, attempt {attempts+1}/{max_attempts}") await asyncio.sleep(delay) attempts += 1 logger.error("Video processing timeout - too many attempts") return False, video_id, "Video processing timed out. Please try again later." except Exception as e: logger.error(f"Exception during video processing: {str(e)}") return False, "", f"An error occurred: {str(e)}" async def calculate_backoff_delay(attempt: int, base_delay: float = 1.0, multiplier: float = 1.5, max_delay: float = 20.0) -> float: """Calculate a progressive backoff delay. Args: attempt: The current attempt number (0-based) base_delay: The initial delay in seconds multiplier: How much to increase the delay each time max_delay: Maximum delay in seconds Returns: The delay in seconds for the current attempt """ delay = min(base_delay * (multiplier ** attempt), max_delay) return delay # MCP Tool Functions @mcp_server.tool(name="get_transcript", description="Get the transcript of a YouTube video") async def get_transcript(url: str) -> str: """Get the transcript of a YouTube video. This tool processes a video and retrieves its transcript. It can efficiently handle YouTube URLs by extracting the video ID and checking if it's already been processed before submitting a new request. Args: url: The YouTube video URL Returns: The video transcript as text """ logger.info(f"Getting transcript for URL: {url}") # Process the video to ensure it's ready success, video_id, error_message = await process_video(url) if not success: logger.error(f"Failed to process video: {error_message}") return f"Error: {error_message}" # Get the transcript from the API transcript_response = await make_yt_api_request(f"/api/videos/{video_id}/transcript") if not transcript_response: error_msg = "Failed to retrieve transcript." logger.error(error_msg) return f"Error: {error_msg}" # Check if the response is a string or a JSON object if isinstance(transcript_response, str): return transcript_response elif isinstance(transcript_response, dict) and "transcript" in transcript_response: return transcript_response["transcript"]["text"] else: error_msg = "Unexpected response format from API." logger.error(error_msg) return f"Error: {error_msg}" @mcp_server.tool(name="get_translation", description="Get a translated transcript of a YouTube video") async def get_translation(url: str, language: str) -> str: """Get a translated transcript of a YouTube video. This tool processes a video and translates its transcript to the specified language. It optimizes API calls by first checking if the translation already exists before processing the video. Args: url: The YouTube video URL language: Target language code (e.g., "en", "fr", "es") Returns: The translated transcript as text """ logger.info(f"Getting translation for URL: {url} to language: {language}") # Process the video to ensure it's ready success, video_id, error_message = await process_video(url) if not success: logger.error(f"Failed to process video: {error_message}") return f"Error: {error_message}" # First try to get the transcript to make sure it exists logger.info(f"Retrieving transcript before requesting translation") transcript_response = await make_yt_api_request(f"/api/videos/{video_id}/transcript") if not transcript_response: error_msg = "Failed to retrieve transcript. Cannot translate without a transcript." logger.error(error_msg) return f"Error: {error_msg}" # Try to fetch the translated transcript directly first (it might already exist) logger.info(f"Checking if translation for language {language} already exists") translation_response = await make_yt_api_request(f"/api/videos/{video_id}/transcript/{language}") # If we got a valid response, return it if translation_response and isinstance(translation_response, dict) and "status" in translation_response and translation_response["status"] == "success": logger.info(f"Found existing translation for language {language}") if "data" in translation_response and "text" in translation_response["data"]: return translation_response["data"]["text"] elif "data" in translation_response: # Return whatever data we have return json.dumps(translation_response["data"], indent=2) # If we don't have a translation yet, request one logger.info(f"Requesting new translation for language {language}") request_translation = await make_yt_api_request( f"/api/videos/{video_id}/translate", method="POST", json_data={"language": language} ) if not request_translation: error_msg = f"Failed to request translation for language: {language}" logger.error(error_msg) return f"Error: {error_msg}" # Check the status of the translation request translation_status = None max_attempts = 10 attempts = 0 while attempts < max_attempts: logger.info(f"Checking translation status, attempt {attempts+1}/{max_attempts}") status_response = await make_yt_api_request( f"/api/videos/{video_id}/translate/{language}/status" ) if not status_response: logger.error("Failed to retrieve translation status") return "Error: Failed to retrieve translation status." if isinstance(status_response, dict) and "status" in status_response and "data" in status_response: translation_status = status_response["data"].get("status") logger.info(f"Translation status: {translation_status}") if translation_status == "completed": logger.info(f"Translation completed for language: {language}") break if translation_status == "error": error_message = status_response.get("message", "Unknown error occurred") logger.error(f"Error translating: {error_message}") return f"Error: Translation failed: {error_message}" # Calculate backoff delay delay = await calculate_backoff_delay(attempts) logger.info(f"Waiting {delay:.1f}s before checking translation status again") await asyncio.sleep(delay) attempts += 1 if attempts >= max_attempts and translation_status != "completed": logger.error("Translation timed out - too many attempts") return "Error: Translation timed out. Please try again later." # Get the translated transcript logger.info(f"Retrieving translated transcript for language: {language}") translated_transcript = await make_yt_api_request(f"/api/videos/{video_id}/transcript/{language}") if not translated_transcript: error_msg = f"Failed to retrieve translated transcript for language: {language}" logger.error(error_msg) return f"Error: {error_msg}" # Format the response based on what we received if isinstance(translated_transcript, str): return translated_transcript elif isinstance(translated_transcript, dict): if "data" in translated_transcript and "text" in translated_transcript["data"]: return translated_transcript["data"]["text"] elif "text" in translated_transcript: return translated_transcript["text"] else: # Return the complete JSON response if we can't extract specific fields return json.dumps(translated_transcript, indent=2) else: error_msg = "Unexpected response format from API." logger.error(error_msg) return f"Error: {error_msg}" @mcp_server.tool(name="get_subtitles", description="Generate subtitle files for a YouTube video") async def get_subtitles(url: str, language: str = "en", format: str = "srt") -> str: """Generate subtitle files for a YouTube video. This tool processes a video and generates subtitle files in the specified format and language. It first checks if the subtitles already exist before processing the video to optimize performance. If the requested language is not available, it automatically requests a translation first. Args: url: The YouTube video URL language: Language code for subtitles (e.g., "en", "fr", "es") format: Subtitle format, either "srt" or "vtt" (default: "srt") Returns: The subtitles content as text """ logger.info(f"Getting subtitles for URL: {url}, language: {language}, format: {format}") # Validate format if format not in ["srt", "vtt"]: error_msg = f"Invalid format: {format}. Must be 'srt' or 'vtt'." logger.error(error_msg) return f"Error: {error_msg}" # Process the video to ensure it's ready success, video_id, error_message = await process_video(url) if not success: logger.error(f"Failed to process video: {error_message}") return f"Error: {error_message}" # Get the subtitles from the API subtitles_response = await make_yt_api_request( f"/api/videos/{video_id}/subtitles", params={"language": language, "format": format} ) if not subtitles_response: error_msg = f"Failed to retrieve subtitles for language: {language}, format: {format}" logger.error(error_msg) return f"Error: {error_msg}" # The response could be the subtitles as text or a JSON object with an error if isinstance(subtitles_response, dict) and "error" in subtitles_response: error_msg = subtitles_response["error"] logger.error(f"API error: {error_msg}") return f"Error: {error_msg}" return subtitles_response @mcp_server.tool(name="get_summary", description="Generate a summary of a YouTube video") async def get_summary(url: str, language: str = "en", length: str = "medium") -> str: """Generate a summary of a YouTube video. This tool processes a video and generates a summary of its content in the specified language. It properly handles "processing" states by polling until completion rather than failing immediately. If the requested language is not available, it automatically requests a translation first. Args: url: The YouTube video URL language: Language code for the summary (e.g., "en", "fr") length: Length of the summary ("short", "medium", or "long") Returns: A summary of the video content """ logger.info(f"Getting summary for URL: {url}, language: {language}, length: {length}") # Validate length if length not in ["short", "medium", "long"]: error_msg = f"Invalid length: {length}. Must be 'short', 'medium', or 'long'." logger.error(error_msg) return f"Error: {error_msg}" # Process the video to ensure it's ready success, video_id, error_message = await process_video(url) if not success: logger.error(f"Failed to process video: {error_message}") return f"Error: {error_message}" # Get the summary from the API summary_response = await make_yt_api_request( f"/api/videos/{video_id}/summary", params={"language": language, "length": length} ) if not summary_response: error_msg = f"Failed to retrieve summary for language: {language}, length: {length}" logger.error(error_msg) return f"Error: {error_msg}" # Check if the response is a JSON object with the summary if isinstance(summary_response, dict) and "summary" in summary_response: return summary_response["summary"] elif isinstance(summary_response, dict) and "error" in summary_response: error_msg = summary_response["error"] logger.error(f"API error: {error_msg}") return f"Error: {error_msg}" else: error_msg = "Unexpected response format from API." logger.error(error_msg) return f"Error: {error_msg}" @mcp_server.tool(name="search_video", description="Search for specific content within a YouTube video's transcript") async def search_video(url: str, query: str) -> str: """Search for specific content within a YouTube video's transcript. This tool processes a video and searches for specific terms or phrases within its transcript. It properly handles "processing" states by polling until completion rather than failing immediately. Args: url: The YouTube video URL query: The search term or phrase to look for Returns: Search results with context from the video transcript """ logger.info(f"Searching video for URL: {url}, query: {query}") # Process the video to ensure it's ready success, video_id, error_message = await process_video(url) if not success: logger.error(f"Failed to process video: {error_message}") return f"Error: {error_message}" # Search the video transcript search_response = await make_yt_api_request( f"/api/videos/{video_id}/search", params={"query": query} ) if not search_response: error_msg = f"Failed to search video for query: {query}" logger.error(error_msg) return f"Error: {error_msg}" # Format the search results if isinstance(search_response, dict) and "matches" in search_response: matches = search_response["matches"] if not matches: return f"No matches found for '{query}' in the video." result = f"Found {len(matches)} matches for '{query}':\n\n" for i, match in enumerate(matches, 1): timestamp = match.get("timestamp", "Unknown") text = match.get("text", "") timestamp_str = f"{int(timestamp) // 60}:{int(timestamp) % 60:02d}" if isinstance(timestamp, (int, float)) else timestamp result += f"{i}. [{timestamp_str}] {text}\n\n" return result elif isinstance(search_response, dict) and "error" in search_response: error_msg = search_response["error"] logger.error(f"API error: {error_msg}") return f"Error: {error_msg}" else: error_msg = "Unexpected response format from API." logger.error(error_msg) return f"Error: {error_msg}" # Main function to run the server async def main(): parser = argparse.ArgumentParser(description="YouTube Translate MCP Server") parser.add_argument("--transport", default="stdio", choices=["stdio", "sse"], help="Transport protocol to use (stdio or sse)") parser.add_argument("--port", type=int, default=8000, help="Port to use for SSE transport (default: 8000)") parser.add_argument("--host", default="0.0.0.0", help="Host to bind for SSE transport (default: 0.0.0.0)") args = parser.parse_args() logger.info(f"Starting YouTube Translate MCP server with {args.transport} transport") if args.transport == "stdio": await mcp_server.run_stdio_async() else: # SSE logger.info(f"Listening on {args.host}:{args.port}") await mcp_server.run_sse_async() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brianshin22/youtube-translate-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server