Skip to main content
Glama

get_translation

Translate YouTube video transcripts into different languages to make content accessible across language barriers.

Instructions

Get a translated transcript of a YouTube video

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| url | Yes | | |
| language | Yes | | |

Implementation Reference

  • The main execution logic for the 'get_translation' tool. This async function takes a YouTube URL and target language, processes the video if necessary, checks for existing translations, requests a new translation if needed, polls for completion using backoff, and returns the translated transcript text.
    @mcp_server.tool(name="get_translation", description="Get a translated transcript of a YouTube video")
    async def get_translation(url: str, language: str) -> str:
        """Get a translated transcript of a YouTube video.

        Steps, in order:

        1. Process the video (or confirm it is already processed) via ``process_video``.
        2. Confirm the source transcript can be retrieved.
        3. Return the translation for ``language`` immediately if the API already has one.
        4. Otherwise request a translation and poll its status with a backoff delay.
        5. Fetch and return the finished translation.

        Expected failures are reported as a returned string starting with
        ``"Error:"`` rather than raised.

        Args:
            url: The YouTube video URL
            language: Target language code (e.g., "en", "fr", "es")

        Returns:
            The translated transcript as text, a JSON dump of the API payload when
            no text field can be extracted, or an ``"Error: ..."`` message.
        """
        logger.info(f"Getting translation for URL: {url} to language: {language}")

        # Process the video to ensure it's ready
        success, video_id, error_message = await process_video(url)

        if not success:
            logger.error(f"Failed to process video: {error_message}")
            return f"Error: {error_message}"

        # First try to get the transcript to make sure it exists
        logger.info("Retrieving transcript before requesting translation")
        transcript_response = await make_yt_api_request(f"/api/videos/{video_id}/transcript")

        if not transcript_response:
            error_msg = "Failed to retrieve transcript. Cannot translate without a transcript."
            logger.error(error_msg)
            return f"Error: {error_msg}"

        # Try to fetch the translated transcript directly first (it might already exist)
        logger.info(f"Checking if translation for language {language} already exists")
        translation_response = await make_yt_api_request(f"/api/videos/{video_id}/transcript/{language}")

        # If we got a valid response, return it
        if isinstance(translation_response, dict) and translation_response.get("status") == "success":
            logger.info(f"Found existing translation for language {language}")
            existing_data = translation_response.get("data")
            if isinstance(existing_data, dict) and "text" in existing_data:
                return existing_data["text"]
            if existing_data is not None:
                # Return whatever data we have
                return json.dumps(existing_data, indent=2)
            # A "success" response without usable data falls through to a fresh request.

        # If we don't have a translation yet, request one
        logger.info(f"Requesting new translation for language {language}")
        request_translation = await make_yt_api_request(
            f"/api/videos/{video_id}/translate",
            method="POST",
            json_data={"language": language}
        )

        if not request_translation:
            error_msg = f"Failed to request translation for language: {language}"
            logger.error(error_msg)
            return f"Error: {error_msg}"

        # Poll the status of the translation request
        max_attempts = 10

        for attempt in range(max_attempts):
            logger.info(f"Checking translation status, attempt {attempt+1}/{max_attempts}")

            status_response = await make_yt_api_request(
                f"/api/videos/{video_id}/translate/{language}/status"
            )

            if not status_response:
                logger.error("Failed to retrieve translation status")
                return "Error: Failed to retrieve translation status."

            if isinstance(status_response, dict) and "status" in status_response and "data" in status_response:
                status_data = status_response["data"]
                # Guard against a null / non-dict "data" payload instead of crashing on .get()
                translation_status = status_data.get("status") if isinstance(status_data, dict) else None
                logger.info(f"Translation status: {translation_status}")

                if translation_status == "completed":
                    logger.info(f"Translation completed for language: {language}")
                    break

                if translation_status == "error":
                    error_message = status_response.get("message", "Unknown error occurred")
                    logger.error(f"Error translating: {error_message}")
                    return f"Error: Translation failed: {error_message}"

            # Only wait when another status check will follow; sleeping after the
            # final attempt would just delay the timeout error.
            if attempt + 1 < max_attempts:
                delay = await calculate_backoff_delay(attempt)
                logger.info(f"Waiting {delay:.1f}s before checking translation status again")
                await asyncio.sleep(delay)
        else:
            # Loop ended without ever seeing "completed"
            logger.error("Translation timed out - too many attempts")
            return "Error: Translation timed out. Please try again later."

        # Get the translated transcript
        logger.info(f"Retrieving translated transcript for language: {language}")
        translated_transcript = await make_yt_api_request(f"/api/videos/{video_id}/transcript/{language}")

        if not translated_transcript:
            error_msg = f"Failed to retrieve translated transcript for language: {language}"
            logger.error(error_msg)
            return f"Error: {error_msg}"

        # Format the response based on what we received
        if isinstance(translated_transcript, str):
            return translated_transcript

        if isinstance(translated_transcript, dict):
            transcript_data = translated_transcript.get("data")
            if isinstance(transcript_data, dict) and "text" in transcript_data:
                return transcript_data["text"]
            if "text" in translated_transcript:
                return translated_transcript["text"]
            # Return the complete JSON response if we can't extract specific fields
            return json.dumps(translated_transcript, indent=2)

        error_msg = "Unexpected response format from API."
        logger.error(error_msg)
        return f"Error: {error_msg}"
  • The MCP tool registration decorator that registers the get_translation function with name 'get_translation' and its description.
    @mcp_server.tool(name="get_translation", description="Get a translated transcript of a YouTube video")
  • Key helper function that processes the YouTube video: extracts ID, checks status, submits if needed, and polls until ready. Called first in the handler.
    async def process_video(url: str) -> tuple[bool, str, str]:
        """Helper function to submit a video for processing and wait for completion.

        To keep the number of API calls down, this function:
        1. Extracts the YouTube ID from the URL when possible
        2. Checks whether that ID is already known to the API
        3. Only submits the video for processing if no existing record was found
        4. Polls the video status (with a backoff delay) until it completes,
           fails, or the attempt limit is reached

        Args:
            url: The YouTube video URL

        Returns:
            A tuple of (success, video_id, error_message). ``error_message`` is
            empty on success; ``video_id`` may be empty when the failure happened
            before an ID was known.
        """
        try:
            # Step 1: Try to extract YouTube ID from URL
            youtube_id = extract_youtube_id(url)
            video_id = ""

            if youtube_id:
                logger.info(f"Extracted YouTube ID: {youtube_id} from URL: {url}")

                # Step 2: Check if video has already been processed using YouTube ID directly
                status_response = await make_yt_api_request(f"/api/videos/{youtube_id}")

                if status_response and "status" in status_response:
                    # Any record with a status means the API knows this ID, so reuse it
                    video_id = youtube_id
                    existing_status = status_response.get("status")
                    logger.info(f"Found existing video with YouTube ID: {youtube_id}, status: {existing_status}")

                    if existing_status == "completed":
                        logger.info(f"Video already processed, using YouTube ID: {youtube_id}")
                        return True, youtube_id, ""
                    elif existing_status == "processing":
                        # Fall through to the polling step below with the YouTube ID
                        logger.info(f"Video already processing, waiting for completion: {youtube_id}")
                    elif existing_status == "error":
                        error_message = status_response.get("message", "Unknown error occurred")
                        logger.error(f"Error with video: {error_message}")
                        return False, youtube_id, f"Error processing video: {error_message}"

            # Step 3: Submit video for processing if needed (if we don't have a video_id yet)
            if not video_id:
                logger.info(f"Submitting video for processing: {url}")

                submit_response = await make_yt_api_request("/api/videos", method="POST", json_data={"url": url})

                if not submit_response or "id" not in submit_response:
                    logger.error("Failed to submit video for processing")
                    return False, "", "Failed to submit video for processing."

                video_id = submit_response["id"]
                logger.info(f"Video submitted, received ID: {video_id}")
                await asyncio.sleep(1)  # wait for 1 second before polling

            # Step 4: Poll for video processing status until it's complete
            max_attempts = 10

            for attempt in range(max_attempts):
                logger.info(f"Checking video status, attempt {attempt+1}/{max_attempts}")

                status_response = await make_yt_api_request(f"/api/videos/{video_id}")

                if not status_response:
                    logger.error("Failed to retrieve video status")
                    return False, video_id, "Failed to retrieve video status."

                status = status_response.get("status")
                logger.info(f"Video status: {status}")

                if status == "completed":
                    logger.info(f"Video processing completed for ID: {video_id}")
                    return True, video_id, ""

                if status == "error":
                    error_message = status_response.get("message", "Unknown error occurred")
                    logger.error(f"Error processing video: {error_message}")
                    return False, video_id, f"Error processing video: {error_message}"

                # Only wait when another status check will follow; sleeping after
                # the final attempt would just delay the timeout result.
                if attempt + 1 < max_attempts:
                    delay = await calculate_backoff_delay(attempt)
                    logger.info(f"Waiting {delay:.1f}s before checking video status again, attempt {attempt+1}/{max_attempts}")
                    await asyncio.sleep(delay)

            logger.error("Video processing timeout - too many attempts")
            return False, video_id, "Video processing timed out. Please try again later."

        except Exception as e:
            # Boundary handler: callers rely on the tuple contract, so report the
            # failure instead of raising, but keep the traceback in the log.
            logger.exception(f"Exception during video processing: {str(e)}")
            return False, "", f"An error occurred: {str(e)}"
  • Helper to make authenticated API requests to the YouTube Translate service, handling GET/POST and responses.
    async def make_yt_api_request(endpoint: str, method: str = "GET", params: dict = None, json_data: dict = None) -> dict[str, Any] | str | None:
        """Send one GET or POST request to the YouTube Translate API.

        The API key is sent in the ``X-API-Key`` header. Any failure (unsupported
        method, HTTP error status, transport error, undecodable body) is logged
        and reported as ``None`` instead of being raised.

        Args:
            endpoint: Path appended to the API base URL
            method: HTTP method, "GET" or "POST" (case-insensitive)
            params: Optional query-string parameters
            json_data: Optional JSON body, only sent with POST requests

        Returns:
            The raw response text for endpoints containing "/subtitles", the
            decoded JSON for every other endpoint, or ``None`` on failure.
        """
        target_url = f"{YT_TRANSLATE_API_BASE}{endpoint}"
        request_headers = {
            "X-API-Key": YOUTUBE_TRANSLATE_API_KEY,
            "Content-Type": "application/json",
        }

        logger.info(f"Making API request: {method} {target_url}")
        if params:
            logger.info(f"Request params: {params}")
        if json_data:
            logger.info(f"Request data: {json_data}")

        async with httpx.AsyncClient() as client:
            try:
                verb = method.upper()
                if verb == "GET":
                    response = await client.get(target_url, headers=request_headers, params=params, timeout=30.0)
                elif verb == "POST":
                    response = await client.post(target_url, headers=request_headers, params=params, json=json_data, timeout=30.0)
                else:
                    logger.error(f"ERROR: Invalid HTTP method: {method}")
                    return None

                # Turn 4xx/5xx answers into exceptions handled below
                response.raise_for_status()

                logger.info(f"API response status: {response.status_code}")

                # Subtitle endpoints return plain text; everything else is JSON
                wants_raw_text = "/subtitles" in endpoint
                return response.text if wants_raw_text else response.json()
            except Exception as e:
                logger.error(f"API request error: {str(e)}")
                return None
  • Calculates progressive backoff delays for polling loops.
    async def calculate_backoff_delay(attempt: int, base_delay: float = 1.0, multiplier: float = 1.5, max_delay: float = 20.0) -> float:
        """Calculate a progressive backoff delay, capped at ``max_delay``.

        The delay is ``base_delay * multiplier ** attempt``, limited to
        ``max_delay``.

        Args:
            attempt: The current attempt number (0-based)
            base_delay: The initial delay in seconds
            multiplier: How much to increase the delay each time
            max_delay: Maximum delay in seconds

        Returns:
            The delay in seconds for the current attempt
        """
        try:
            delay = base_delay * (multiplier ** attempt)
        except OverflowError:
            # For very large attempt numbers the power exceeds the float range
            # and raises instead of producing inf. With a growth factor above 1
            # the delay is far past the cap at that point, so return the cap.
            return max_delay
        return min(delay, max_delay)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brianshin22/youtube-translate-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.