Skip to main content
Glama

get_transcript

Extract text transcripts from YouTube videos to access video content in written form.

Instructions

Get the transcript of a YouTube video

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| url | Yes | | |

Implementation Reference

  • The main handler function for the 'get_transcript' MCP tool. It takes a YouTube URL, uses helper functions to process the video and fetch the transcript from the external API, handles errors, and returns the transcript text.
    @mcp_server.tool(name="get_transcript", description="Get the transcript of a YouTube video")
    async def get_transcript(url: str) -> str:
        """Get the transcript of a YouTube video.

        The video is first run through ``process_video`` so that it is ready,
        then the transcript is fetched from
        ``/api/videos/{video_id}/transcript``.

        Args:
            url: The YouTube video URL

        Returns:
            The video transcript as text. On any failure a human-readable
            string starting with ``"Error: "`` is returned instead of raising.
        """
        logger.info(f"Getting transcript for URL: {url}")

        # Process the video to ensure it's ready
        success, video_id, error_message = await process_video(url)

        if not success:
            logger.error(f"Failed to process video: {error_message}")
            return f"Error: {error_message}"

        # Get the transcript from the API
        transcript_response = await make_yt_api_request(f"/api/videos/{video_id}/transcript")

        if not transcript_response:
            error_msg = "Failed to retrieve transcript."
            logger.error(error_msg)
            return f"Error: {error_msg}"

        # A plain string response is already the transcript text
        if isinstance(transcript_response, str):
            return transcript_response

        # Otherwise expect {"transcript": {"text": "..."}}. Validate every level
        # so a differently shaped payload is reported through the error path
        # below instead of raising KeyError/TypeError out of the tool.
        if isinstance(transcript_response, dict):
            transcript = transcript_response.get("transcript")
            if isinstance(transcript, dict):
                text = transcript.get("text")
                if isinstance(text, str):
                    return text

        error_msg = "Unexpected response format from API."
        logger.error(error_msg)
        return f"Error: {error_msg}"
  • Critical helper function called by get_transcript to handle video processing: extracts YouTube ID, checks existing status, submits if needed, and polls until completed or error.
    async def process_video(url: str) -> tuple[bool, str, str]:
        """Submit a video for processing if needed and wait for completion.

        Steps:
        1. Extract the YouTube ID from the URL when possible.
        2. Look the video up by that ID; if the API already knows it, reuse
           the ID instead of submitting again.
        3. Otherwise submit the URL for processing.
        4. Poll the status endpoint (at most 10 checks, with a backoff delay
           between checks) until the status is "completed" or "error".

        Args:
            url: The YouTube video URL

        Returns:
            A tuple of (success, video_id, error_message). ``error_message`` is
            empty on success; ``video_id`` is empty when no ID was obtained or
            an unexpected exception occurred.
        """
        try:
            # Step 1: Try to extract YouTube ID from URL
            youtube_id = extract_youtube_id(url)
            video_id = ""

            if youtube_id:
                logger.info(f"Extracted YouTube ID: {youtube_id} from URL: {url}")

                # Step 2: Check if video has already been processed using YouTube ID directly
                status_response = await make_yt_api_request(f"/api/videos/{youtube_id}")

                if status_response and "status" in status_response:
                    # The API already knows this ID, so submission (step 3) is
                    # skipped; any non-terminal status falls through to polling.
                    video_id = youtube_id
                    existing_status = status_response.get("status")
                    logger.info(f"Found existing video with YouTube ID: {youtube_id}, status: {existing_status}")

                    if existing_status == "completed":
                        logger.info(f"Video already processed, using YouTube ID: {youtube_id}")
                        return True, youtube_id, ""
                    if existing_status == "error":
                        error_message = status_response.get("message", "Unknown error occurred")
                        logger.error(f"Error with video: {error_message}")
                        return False, youtube_id, f"Error processing video: {error_message}"
                    if existing_status == "processing":
                        logger.info(f"Video already processing, waiting for completion: {youtube_id}")

            # Step 3: Submit video for processing if needed (if we don't have a video_id yet)
            if not video_id:
                logger.info(f"Submitting video for processing: {url}")

                submit_response = await make_yt_api_request("/api/videos", method="POST", json_data={"url": url})

                if not submit_response or "id" not in submit_response:
                    logger.error("Failed to submit video for processing")
                    return False, "", "Failed to submit video for processing."

                video_id = submit_response["id"]
                logger.info(f"Video submitted, received ID: {video_id}")
                await asyncio.sleep(1)  # give the API a moment before the first poll

            # Step 4: Poll for video processing status until it's complete
            max_attempts = 10

            for attempt in range(max_attempts):
                logger.info(f"Checking video status, attempt {attempt+1}/{max_attempts}")

                status_response = await make_yt_api_request(f"/api/videos/{video_id}")

                if not status_response:
                    logger.error("Failed to retrieve video status")
                    return False, video_id, "Failed to retrieve video status."

                status = status_response.get("status")
                logger.info(f"Video status: {status}")

                if status == "completed":
                    logger.info(f"Video processing completed for ID: {video_id}")
                    return True, video_id, ""

                if status == "error":
                    error_message = status_response.get("message", "Unknown error occurred")
                    logger.error(f"Error processing video: {error_message}")
                    return False, video_id, f"Error processing video: {error_message}"

                # No further check follows the last attempt, so backing off
                # here would only delay the timeout response.
                if attempt == max_attempts - 1:
                    break

                # Calculate backoff delay
                delay = await calculate_backoff_delay(attempt)
                logger.info(f"Waiting {delay:.1f}s before checking video status again, attempt {attempt+1}/{max_attempts}")

                await asyncio.sleep(delay)

            logger.error("Video processing timeout - too many attempts")
            return False, video_id, "Video processing timed out. Please try again later."

        except Exception as e:
            # Top-level boundary: report the failure to the caller as a tuple,
            # but keep the traceback in the log for diagnosis.
            logger.exception(f"Exception during video processing: {str(e)}")
            return False, "", f"An error occurred: {str(e)}"
  • Helper function used by get_transcript and process_video to make HTTP requests to the YouTube Translate API, handling GET/POST, errors, and special cases like subtitles.
    async def make_yt_api_request(endpoint: str, method: str = "GET", params: dict = None, json_data: dict = None) -> dict[str, Any] | str | None:
        """Send a GET or POST request to the YouTube Translate API.

        Args:
            endpoint: Path appended to ``YT_TRANSLATE_API_BASE``.
            method: HTTP method, case-insensitive; only GET and POST are accepted.
            params: Optional query-string parameters.
            json_data: Optional JSON body, sent on POST requests only.

        Returns:
            The raw response text when ``endpoint`` contains "/subtitles", the
            decoded JSON body for every other endpoint, or None when the method
            is unsupported or the request fails for any reason (the failure is
            logged, never raised).
        """
        request_headers = {
            "X-API-Key": YOUTUBE_TRANSLATE_API_KEY,
            "Content-Type": "application/json"
        }
        target = f"{YT_TRANSLATE_API_BASE}{endpoint}"

        logger.info(f"Making API request: {method} {target}")
        if params:
            logger.info(f"Request params: {params}")
        if json_data:
            logger.info(f"Request data: {json_data}")

        async with httpx.AsyncClient() as client:
            try:
                verb = method.upper()
                if verb not in ("GET", "POST"):
                    logger.error(f"ERROR: Invalid HTTP method: {method}")
                    return None

                # Arguments shared by both verbs; each call is capped at 30 seconds
                common = {"headers": request_headers, "params": params, "timeout": 30.0}
                if verb == "GET":
                    response = await client.get(target, **common)
                else:
                    response = await client.post(target, json=json_data, **common)

                # Non-2xx responses are turned into exceptions and handled below
                response.raise_for_status()

                logger.info(f"API response status: {response.status_code}")

                # Subtitle endpoints return the body as text; everything else as JSON
                wants_raw_text = "/subtitles" in endpoint
                return response.text if wants_raw_text else response.json()
            except Exception as exc:
                logger.error(f"API request error: {str(exc)}")
                return None
  • Decorator that registers the get_transcript function as an MCP tool with the specified name and description.
    @mcp_server.tool(name="get_transcript", description="Get the transcript of a YouTube video")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brianshin22/youtube-translate-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.