Skip to main content
Glama
dubbing.py21.5 kB
import logging import os import random import time import requests from mcp.types import TextContent from .base import get_client from .utils import validate_output_directory, create_error_response def download_dubbing_file( dubbing_id: str, output_dir: str = None ) -> TextContent: """ Download audio file from a completed dubbing project Args: dubbing_id: The unique identifier of the dubbing project to download. Required. output_dir: Output directory for the downloaded audio file. Default is user's desktop. Returns: TextContent: Text content containing the path to the downloaded audio file. """ all_voice_lab = get_client() output_dir = all_voice_lab.get_output_path(output_dir) logging.info(f"Tool called: download_dubbing_audio") logging.info(f"Dubbing ID: {dubbing_id}") logging.info(f"Output directory: {output_dir}") # Validate parameters if not dubbing_id: logging.warning("Dubbing ID parameter is empty") return TextContent( type="text", text="dubbing_id parameter cannot be empty" ) # Validate and create output directory is_valid, error_message = validate_output_directory(output_dir) if not is_valid: return create_error_response(error_message) try: logging.info(f"Starting dubbing audio download, dubbing ID: {dubbing_id}") file_path = all_voice_lab.download_dubbing_audio(dubbing_id, output_dir) logging.info(f"Dubbing audio download successful, file saved at: {file_path}") return TextContent( type="text", text=f"Dubbing audio download completed, file saved at: {file_path}\n" ) except Exception as e: logging.error(f"Dubbing audio download failed: {str(e)}") return TextContent( type="text", text=f"Download failed, tool temporarily unavailable" ) def remove_subtitle( video_file_path: str, language_code: str = "auto", name: str = None, output_dir: str = None ) -> TextContent: """ Remove hardcoded subtitles from videos using OCR technology Args: video_file_path: Path to the video file to process. Only MP4 and MOV formats are supported. Maximum file size: 2GB. language_code: Language code for subtitle text detection (e.g., 'en', 'zh'). Set to 'auto' for automatic language detection. Default is 'auto'. name: Optional project name for identification purposes. output_dir: Output directory for the processed video file. Default is user's desktop. Returns: TextContent: Text content containing the processed video file path or error message. If the process takes longer than expected, returns the project ID for later status checking. """ all_voice_lab = get_client() output_dir = all_voice_lab.get_output_path(output_dir) poll_interval = 10 max_retries = 60 logging.info(f"Tool called: subtitle_removal") logging.info(f"Video file path: {video_file_path}") logging.info(f"Language code: {language_code}") logging.info(f"Output directory: {output_dir}") logging.info(f"Poll interval: {poll_interval} seconds") logging.info(f"Max retries: {max_retries}") if name: logging.info(f"Project name: {name}") # Validate parameters if not video_file_path: logging.warning("Video file path parameter is empty") return TextContent( type="text", text="video_file_path parameter cannot be empty" ) # Check if video file exists before processing if not os.path.exists(video_file_path): logging.warning(f"Video file does not exist: {video_file_path}") return TextContent( type="text", text=f"Video file does not exist: {video_file_path}" ) # Check file format, only allow mp4 and mov _, file_extension = os.path.splitext(video_file_path) file_extension = file_extension.lower() if file_extension not in [".mp4", ".mov"]: logging.warning(f"Unsupported video file format: {file_extension}") return TextContent( type="text", text=f"Unsupported video file format. Only MP4 and MOV formats are supported." ) # Check file size, limit to 2GB max_size_bytes = 2 * 1024 * 1024 * 1024 # 2GB in bytes file_size = os.path.getsize(video_file_path) if file_size > max_size_bytes: logging.warning(f"Video file size exceeds limit: {file_size} bytes, max allowed: {max_size_bytes} bytes") return TextContent( type="text", text=f"Video file size exceeds the maximum limit of 2GB. Please use a smaller file." ) try: logging.info("Starting subtitle removal process") project_id = all_voice_lab.subtitle_removal( video_file_path=video_file_path, language_code=language_code, name=name ) logging.info(f"Subtitle removal initiated, project ID: {project_id}") # Poll for task completion logging.info(f"Starting to poll for task completion, interval: {poll_interval}s, max retries: {max_retries}") # Initialize variables for polling retry_count = 0 task_completed = False removal_info = None # Poll until task is completed or max retries reached while retry_count < max_retries and not task_completed: try: # Wait for the specified interval time.sleep(poll_interval) # Check task status removal_info = all_voice_lab.get_removal_info(project_id) logging.info(f"Poll attempt {retry_count + 1}, status: {removal_info.status}") # Check if task is completed if removal_info.status.lower() == "success": task_completed = True logging.info("Subtitle removal task completed successfully") break elif removal_info.status.lower() == "failed": logging.error("Subtitle removal task failed") return TextContent( type="text", text=f"Subtitle removal failed. Please try again later." ) # Increment retry count retry_count += 1 except Exception as e: logging.error(f"Error checking task status: {str(e)}") retry_count += 1 # Check if task completed successfully if not task_completed: logging.warning(f"Subtitle removal task did not complete within {max_retries} attempts") return TextContent( type="text", text=f"Subtitle removal is still in progress. Your project ID is: {project_id}. You can check the status later." ) # Download the processed video logging.info("Downloading processed video") try: # Check if output URL is available if not removal_info.removal_result: logging.error("No removal_result URL available in the response") return TextContent( type="text", text=f"Subtitle removal completed but no output file is available. Your project ID is: {project_id}" ) # Prepare HTTP request url = removal_info.removal_result # Set request headers, accept all types of responses headers = all_voice_lab._get_headers(content_type="", accept="*/*") # Send request and get response response = requests.get(url, headers=headers, stream=True) # Check response status response.raise_for_status() # Generate filename based on original file name with suffix original_filename = os.path.basename(video_file_path) name_without_ext, _ = os.path.splitext(original_filename) filename = f"{name_without_ext}_subtitle_removed.mp4" # Build complete file path file_path = os.path.join(output_dir, filename) # Save response content to file with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) logging.info(f"Processed video saved to: {file_path}") return TextContent( type="text", text=f"Subtitle removal completed successfully. Processed video saved to: {file_path}" ) except Exception as e: logging.error(f"Failed to download processed video: {str(e)}") return TextContent( type="text", text=f"Subtitle removal completed but failed to download the processed video. Your project ID is: {project_id}" ) except FileNotFoundError as e: logging.error(f"Video file does not exist: {video_file_path}, error: {str(e)}") return TextContent( type="text", text=f"Video file does not exist: {video_file_path}" ) except Exception as e: logging.error(f"Subtitle removal failed: {str(e)}") return TextContent( type="text", text=f"Subtitle removal failed, tool temporarily unavailable" ) def video_translation_dubbing( video_file_path: str, target_lang: str, source_lang: str = "auto", name: str = None, output_dir: str = None ) -> TextContent: """ Translate and dub video speech into a different language with AI-generated voices Args: video_file_path: Path to the video or audio file to process. Supports MP4, MOV, MP3, and WAV formats. Maximum file size: 2GB. target_lang: Target language code for translation (e.g., 'en', 'zh', 'ja', 'fr', 'de', 'ko'). Required. source_lang: Source language code of the original content. Set to 'auto' for automatic language detection. Default is 'auto'. name: Optional project name for identification purposes. output_dir: Output directory for the downloaded result file. Default is user's desktop. Returns: TextContent: Text content containing the dubbing ID and file path to the downloaded result. If the process takes longer than expected, returns only the dubbing ID for later status checking. """ all_voice_lab = get_client() output_dir = all_voice_lab.get_output_path(output_dir) max_polling_time = 600 polling_interval = 10 logging.info(f"Tool called: video_translation_dubbing") logging.info(f"Video file path: {video_file_path}") logging.info(f"Target language: {target_lang}, Source language: {source_lang}") if name: logging.info(f"Project name: {name}") logging.info(f"Output directory: {output_dir}") logging.info(f"Max polling time: {max_polling_time}s, Polling interval: {polling_interval}s") # Validate parameters if not video_file_path: logging.warning("Video file path parameter is empty") return TextContent( type="text", text="video_file_path parameter cannot be empty" ) # Check if video file exists before processing if not os.path.exists(video_file_path): logging.warning(f"Video file does not exist: {video_file_path}") return TextContent( type="text", text=f"Video file does not exist: {video_file_path}" ) # Check file format, only allow mp4 and mov _, file_extension = os.path.splitext(video_file_path) file_extension = file_extension.lower() if file_extension not in [".mp4", ".mov", ".mp3", ".wav"]: logging.warning(f"Unsupported video file format: {file_extension}") return TextContent( type="text", text=f"Unsupported video file format. Only MP4, MOV, MP3 and WAV formats are supported." ) # Check file size, limit to 2GB max_size_bytes = 2 * 1024 * 1024 * 1024 # 2GB in bytes file_size = os.path.getsize(video_file_path) if file_size > max_size_bytes: logging.warning(f"Video file size exceeds limit: {file_size} bytes, max allowed: {max_size_bytes} bytes") return TextContent( type="text", text=f"Video file size exceeds the maximum limit of 2GB. Please use a smaller file." ) # Validate target language if not target_lang: logging.warning(f"target language is empty") return TextContent( type="text", text="target language parameter cannot be empty" ) # Validate output directory if not output_dir: logging.warning("Output directory parameter is empty") return TextContent( type="text", text="output_dir parameter cannot be empty" ) # Try to create output directory if it doesn't exist try: os.makedirs(output_dir, exist_ok=True) except Exception as e: logging.error(f"Failed to create output directory: {output_dir}, error: {str(e)}") return TextContent( type="text", text=f"Failed to create output directory: {output_dir}" ) try: # Submit dubbing request logging.info("Starting video dubbing process") dubbing_id = all_voice_lab.dubbing( video_file_path=video_file_path, target_lang=target_lang, source_lang=source_lang, name=name, watermark=True, drop_background_audio=False ) logging.info(f"Video dubbing request successful, dubbing ID: {dubbing_id}") # Start polling for task completion logging.info(f"Starting to poll dubbing status for ID: {dubbing_id}") start_time = time.time() completed = False file_path = None # Poll until task is completed or timeout while time.time() - start_time < max_polling_time: try: # Get dubbing info dubbing_info = all_voice_lab.get_dubbing_info(dubbing_id) logging.info(f"Dubbing status: {dubbing_info.status} for ID: {dubbing_id}") # Check if dubbing is completed if dubbing_info.status.lower() == "success": logging.info(f"Dubbing completed for ID: {dubbing_id}") completed = True break # Check if dubbing failed elif dubbing_info.status.lower() in ["failed", "error"]: logging.error(f"Dubbing failed for ID: {dubbing_id}") return TextContent( type="text", text=f"Video dubbing failed. Please try again later.\n" f"Dubbing ID: {dubbing_id}\n" ) # Wait for next polling interval logging.info(f"Waiting {polling_interval} seconds before next poll") time.sleep(polling_interval) except Exception as e: logging.error(f"Error polling dubbing status: {str(e)}") time.sleep(polling_interval) # Continue polling despite errors # Check if polling timed out if not completed: logging.warning(f"Polling timed out after {max_polling_time} seconds for dubbing ID: {dubbing_id}") return TextContent( type="text", text=f"Video dubbing is still in progress. Your dubbing ID is: {dubbing_id}\n" f"The process is taking longer than expected. You can check the status later using this ID.\n" ) # Download the file if dubbing completed try: logging.info(f"Downloading dubbing audio for ID: {dubbing_id}") file_path = all_voice_lab.get_dubbing_audio(dubbing_id, output_dir, video_file_path) logging.info(f"Dubbing audio downloaded successfully, file saved at: {file_path}") return TextContent( type="text", text=f"Video dubbing completed successfully!\n" f"Dubbing ID: {dubbing_id}\n" f"File saved at: {file_path}\n" ) except Exception as e: logging.error(f"Failed to download dubbing audio: {str(e)}") return TextContent( type="text", text=f"Video dubbing completed, but failed to download the audio file.\n" f"Dubbing ID: {dubbing_id}\n" f"Error: {str(e)}\n" ) except FileNotFoundError as e: logging.error(f"Video file does not exist: {video_file_path}, error: {str(e)}") return TextContent( type="text", text=f"Video file does not exist: {video_file_path}" ) except Exception as e: logging.error(f"Video dubbing failed: {str(e)}") return TextContent( type="text", text=f"Video dubbing failed, tool temporarily unavailable" ) def get_dubbing_info(dubbing_id: str) -> TextContent: """ Retrieve status and details of a video dubbing task Args: dubbing_id: The unique identifier of the dubbing task to check. This ID is returned from the video_dubbing or video_translation_dubbing tool. Required. Returns: TextContent: Text content containing the status (e.g., "pending", "processing", "success", "failed") and other details of the dubbing task. """ all_voice_lab = get_client() logging.info(f"Tool called: get_dubbing_info") logging.info(f"Dubbing ID: {dubbing_id}") # Validate parameters if not dubbing_id: logging.warning("Dubbing ID parameter is empty") return TextContent( type="text", text="dubbing_id parameter cannot be empty" ) try: logging.info("Getting dubbing task information") dubbing_info = all_voice_lab.get_dubbing_info(dubbing_id) logging.info(f"Dubbing info retrieved successfully for ID: {dubbing_id}") # Format the result buffer = [] buffer.append(f"Dubbing ID: {dubbing_info.dubbing_id}\n") buffer.append(f"Status: {dubbing_info.status}\n") if dubbing_info.name: buffer.append(f"Project Name: {dubbing_info.name}\n") buffer.append( "Note: If the task has not been completed, you may need to explicitly inform the user of the task ID when responding.\n") # Join the list into a string result = "".join(buffer) return TextContent( type="text", text=result ) except Exception as e: logging.error(f"Failed to get dubbing information: {str(e)}") return TextContent( type="text", text=f"Failed to get dubbing information, tool temporarily unavailable" ) def get_removal_info(project_id: str) -> TextContent: """ Retrieve status and details of a subtitle removal task Args: project_id: The unique identifier of the subtitle removal task to check. This ID is returned from the remove_subtitle tool. Required. Returns: TextContent: Text content containing the status (e.g., "pending", "processing", "success", "failed") and other details of the subtitle removal task, including the URL to the processed video if the task has completed successfully. """ all_voice_lab = get_client() logging.info(f"Tool called: get_removal_info") logging.info(f"Project ID: {project_id}") # Validate parameters if not project_id: logging.warning("Project ID parameter is empty") return TextContent( type="text", text="project_id parameter cannot be empty" ) try: logging.info("Getting subtitle removal task information") removal_info = all_voice_lab.get_removal_info(project_id) logging.info(f"Subtitle removal info retrieved successfully for ID: {project_id}") # Format the result buffer = [] buffer.append(f"Project ID: {removal_info.project_id}\n") buffer.append(f"Status: {removal_info.status}\n") if removal_info.name: buffer.append(f"Project Name: {removal_info.name}\n") if removal_info.output_url and removal_info.status == "done": buffer.append(f"Output URL: {removal_info.output_url}\n") buffer.append( f"The subtitle removal task has been completed. You can download the processed video from the output URL.\n") else: buffer.append( f"The subtitle removal task is still in progress. Please check again later using the project ID.\n") # Join the list into a string result = "".join(buffer) return TextContent( type="text", text=result ) except Exception as e: logging.error(f"Failed to get subtitle removal information: {str(e)}") return TextContent( type="text", text=f"Failed to get subtitle removal information, tool temporarily unavailable" )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/allvoicelab/AllVoiceLab-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server