audio_transcription.py (108 kB)
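The full module source follows below. For orientation, here is a minimal usage sketch of the `transcribe_audio` tool it defines. The import path is an assumption inferred from the logger name used in the module (`ultimate_mcp_server.tools.audio`), not something stated on this page, and the file path and option values are placeholders; the option keys themselves mirror the schema models defined in the module.

import asyncio

from ultimate_mcp_server.tools.audio_transcription import transcribe_audio  # assumed import path

async def main() -> None:
    # All option keys are optional; unspecified values fall back to the module defaults.
    result = await transcribe_audio(
        "/path/to/meeting.m4a",  # placeholder input file
        options={
            "enhance_audio": True,
            "enhance_transcript": True,
            "output_formats": ["json", "text", "srt"],
            "audio_params": {"profile": "conference_call"},
            "whisper_params": {"model": "large-v3-turbo", "quality": "enhanced"},
            "enhancement_params": {"style": "readable", "identify_speakers": True},
        },
    )
    if result.get("success"):
        print(result["enhanced_transcript"])
    else:
        print("Transcription failed:", result.get("error"))

if __name__ == "__main__":
    asyncio.run(main())

The module source as published: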
"""Advanced audio transcription and enhancement tools for Ultimate MCP Server. This module provides tools for high-quality audio transcription, pre-processing, and intelligent transcript enhancement with advanced features like speaker diarization, custom vocabulary support, and semantic structuring. """ import asyncio import concurrent.futures import datetime import json import os import re import shutil import subprocess import tempfile import time from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional import aiofiles import httpx from docx import Document from pydantic import BaseModel, Field from pydantic.functional_validators import field_validator from ultimate_mcp_server.constants import Provider, TaskType from ultimate_mcp_server.core.providers.base import get_provider from ultimate_mcp_server.exceptions import ( ProviderError, ResourceError, ToolError, ToolInputError, ) from ultimate_mcp_server.services.cache import with_cache from ultimate_mcp_server.tools.base import with_error_handling, with_retry, with_tool_metrics from ultimate_mcp_server.tools.completion import chat_completion, generate_completion from ultimate_mcp_server.utils import get_logger from ultimate_mcp_server.utils.text import count_tokens logger = get_logger("ultimate_mcp_server.tools.audio") # --- Constants and Enums --- class AudioEnhancementProfile(str, Enum): """Predefined audio enhancement profiles for different recording types.""" CONFERENCE_CALL = "conference_call" # Optimized for online meetings INTERVIEW = "interview" # Optimized for interview recordings LECTURE = "lecture" # Optimized for lectures/presentations NOISY = "noisy" # Heavy noise reduction for noisy environments PHONE_CALL = "phone_call" # Optimized for telephone audio VOICEMAIL = "voicemail" # Optimized for voicemail recordings CUSTOM = "custom" # User-defined settings # Expected model sizes in bytes, with some tolerance (minimum size) WHISPER_MODEL_SIZES = { "large-v3": 2900000000, # ~2.9GB "large-v3-turbo": 1500000000, # ~1.5GB } class TranscriptionQuality(str, Enum): """Quality settings for transcription, balancing speed vs accuracy.""" DRAFT = "draft" # Fastest, less accurate STANDARD = "standard" # Balanced speed/accuracy ENHANCED = "enhanced" # More accurate, slower MAXIMUM = "maximum" # Most accurate, slowest class EnhancementStyle(str, Enum): """Transcript enhancement styles for different use cases.""" RAW = "raw" # No enhancement, just cleaned READABLE = "readable" # Basic readability improvements POLISHED = "polished" # Well-formatted with proper punctuation VERBATIM = "verbatim" # Preserve all speech patterns, hesitations STRUCTURED = "structured" # Add semantic structure (paragraphs, sections) class OutputFormat(str, Enum): """Available output formats for transcripts.""" JSON = "json" # Full JSON with all metadata TEXT = "text" # Plain text SRT = "srt" # SubRip subtitle format VTT = "vtt" # WebVTT subtitle format DOCX = "docx" # Microsoft Word format MARKDOWN = "markdown" # Markdown with formatting # --- Schema Validation Models --- class AudioEnhancementParams(BaseModel): """Parameters for audio enhancement.""" profile: AudioEnhancementProfile = Field( default=AudioEnhancementProfile.CONFERENCE_CALL, description="Predefined audio enhancement profile" ) volume: float = Field( default=1.5, ge=0.1, le=5.0, description="Volume adjustment factor" ) noise_reduction: int = Field( default=10, ge=0, le=30, description="Noise reduction strength (0-30)" ) highpass: int = 
Field( default=200, ge=50, le=500, description="Highpass filter frequency in Hz" ) lowpass: int = Field( default=3000, ge=1000, le=20000, description="Lowpass filter frequency in Hz" ) normalize: bool = Field( default=True, description="Apply dynamic audio normalization" ) compression: bool = Field( default=True, description="Apply dynamic range compression" ) dereverberation: bool = Field( default=False, description="Apply dereverberation filter" ) custom_filters: Optional[str] = Field( default=None, description="Additional custom FFmpeg filters" ) output_channels: int = Field( default=2, ge=1, le=2, description="Number of output channels (1=mono, 2=stereo)" ) output_sample_rate: int = Field( default=16000, ge=8000, le=48000, description="Output sample rate in Hz" ) @field_validator('custom_filters') def validate_custom_filters(cls, v): """Validate that custom filters don't contain dangerous commands.""" if v: # Check for shell escape attempts dangerous_patterns = [';', '&&', '||', '`', '$', '\\', '>', '<', '|', '*', '?', '~', '#'] for pattern in dangerous_patterns: if pattern in v: raise ValueError(f"Custom filter contains disallowed character: {pattern}") return v class WhisperParams(BaseModel): """Parameters for Whisper transcription.""" model: str = Field( default="large-v3-turbo", description="Whisper model name" ) language: Optional[str] = Field( default=None, description="Language code (auto-detect if None)" ) quality: TranscriptionQuality = Field( default=TranscriptionQuality.STANDARD, description="Transcription quality level" ) beam_size: int = Field( default=5, ge=1, le=10, description="Beam size for beam search" ) processors: int = Field( default=2, ge=1, le=8, description="Number of processors to use" ) word_timestamps: bool = Field( default=True, description="Generate timestamps for each word" ) translate: bool = Field( default=False, description="Translate non-English to English" ) diarize: bool = Field( default=False, description="Attempt speaker diarization" ) highlight_words: bool = Field( default=False, description="Highlight words with lower confidence" ) max_context: int = Field( default=-1, ge=-1, description="Maximum number of text tokens to consider from previous history" ) custom_vocab: Optional[List[str]] = Field( default=None, description="Custom vocabulary terms to improve recognition" ) class TranscriptEnhancementParams(BaseModel): """Parameters for transcript enhancement.""" style: EnhancementStyle = Field( default=EnhancementStyle.READABLE, description="Enhancement style" ) provider: str = Field( default=Provider.ANTHROPIC.value, description="LLM provider for enhancement" ) model: Optional[str] = Field( default=None, description="Specific model to use (provider default if None)" ) identify_speakers: bool = Field( default=False, description="Attempt to identify and label speakers" ) add_paragraphs: bool = Field( default=True, description="Add paragraph breaks at natural points" ) fix_spelling: bool = Field( default=True, description="Fix spelling errors" ) fix_grammar: bool = Field( default=True, description="Fix basic grammatical errors" ) sections: bool = Field( default=False, description="Add section headings based on topic changes" ) max_chunk_size: int = Field( default=6500, ge=1000, le=100000, description="Maximum chunk size in characters" ) format_numbers: bool = Field( default=True, description="Format numbers consistently (e.g., '25' instead of 'twenty-five')" ) custom_instructions: Optional[str] = Field( default=None, description="Additional custom 
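# --- Editorial example (not part of the original module source) ---
# The three parameter models above nest inside TranscriptionOptions, defined
# next. A minimal sketch of constructing them directly; parse_options()
# further below performs the equivalent construction from a plain dict, and
# additionally applies the profile/quality presets:
#
#     opts = TranscriptionOptions(
#         audio_params=AudioEnhancementParams(profile=AudioEnhancementProfile.PHONE_CALL),
#         whisper_params=WhisperParams(model="large-v3-turbo", quality=TranscriptionQuality.ENHANCED),
#         enhancement_params=TranscriptEnhancementParams(style=EnhancementStyle.POLISHED),
#     )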
instructions for enhancement" ) class TranscriptionOptions(BaseModel): """Complete options for audio transcription.""" enhance_audio: bool = Field( default=True, description="Whether to preprocess audio with FFmpeg" ) enhance_transcript: bool = Field( default=True, description="Whether to enhance the transcript with LLM" ) parallel_processing: bool = Field( default=True, description="Process chunks in parallel when possible" ) max_workers: int = Field( default=4, ge=1, description="Maximum number of parallel workers" ) output_formats: List[OutputFormat] = Field( default=[OutputFormat.JSON, OutputFormat.TEXT], description="Output formats to generate" ) save_enhanced_audio: bool = Field( default=False, description="Save the enhanced audio file" ) keep_artifacts: bool = Field( default=False, description="Keep temporary files and artifacts" ) audio_params: AudioEnhancementParams = Field( default_factory=AudioEnhancementParams, description="Audio enhancement parameters" ) whisper_params: WhisperParams = Field( default_factory=WhisperParams, description="Whisper transcription parameters" ) enhancement_params: TranscriptEnhancementParams = Field( default_factory=TranscriptEnhancementParams, description="Transcript enhancement parameters" ) language_detection: bool = Field( default=False, # Disable language detection by default description="Automatically detect language before transcription" ) class Segment(BaseModel): """A segment of transcript with timing information.""" start: float = Field(..., description="Start time in seconds") end: float = Field(..., description="End time in seconds") text: str = Field(..., description="Segment text") speaker: Optional[str] = Field(None, description="Speaker identifier") words: Optional[List[Dict[str, Any]]] = Field(None, description="Word-level data") confidence: Optional[float] = Field(None, description="Confidence score") class AudioInfo(BaseModel): """Audio file information.""" duration: float = Field(..., description="Duration in seconds") channels: int = Field(..., description="Number of audio channels") sample_rate: int = Field(..., description="Sample rate in Hz") format: str = Field(..., description="Audio format") codec: Optional[str] = Field(None, description="Audio codec") bit_depth: Optional[int] = Field(None, description="Bit depth") bitrate: Optional[int] = Field(None, description="Bitrate in bits/second") size_bytes: Optional[int] = Field(None, description="File size in bytes") # --- Data Classes --- @dataclass class ProcessingContext: """Context for the transcription process.""" file_path: str temp_dir: str original_filename: str base_filename: str options: TranscriptionOptions enhanced_audio_path: Optional[str] = None processing_times: Dict[str, float] = None language_code: Optional[str] = None def __post_init__(self): if self.processing_times is None: self.processing_times = {} # --- Tool Functions --- @with_cache(ttl=24 * 60 * 60) # Cache results for 24 hours @with_tool_metrics @with_retry(max_retries=1, retry_delay=1.0) @with_error_handling async def transcribe_audio( file_path: str, options: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Transcribes an audio file to text with advanced preprocessing and enhancement. This tool performs a multi-stage process: 1. Analyzes the audio file to determine optimal processing parameters 2. Enhances audio quality with adaptive filtering and preprocessing 3. Performs high-quality transcription with customizable settings 4. 
Intelligently enhances and structures the transcript for readability 5. Optionally identifies speakers and adds semantic structure Args: file_path: Path to the input audio file (.mp3, .m4a, .wav, etc.) options: Optional dictionary with transcription options including: - enhance_audio: Whether to preprocess audio (default True) - enhance_transcript: Whether to enhance transcript (default True) - parallel_processing: Process chunks in parallel (default True) - output_formats: List of output formats (default ["json", "text"]) - audio_params: Audio enhancement parameters - whisper_params: Whisper transcription parameters - enhancement_params: Transcript enhancement parameters Returns: A dictionary containing: { "raw_transcript": "Original unmodified transcript from Whisper", "enhanced_transcript": "LLM-enhanced transcript with improved formatting", "segments": [ { "start": 0.0, "end": 10.5, "text": "Segment text content", "speaker": "Speaker 1", # If speaker diarization is enabled "words": [...] # Word-level data if available }, ... ], "metadata": { "language": "en", "duration": 120.5, "title": "Automatically detected title", "topics": ["Topic 1", "Topic 2"] # If topic extraction is enabled }, "audio_info": { "duration": 120.5, "channels": 2, "sample_rate": 44100, "format": "wav", "codec": "pcm_s16le", "bit_depth": 16, "bitrate": 1411000, "size_bytes": 31000000 }, "processing_time": { "audio_analysis": 0.5, "audio_enhancement": 5.2, "language_detection": 1.1, "transcription": 45.3, "transcript_enhancement": 10.2, "total": 62.3 }, "artifacts": { "enhanced_audio": "/path/to/enhanced.wav", # If save_enhanced_audio is True "output_files": { "json": "/path/to/transcript.json", "text": "/path/to/transcript.txt", "srt": "/path/to/transcript.srt" } }, "tokens": { "input": 5000, "output": 3200, "total": 8200 }, "cost": 0.00185, "success": true } Raises: ToolInputError: If the file path is invalid or unsupported ToolError: If transcription or enhancement fails ResourceError: If required dependencies are not available """ # Start timing total processing start_time = time.time() # --- Input Validation --- try: if not file_path or not isinstance(file_path, str): raise ToolInputError("File path must be a non-empty string.") file_path = os.path.abspath(os.path.expanduser(file_path)) if not os.path.exists(file_path): raise ToolInputError(f"File not found: {file_path}") if not os.access(file_path, os.R_OK): raise ToolInputError(f"File not readable: {file_path}") # Validate file is an audio file _, ext = os.path.splitext(file_path) if ext.lower() not in ['.mp3', '.wav', '.m4a', '.mp4', '.flac', '.ogg', '.aac', '.wma', '.opus']: raise ToolInputError(f"Unsupported file format: {ext}. 
Please provide an audio file.") # Parse and validate options parsed_options = parse_options(options or {}) except Exception as e: if isinstance(e, ToolInputError): raise raise ToolInputError(f"Failed to validate input: {str(e)}") from e # --- Initialize Processing Context --- try: temp_dir = tempfile.mkdtemp(prefix="llm_audio_") original_filename = os.path.basename(file_path) base_filename = os.path.splitext(original_filename)[0] context = ProcessingContext( file_path=file_path, temp_dir=temp_dir, original_filename=original_filename, base_filename=base_filename, options=parsed_options, ) logger.info( f"Starting audio transcription process for {original_filename}", emoji_key="audio", temp_dir=temp_dir ) except Exception as e: raise ToolError(f"Failed to initialize processing context: {str(e)}") from e try: # --- Check Dependencies --- await check_dependencies(context) # --- Process Audio --- result = await process_audio_file(context) # --- Calculate Total Time --- total_time = time.time() - start_time context.processing_times["total"] = total_time result["processing_time"] = context.processing_times logger.success( f"Audio transcription completed in {total_time:.2f}s", emoji_key="success", file=context.original_filename, duration=result.get("audio_info", {}).get("duration", 0) ) return result except Exception as e: logger.error( f"Audio transcription failed: {str(e)}", emoji_key="error", exc_info=True, file=context.original_filename ) # Clean up temporary directory unless keep_artifacts is True if context.options.keep_artifacts: logger.info(f"Keeping artifacts in {temp_dir}") else: try: shutil.rmtree(temp_dir) except Exception as cleanup_err: logger.warning(f"Failed to clean up temporary directory: {cleanup_err}") if isinstance(e, (ToolError, ToolInputError, ResourceError)): raise # Return a structured error response instead of just raising an exception return { "raw_transcript": "", "enhanced_transcript": "", "segments": [], "metadata": {}, "audio_info": {}, "processing_time": context.processing_times if hasattr(context, "processing_times") else {}, "success": False, "error": f"Audio transcription failed: {str(e)}" } finally: # Clean up temporary directory unless keep_artifacts is True if not context.options.keep_artifacts: try: shutil.rmtree(temp_dir) except Exception as cleanup_err: logger.warning(f"Failed to clean up temporary directory: {cleanup_err}") # --- Main Processing Functions --- async def process_audio_file(context: ProcessingContext) -> Dict[str, Any]: """Process an audio file through the complete transcription pipeline.""" # Get detailed audio information audio_analysis_start = time.time() audio_info = await get_detailed_audio_info(context.file_path) context.processing_times["audio_analysis"] = time.time() - audio_analysis_start logger.info( f"Audio analysis: {audio_info.get('duration', 0):.1f}s duration, " f"{audio_info.get('sample_rate', 0)} Hz, " f"{audio_info.get('channels', 0)} channels", emoji_key="audio" ) # Update parameters based on audio analysis if needed _update_parameters_from_audio_info(context, audio_info) # --- Audio Enhancement --- enhanced_audio_path = context.file_path if context.options.enhance_audio: audio_enhance_start = time.time() logger.info("Enhancing audio quality", emoji_key="audio", profile=context.options.audio_params.profile.value) enhanced_audio_path = await enhance_audio(context, audio_info) if not enhanced_audio_path: logger.warning("Audio enhancement failed, falling back to original file", emoji_key="warning") enhanced_audio_path = 
context.file_path context.processing_times["audio_enhancement"] = time.time() - audio_enhance_start else: context.processing_times["audio_enhancement"] = 0 context.enhanced_audio_path = enhanced_audio_path if not os.path.exists(enhanced_audio_path): logger.warning(f"Enhanced audio path does not exist: {enhanced_audio_path}", emoji_key="warning") return { "raw_transcript": "", "enhanced_transcript": "", "segments": [], "metadata": {}, "audio_info": audio_info, "processing_time": context.processing_times, "success": False, "error": "Enhanced audio file not found" } # --- Skip Language Detection --- # Language detection is disabled - always use Whisper's built-in detection context.processing_times["language_detection"] = 0 if context.options.whisper_params.language: logger.info(f"Using specified language: {context.options.whisper_params.language}", emoji_key="language") else: logger.info("Using Whisper's built-in language detection", emoji_key="language") # --- Transcribe Audio --- transcribe_start = time.time() model = context.options.whisper_params.model quality = context.options.whisper_params.quality.value logger.info( f"Transcribing audio with model '{model}' (quality: {quality})", emoji_key="transcribe" ) try: transcript_result = await transcribe_with_whisper(context) context.processing_times["transcription"] = time.time() - transcribe_start raw_transcript = transcript_result.get("text", "") segments = transcript_result.get("segments", []) # Check if transcript is empty if not raw_transcript: logger.warning("Whisper returned an empty transcript", emoji_key="warning") else: transcript_length = len(raw_transcript) segments_count = len(segments) logger.info( f"Transcription complete: {transcript_length} characters, {segments_count} segments", emoji_key="success" ) # Extract metadata if available metadata = transcript_result.get("metadata", {}) if context.language_code and "language" not in metadata: metadata["language"] = context.language_code except Exception as e: logger.error(f"Transcription failed: {str(e)}", emoji_key="error", exc_info=True) context.processing_times["transcription"] = time.time() - transcribe_start return { "raw_transcript": "", "enhanced_transcript": "", "segments": [], "metadata": {"language": context.language_code} if context.language_code else {}, "audio_info": audio_info, "processing_time": context.processing_times, "success": False, "error": f"Transcription failed: {str(e)}" } # --- Enhance Transcript --- enhanced_transcript = raw_transcript enhancement_cost = 0.0 enhancement_tokens = {"input": 0, "output": 0, "total": 0} if context.options.enhance_transcript and raw_transcript: enhance_start = time.time() logger.info( f"Enhancing transcript with style: {context.options.enhancement_params.style.value}", emoji_key="enhance", provider=context.options.enhancement_params.provider ) try: enhancement_result = await enhance_transcript(context, raw_transcript, metadata) enhanced_transcript = enhancement_result["transcript"] enhancement_cost = enhancement_result["cost"] enhancement_tokens = enhancement_result["tokens"] if "topics" in enhancement_result and enhancement_result["topics"]: metadata["topics"] = enhancement_result["topics"] if "title" in enhancement_result and enhancement_result["title"]: metadata["title"] = enhancement_result["title"] context.processing_times["transcript_enhancement"] = time.time() - enhance_start if not enhanced_transcript: logger.warning("Enhancement returned an empty transcript, falling back to raw transcript", emoji_key="warning") 
enhanced_transcript = raw_transcript else: enhancement_length = len(enhanced_transcript) logger.info(f"Enhancement complete: {enhancement_length} characters", emoji_key="success") except Exception as e: logger.error(f"Transcript enhancement failed: {e}", emoji_key="error", exc_info=True) context.processing_times["transcript_enhancement"] = time.time() - enhance_start # Fall back to raw transcript enhanced_transcript = raw_transcript else: if not raw_transcript: logger.warning("Skipping transcript enhancement because raw transcript is empty", emoji_key="warning") elif not context.options.enhance_transcript: logger.info("Transcript enhancement disabled by options", emoji_key="info") context.processing_times["transcript_enhancement"] = 0 # --- Generate Output Files --- artifact_paths = await generate_output_files(context, raw_transcript, enhanced_transcript, segments, metadata) # --- Prepare Result --- result = { "raw_transcript": raw_transcript, "enhanced_transcript": enhanced_transcript, "segments": segments, "metadata": metadata, "audio_info": audio_info, "tokens": enhancement_tokens, "cost": enhancement_cost, "artifacts": artifact_paths, "success": bool(raw_transcript or enhanced_transcript) } return result def parse_options(options: Dict[str, Any]) -> TranscriptionOptions: """Parse and validate transcription options.""" # Convert string output formats to enum values if "output_formats" in options: if isinstance(options["output_formats"], list): formats = [] for fmt in options["output_formats"]: if isinstance(fmt, str): try: formats.append(OutputFormat(fmt.lower())) except ValueError: logger.warning(f"Invalid output format: {fmt}, ignoring") elif isinstance(fmt, OutputFormat): formats.append(fmt) if formats: # Only update if we have valid formats options["output_formats"] = formats # Handle nested parameter objects for key in ["audio_params", "whisper_params", "enhancement_params"]: if key in options and options[key]: # If a dictionary is provided, keep it for Pydantic if not isinstance(options[key], dict): # Convert non-dict to dict by serializing/deserializing if possible try: options[key] = json.loads(json.dumps(options[key])) except Exception: # If can't convert, remove the invalid value logger.warning(f"Invalid format for {key}, using defaults") options.pop(key) # Set audio profile parameters if a profile is specified if "audio_params" in options and "profile" in options["audio_params"]: profile = options["audio_params"]["profile"] if isinstance(profile, str): try: profile = AudioEnhancementProfile(profile.lower()) # Update audio parameters based on the selected profile options["audio_params"].update(_get_audio_profile_params(profile)) except ValueError: logger.warning(f"Invalid audio profile: {profile}, using default") # Set whisper quality parameters if quality is specified if "whisper_params" in options and "quality" in options["whisper_params"]: quality = options["whisper_params"]["quality"] if isinstance(quality, str): try: quality = TranscriptionQuality(quality.lower()) # Update whisper parameters based on the selected quality options["whisper_params"].update(_get_whisper_quality_params(quality)) except ValueError: logger.warning(f"Invalid transcription quality: {quality}, using default") # Parse with Pydantic model try: return TranscriptionOptions(**options) except Exception as e: logger.warning(f"Error parsing options: {e}, using defaults with valid values", emoji_key="warning") # Create with default options return TranscriptionOptions() def _get_audio_profile_params(profile: 
AudioEnhancementProfile) -> Dict[str, Any]: """Get audio enhancement parameters for a specific profile.""" profiles = { AudioEnhancementProfile.CONFERENCE_CALL: { "volume": 1.5, "noise_reduction": 10, "highpass": 200, "lowpass": 3000, "compression": True, "normalize": True, "dereverberation": False }, AudioEnhancementProfile.INTERVIEW: { "volume": 1.3, "noise_reduction": 8, "highpass": 150, "lowpass": 8000, "compression": True, "normalize": True, "dereverberation": False }, AudioEnhancementProfile.LECTURE: { "volume": 1.4, "noise_reduction": 6, "highpass": 120, "lowpass": 8000, "compression": True, "normalize": True, "dereverberation": True }, AudioEnhancementProfile.NOISY: { "volume": 1.8, "noise_reduction": 20, "highpass": 250, "lowpass": 3000, "compression": True, "normalize": True, "dereverberation": True }, AudioEnhancementProfile.PHONE_CALL: { "volume": 2.0, "noise_reduction": 15, "highpass": 300, "lowpass": 3400, "compression": True, "normalize": True, "dereverberation": False }, AudioEnhancementProfile.VOICEMAIL: { "volume": 2.0, "noise_reduction": 12, "highpass": 250, "lowpass": 3000, "compression": True, "normalize": True, "dereverberation": False } } return profiles.get(profile, {}) def _get_whisper_quality_params(quality: TranscriptionQuality) -> Dict[str, Any]: """Get whisper parameters for a specific quality level.""" quality_params = { TranscriptionQuality.DRAFT: { "beam_size": 1, "processors": 1, "word_timestamps": False, "highlight_words": False }, TranscriptionQuality.STANDARD: { "beam_size": 5, "processors": 2, "word_timestamps": True, "highlight_words": False }, TranscriptionQuality.ENHANCED: { "beam_size": 8, "processors": 2, "word_timestamps": True, "highlight_words": True }, TranscriptionQuality.MAXIMUM: { "beam_size": 10, "processors": 4, "word_timestamps": True, "highlight_words": True } } return quality_params.get(quality, {}) def _update_parameters_from_audio_info(context: ProcessingContext, audio_info: Dict[str, Any]) -> None: """Update processing parameters based on audio file analysis.""" # If mono audio, adjust enhancement params audio_params = context.options.audio_params updated_params = False if audio_info.get("channels", 0) == 1: # Set output channels to match input if not explicitly set if "output_channels" not in context.options.audio_params.dict(): # Create a copy with the updated parameter audio_params = AudioEnhancementParams( **{**audio_params.dict(), "output_channels": 1} ) updated_params = True # If low-quality audio, adjust enhancement profile sample_rate = audio_info.get("sample_rate", 0) if sample_rate < 16000 and context.options.audio_params.profile == AudioEnhancementProfile.CONFERENCE_CALL: logger.info(f"Detected low sample rate ({sample_rate} Hz), adjusting enhancement profile", emoji_key="audio") # Use phone_call profile for low sample rate audio params = _get_audio_profile_params(AudioEnhancementProfile.PHONE_CALL) # Create a copy with the updated parameters audio_params = AudioEnhancementParams( **{**audio_params.dict(), **params} ) updated_params = True # If params were updated, create a new options object with the updated audio_params if updated_params: context.options = TranscriptionOptions( **{**context.options.dict(), "audio_params": audio_params} ) # If very short audio (<10 seconds), adjust transcription quality duration = audio_info.get("duration", 0) if duration < 10 and context.options.whisper_params.quality != TranscriptionQuality.MAXIMUM: logger.info(f"Short audio detected ({duration:.1f}s), increasing transcription quality", 
emoji_key="audio") # Create a new whisper_params with enhanced quality whisper_params = WhisperParams( **{**context.options.whisper_params.dict(), "quality": TranscriptionQuality.ENHANCED} ) # Update options with new whisper_params context.options = TranscriptionOptions( **{**context.options.dict(), "whisper_params": whisper_params} ) # --- Dependency and Audio Processing Functions --- async def download_whisper_model(model_name: str, output_path: str) -> bool: """Download a Whisper model from Hugging Face using httpx. Args: model_name: Name of the model to download (e.g. 'large-v3') output_path: Path where to save the model file Returns: True if download was successful, False otherwise """ url = f"https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-{model_name}.bin" logger.info(f"Downloading Whisper model '{model_name}' from {url}", emoji_key="download") # Expected model size min_size_bytes = WHISPER_MODEL_SIZES.get(model_name, 100000000) # Default to 100MB minimum if unknown expected_size_bytes = min_size_bytes # Initialize with minimum size try: async with httpx.AsyncClient(timeout=None) as client: # Make HEAD request first to check if the URL is valid and get expected size try: head_response = await client.head(url) if head_response.status_code != 200: logger.warning(f"Model URL is not accessible: HTTP {head_response.status_code} for {url}", emoji_key="warning") return False # If Content-Length is available, use it to check expected size content_length = int(head_response.headers.get("content-length", 0)) if content_length > 0: expected_size_mb = content_length / (1024 * 1024) logger.info(f"Expected model size: {expected_size_mb:.1f} MB", emoji_key="info") # Update expected size if it's larger than our preset minimum if content_length > min_size_bytes: expected_size_bytes = int(content_length * 0.95) # Allow 5% tolerance except Exception as e: logger.warning(f"Failed to validate model URL: {url} - Error: {str(e)}", emoji_key="warning") # Continue anyway, the GET might still work # Stream the response to handle large files try: async with client.stream("GET", url) as response: if response.status_code != 200: logger.warning(f"Failed to download model: HTTP {response.status_code} for {url}", emoji_key="warning") if response.status_code == 404: logger.warning(f"Model '{model_name}' not found on Hugging Face repository", emoji_key="warning") return False # Get content length if available content_length = int(response.headers.get("content-length", 0)) if content_length == 0: logger.warning("Content length is zero or not provided", emoji_key="warning") downloaded = 0 # Open file for writing try: with open(output_path, "wb") as f: # Display progress last_log_time = time.time() async for chunk in response.aiter_bytes(chunk_size=8192): f.write(chunk) downloaded += len(chunk) # Log progress every 5 seconds now = time.time() if now - last_log_time > 5: if content_length > 0: percent = downloaded / content_length * 100 logger.info(f"Download progress: {percent:.1f}% ({downloaded/(1024*1024):.1f} MB)", emoji_key="download") else: logger.info(f"Downloaded {downloaded/(1024*1024):.1f} MB", emoji_key="download") last_log_time = now except IOError as e: logger.warning(f"Failed to write to file {output_path}: {str(e)}", emoji_key="warning") return False except httpx.RequestError as e: logger.warning(f"HTTP request error while downloading model: {str(e)}", emoji_key="warning") return False # Verify the file was downloaded if not os.path.exists(output_path): logger.warning(f"Downloaded file 
doesn't exist at {output_path}", emoji_key="warning") return False actual_size = os.path.getsize(output_path) if actual_size == 0: logger.warning(f"Downloaded file is empty at {output_path}", emoji_key="warning") return False # Verify file size meets expectations actual_size_mb = actual_size / (1024 * 1024) if actual_size < expected_size_bytes: logger.warning( f"Model file size ({actual_size_mb:.1f} MB) is smaller than expected minimum size " f"({expected_size_bytes/(1024*1024):.1f} MB). File may be corrupted or incomplete.", emoji_key="warning" ) return False logger.info(f"Successfully downloaded model to {output_path} ({actual_size_mb:.1f} MB)", emoji_key="success") return True except Exception as e: logger.warning(f"Error downloading whisper model: {e}", emoji_key="warning", exc_info=True) return False async def check_dependencies(context: ProcessingContext) -> bool: """Verifies that required dependencies are installed and accessible.""" # Check ffmpeg try: result = await asyncio.create_subprocess_exec( "ffmpeg", "-version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await result.communicate() if result.returncode != 0: raise ResourceError( "ffmpeg is not installed or not accessible. Please install it with 'apt install ffmpeg'." ) # Extract ffmpeg version for logging version_match = re.search(r'ffmpeg version (\S+)', stdout.decode('utf-8', errors='ignore')) version = version_match.group(1) if version_match else "unknown" logger.debug(f"Found ffmpeg version {version}", emoji_key="dependency") except FileNotFoundError as e: raise ResourceError( "ffmpeg is not installed. Please install it with 'apt install ffmpeg'." ) from e # Check whisper.cpp whisper_path = os.path.expanduser("~/whisper.cpp") # Use user-supplied model model = context.options.whisper_params.model model_path = os.path.join(whisper_path, "models", f"ggml-{model}.bin") if not os.path.exists(whisper_path): raise ResourceError( f"whisper.cpp not found at {whisper_path}. Please install it first." ) if not os.path.exists(model_path): # Check if models directory exists models_dir = os.path.join(whisper_path, "models") if not os.path.exists(models_dir): try: os.makedirs(models_dir) logger.info(f"Created models directory at {models_dir}", emoji_key="info") except Exception as e: raise ResourceError(f"Failed to create models directory: {e}") from e # Try to automatically download the model using httpx logger.info(f"Whisper model '{model}' not found at {model_path}", emoji_key="info") logger.info(f"Attempting to download model '{model}' now...", emoji_key="download") # Download the model directly using httpx - first check if model exists if model == "large-v3": # Double check if the model actually exists test_url = f"https://huggingface.co/ggerganov/whisper.cpp/resolve/main/ggml-{model}.bin" async with httpx.AsyncClient(timeout=10.0) as client: try: head_response = await client.head(test_url) if head_response.status_code != 200: # Model might not be directly available - show a clear error if head_response.status_code == 404: raise ResourceError( f"Whisper model 'large-v3' not found in the HuggingFace repository at {test_url}\n" f"Please download it manually with one of these commands:\n" f"1. ~/whisper.cpp/models/download-ggml-model.sh large-v3\n" f"2. 
Or try a different model like 'large-v3-turbo' which is known to be available" ) except Exception as e: logger.warning(f"Error checking model availability: {e}", emoji_key="warning") # Continue with download attempt anyway # Attempt to download the model download_success = await download_whisper_model(model, model_path) if not download_success: # Modified error message with alternative suggestions if model == "large-v3": raise ResourceError( f"Failed to download Whisper model '{model}'.\n" f"You can:\n" f"1. Try using a different model like 'large-v3-turbo' which is more reliable\n" f"2. Or download manually with: ~/whisper.cpp/models/download-ggml-model.sh {model}" ) else: raise ResourceError( f"Failed to download Whisper model '{model}'.\n" f"Please download it manually with: ~/whisper.cpp/models/download-ggml-model.sh {model}" ) # Verify that the model was downloaded if not os.path.exists(model_path): raise ResourceError( f"Model download completed but model file not found at {model_path}. " f"Please check the download and try again." ) # Verify model file size actual_size = os.path.getsize(model_path) expected_min_size = WHISPER_MODEL_SIZES.get(model, 100000000) # Default to 100MB minimum if unknown if actual_size < expected_min_size: actual_size_mb = actual_size / (1024 * 1024) expected_mb = expected_min_size / (1024 * 1024) raise ResourceError( f"Downloaded model file at {model_path} is too small ({actual_size_mb:.1f} MB). " f"Expected at least {expected_mb:.1f} MB. File may be corrupted or incomplete. " f"Please download it manually with: ~/whisper.cpp/models/download-ggml-model.sh {model}" ) logger.info(f"Successfully downloaded Whisper model '{model}'", emoji_key="success") else: # Verify existing model file size actual_size = os.path.getsize(model_path) expected_min_size = WHISPER_MODEL_SIZES.get(model, 100000000) # Default to 100MB minimum if unknown file_size_mb = actual_size / (1024 * 1024) if actual_size < expected_min_size: expected_mb = expected_min_size / (1024 * 1024) logger.warning( f"Existing model at {model_path} is suspiciously small ({file_size_mb:.1f} MB). " f"Expected at least {expected_mb:.1f} MB. Model may be corrupted.", emoji_key="warning" ) else: logger.info(f"Found existing model at {model_path} ({file_size_mb:.1f} MB)", emoji_key="dependency") # Check if whisper binary is available in PATH using shlex for command safety try: result = subprocess.run(["which", "whisper-cli"], capture_output=True, text=True) if result.returncode == 0: whisper_path_found = result.stdout.strip() logger.debug(f"Found whisper-cli in PATH: {whisper_path_found}", emoji_key="dependency") else: # Check in the expected location whisper_path = os.path.expanduser("~/whisper.cpp") whisper_bin = os.path.join(whisper_path, "build", "bin", "whisper-cli") if not os.path.exists(whisper_bin): raise ResourceError( f"whisper-cli binary not found at {whisper_bin}. " f"Please build whisper.cpp first with: " f"cd ~/whisper.cpp && cmake -B build && cmake --build build -j --config Release" ) logger.debug(f"Found whisper-cli at {whisper_bin}", emoji_key="dependency") except FileNotFoundError as e: raise ResourceError("Command 'which' not found. 
Cannot check for whisper-cli installation.") from e logger.debug(f"Found whisper model: {model}", emoji_key="dependency") return True async def get_detailed_audio_info(file_path: str) -> Dict[str, Any]: """Gets detailed information about an audio file using ffprobe.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration,bit_rate,size", "-select_streams", "a:0", "-show_entries", "stream=channels,sample_rate,codec_name,bits_per_sample", "-of", "json", file_path ] try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: logger.warning(f"Failed to get audio info: {stderr.decode('utf-8', errors='ignore')}", emoji_key="warning") return { "duration": 0, "channels": 0, "sample_rate": 0, "format": os.path.splitext(file_path)[1][1:], "codec": None, "bit_depth": None, "bitrate": None, "size_bytes": os.path.getsize(file_path) if os.path.exists(file_path) else 0 } info = json.loads(stdout) format_info = info.get("format", {}) stream_info = info.get("streams", [{}])[0] if info.get("streams") else {} # Extract information duration = float(format_info.get("duration", 0)) channels = int(stream_info.get("channels", 0)) sample_rate = int(stream_info.get("sample_rate", 0)) codec = stream_info.get("codec_name") bit_depth = int(stream_info.get("bits_per_sample", 0)) or None bitrate = int(format_info.get("bit_rate", 0)) or None size_bytes = int(format_info.get("size", 0)) or os.path.getsize(file_path) audio_format = os.path.splitext(file_path)[1][1:] return { "duration": duration, "channels": channels, "sample_rate": sample_rate, "format": audio_format, "codec": codec, "bit_depth": bit_depth, "bitrate": bitrate, "size_bytes": size_bytes } except Exception as e: logger.warning(f"Error getting audio info: {e}", emoji_key="warning", exc_info=True) try: size_bytes = os.path.getsize(file_path) if os.path.exists(file_path) else 0 except Exception: size_bytes = 0 return { "duration": 0, "channels": 0, "sample_rate": 0, "format": os.path.splitext(file_path)[1][1:], "codec": None, "bit_depth": None, "bitrate": None, "size_bytes": size_bytes } async def enhance_audio(context: ProcessingContext, audio_info: Dict[str, Any]) -> Optional[str]: """Enhances audio quality using ffmpeg preprocessing.""" # Create output path in temp directory output_path = os.path.join(context.temp_dir, f"{context.base_filename}_enhanced.wav") # Use optimized audio enhancement settings # Build the complete command with the standardized enhancement settings cmd = [ "ffmpeg", "-i", context.file_path, "-threads", str(os.cpu_count() or 1), "-af", "volume=1.5, highpass=f=200, lowpass=f=3000, afftdn=nr=10:nf=-20, " "compand=attacks=0:points=-80/-80|-45/-15|-27/-9|0/-7|20/-7:gain=5, " "dynaudnorm=f=150:g=15:p=1:m=1:s=0, " "pan=stereo|c0=c0|c1=c0", "-ar", "16000", "-ac", "2", "-c:a", "pcm_s16le", "-y", # Overwrite output if exists output_path ] logger.debug(f"Running ffmpeg command: {' '.join(cmd)}", emoji_key="command") try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode('utf-8', errors='ignore') logger.error(f"FFmpeg error: {error_msg}", emoji_key="error") return None # Verify the output file was created and has a reasonable size if os.path.exists(output_path) and os.path.getsize(output_path) > 1000: file_size_mb = 
os.path.getsize(output_path) / (1024 * 1024) logger.info(f"Enhanced audio saved to {output_path} ({file_size_mb:.2f} MB)", emoji_key="audio") else: logger.warning("Enhanced audio file is suspiciously small or doesn't exist", emoji_key="warning") return None # If we're keeping enhanced audio, copy it to a persistent location if context.options.save_enhanced_audio: original_dir = os.path.dirname(context.file_path) persistent_path = os.path.join(original_dir, f"{context.base_filename}_enhanced.wav") shutil.copy2(output_path, persistent_path) logger.info(f"Saved enhanced audio to {persistent_path}", emoji_key="save") logger.info("Audio enhancement completed", emoji_key="audio") return output_path except Exception as e: logger.error(f"Error enhancing audio: {e}", emoji_key="error", exc_info=True) return None async def transcribe_with_whisper(context: ProcessingContext) -> Dict[str, Any]: """Transcribes audio using Whisper.cpp with advanced options.""" # Create output base name in temp directory output_base = os.path.join(context.temp_dir, context.base_filename) output_json = f"{output_base}.json" output_txt = f"{output_base}.txt" # Get whisper parameters params = context.options.whisper_params # Build command with configurable parameters whisper_bin = os.path.expanduser("~/whisper.cpp/build/bin/whisper-cli") model_path = os.path.expanduser(f"~/whisper.cpp/models/ggml-{params.model}.bin") # Validate required files exist if not os.path.exists(whisper_bin): logger.error(f"Whisper binary not found at {whisper_bin}", emoji_key="error") raise ToolError(f"Whisper binary not found at {whisper_bin}") if not os.path.exists(model_path): logger.error(f"Whisper model not found at {model_path}", emoji_key="error") raise ToolError(f"Whisper model not found at {model_path}") # Verify model file size actual_size = os.path.getsize(model_path) expected_min_size = WHISPER_MODEL_SIZES.get(params.model, 100000000) # Default to 100MB minimum if unknown if actual_size < expected_min_size: actual_size_mb = actual_size / (1024 * 1024) expected_mb = expected_min_size / (1024 * 1024) logger.warning( f"Model file at {model_path} is smaller than expected ({actual_size_mb:.1f} MB, expected {expected_mb:.1f} MB)", emoji_key="warning" ) # Use file_path as fallback if enhanced_audio_path is None audio_path = context.enhanced_audio_path or context.file_path if not os.path.exists(audio_path): logger.error(f"Audio file not found at {audio_path}", emoji_key="error") raise ToolError(f"Audio file not found at {audio_path}") cmd = [ whisper_bin, "-m", model_path, "-f", audio_path, "-of", output_base, "-oj" # Always output JSON for post-processing ] # Add boolean flags if params.word_timestamps: cmd.append("-pc") if params.translate: cmd.append("-tr") # Always output text for readability cmd.append("-otxt") # Add numeric parameters cmd.extend(["-t", str(os.cpu_count() if params.processors <= 0 else params.processors)]) if params.beam_size: cmd.extend(["-bs", str(params.beam_size)]) # Add language parameter if specified if params.language: cmd.extend(["-l", params.language]) # Add max context parameter if specified if params.max_context > 0: cmd.extend(["-mc", str(params.max_context)]) # Additional optimizations cmd.append("-fa") # Full sentence timestamps (improved segmentation) cmd.append("-pp") # Enable post-processing # Add custom vocab if specified (create a vocab file) if params.custom_vocab: vocab_path = os.path.join(context.temp_dir, "custom_vocab.txt") try: async with aiofiles.open(vocab_path, 'w') as f: await 
f.write("\n".join(params.custom_vocab)) cmd.extend(["-kv", vocab_path]) except Exception as e: logger.warning(f"Failed to create custom vocab file: {e}", emoji_key="warning") # Add diarization if requested if params.diarize: cmd.append("-dm") cmd_str = ' '.join(cmd) logger.debug(f"Running whisper command: {cmd_str}", emoji_key="command") try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() stderr_output = stderr.decode('utf-8', errors='ignore') if stderr else "" stdout_output = stdout.decode('utf-8', errors='ignore') if stdout else "" # Log outputs for debugging if stdout_output: logger.debug(f"Whisper stdout (first 500 chars): {stdout_output[:500]}", emoji_key="info") if stderr_output: if process.returncode != 0: logger.error(f"Whisper stderr: {stderr_output}", emoji_key="error") elif "error" in stderr_output.lower() or "warning" in stderr_output.lower(): logger.warning(f"Whisper warnings/errors: {stderr_output}", emoji_key="warning") else: logger.debug(f"Whisper stderr: {stderr_output}", emoji_key="info") if process.returncode != 0: error_msg = stderr_output or "Unknown error" logger.error(f"Whisper transcription error (exit code {process.returncode}): {error_msg}", emoji_key="error") raise ToolError(f"Whisper transcription failed with exit code {process.returncode}: {error_msg}") # Check if output files exist if not os.path.exists(output_json) and not os.path.exists(output_txt): logger.error("Whisper completed successfully but no output files were created", emoji_key="error") raise ToolError("Whisper completed successfully but no output files were created") # Read results from the JSON file if os.path.exists(output_json): json_file_size = os.path.getsize(output_json) logger.debug(f"Reading JSON output file: {output_json} ({json_file_size} bytes)", emoji_key="info") if json_file_size < 10: # Suspiciously small logger.warning(f"JSON output file is suspiciously small: {json_file_size} bytes", emoji_key="warning") try: async with aiofiles.open(output_json, 'r') as f: content = await f.read() try: result = json.loads(content) # Fix missing fields in result if needed if "segments" not in result: logger.warning("No segments found in Whisper JSON output", emoji_key="warning") result["segments"] = [] if "text" not in result or not result.get("text"): logger.warning("No transcript text found in Whisper JSON output", emoji_key="warning") # Try to construct text from segments if result.get("segments"): reconstructed_text = " ".join([seg.get("text", "") for seg in result["segments"]]) if reconstructed_text: logger.info("Reconstructed transcript text from segments", emoji_key="info") result["text"] = reconstructed_text else: result["text"] = "" else: result["text"] = "" # Extract metadata metadata = { "language": context.language_code or result.get("language"), "duration": result.get("duration", 0) } result["metadata"] = metadata except json.JSONDecodeError as e: logger.error(f"Failed to parse Whisper JSON output: {e}", emoji_key="error") logger.error(f"JSON content: {content[:1000]}...", emoji_key="error") raise ToolError(f"Failed to parse Whisper output JSON: {e}") from e except Exception as e: logger.error(f"Failed to read Whisper JSON output file: {e}", emoji_key="error") raise ToolError(f"Failed to read Whisper output: {e}") from e else: logger.warning(f"Whisper JSON output not found at expected path: {output_json}", emoji_key="warning") # Fallback to text file if 
os.path.exists(output_txt): txt_file_size = os.path.getsize(output_txt) logger.info(f"Falling back to text output file: {output_txt} ({txt_file_size} bytes)", emoji_key="info") if txt_file_size < 10: # Suspiciously small logger.warning(f"Text output file is suspiciously small: {txt_file_size} bytes", emoji_key="warning") try: async with aiofiles.open(output_txt, 'r') as f: text = await f.read() # Create minimal result structure result = { "text": text, "segments": [{"text": text, "start": 0, "end": 0}], "metadata": { "language": context.language_code, "duration": 0 } } if not text: logger.warning("Text output file is empty", emoji_key="warning") except Exception as e: logger.error(f"Failed to read text output file: {e}", emoji_key="error") raise ToolError(f"Failed to read Whisper text output: {e}") from e else: logger.error(f"No output files found from Whisper at {output_base}.*", emoji_key="error") raise ToolError("No output files found from Whisper transcription") # Check if we actually got a transcript if not result.get("text"): logger.warning("Whisper returned an empty transcript", emoji_key="warning") # Clean up results (remove empty/duplicate segments) cleaned_segments = clean_segments(result.get("segments", [])) result["segments"] = cleaned_segments # Clean up text (remove dots-only lines, etc.) result["text"] = clean_raw_transcript(result.get("text", "")) return result except ToolError: raise except Exception as e: logger.error(f"Error in Whisper transcription: {e}", emoji_key="error", exc_info=True) raise ToolError(f"Whisper transcription failed: {str(e)}") from e def clean_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Clean and normalize segment data.""" cleaned_segments = [] seen_texts = set() for segment in segments: # Skip segments with empty or meaningless text text = segment.get("text", "").strip() if not text or re.match(r'^[\s.]*$', text): continue # Skip exact duplicates unless they have meaningful timing differences # (within 0.5s of a previous segment) is_duplicate = False if text in seen_texts: for prev_segment in cleaned_segments: if prev_segment.get("text") == text: start_diff = abs(prev_segment.get("start", 0) - segment.get("start", 0)) end_diff = abs(prev_segment.get("end", 0) - segment.get("end", 0)) if start_diff < 0.5 and end_diff < 0.5: is_duplicate = True break if is_duplicate: continue # Add to seen texts seen_texts.add(text) # Standardize segment structure clean_segment = { "start": float(segment.get("start", 0)), "end": float(segment.get("end", 0)), "text": text } # Add optional fields if available for field in ["speaker", "words", "confidence"]: if field in segment: clean_segment[field] = segment[field] cleaned_segments.append(clean_segment) # Sort by start time cleaned_segments.sort(key=lambda x: x["start"]) return cleaned_segments def clean_raw_transcript(text: str) -> str: """Cleans raw transcript.""" if not text: return "" # Split into lines and process lines = text.split("\n") cleaned_lines = [] seen_lines = set() for line in lines: line = line.strip() # Skip empty lines if not line: continue # Skip lines with just dots or other meaningless patterns if re.match(r'^[\s.]*$', line) or line == '[BLANK_AUDIO]': continue # Standardize multiple spaces line = re.sub(r'\s+', ' ', line) # Keep duplicates if they're long (likely not duplicates but legitimate repetition) if line in seen_lines and len(line) <= 50: continue seen_lines.add(line) cleaned_lines.append(line) # Join but ensure there's proper spacing return 
"\n".join(cleaned_lines) # --- Transcript Enhancement Functions --- async def enhance_transcript( context: ProcessingContext, transcript: str, metadata: Dict[str, Any] ) -> Dict[str, Any]: """Enhance a transcript with formatting, readability and semantic structuring.""" if not transcript or transcript.strip() == "": return { "transcript": "", "tokens": {"input": 0, "output": 0, "total": 0}, "cost": 0.0, "topics": [], "title": None } # Extract key parameters params = context.options.enhancement_params # Split transcript into manageable chunks chunks = await chunk_text(transcript, params.max_chunk_size) if not chunks: return { "transcript": transcript, "tokens": {"input": 0, "output": 0, "total": 0}, "cost": 0.0, "topics": [], "title": None } # First analyze context to get a summary of the content context_data = await detect_subject_matter( chunks[0], params.provider, params.model, metadata ) # Track topics if available topics = context_data.get("topics", []) title = context_data.get("title") context_info = context_data.get("context", "") logger.info(f"Content analysis complete: {len(topics)} topics identified", emoji_key="analyze") # Process chunks concurrently if parallel processing is enabled if context.options.parallel_processing and len(chunks) > 1: enhanced_chunks = await process_chunks_parallel( context, chunks, context_info, params ) else: enhanced_chunks = await process_chunks_sequential( context, chunks, context_info, params ) # Calculate total metrics total_tokens = {"input": 0, "output": 0, "total": 0} total_cost = 0.0 for chunk_data in enhanced_chunks: chunk_tokens = chunk_data.get("tokens", {}) total_tokens["input"] += chunk_tokens.get("input", 0) total_tokens["output"] += chunk_tokens.get("output", 0) total_tokens["total"] += chunk_tokens.get("total", 0) total_cost += chunk_data.get("cost", 0.0) # Join the enhanced chunks enhanced_transcript = "\n\n".join(chunk_data["text"] for chunk_data in enhanced_chunks) # If sections are enabled, try to add section headings if params.sections and topics: enhanced_transcript = await add_section_headings( enhanced_transcript, topics, params.provider, params.model ) return { "transcript": enhanced_transcript, "tokens": total_tokens, "cost": total_cost, "topics": topics, "title": title } async def chunk_text(text: str, max_chunk_size: int = 6500) -> List[str]: """Split text into chunks with intelligent boundary detection.""" if len(text) <= max_chunk_size: return [text] # Define patterns for natural breaks, prioritized patterns = [ r'\n\s*\n\s*\n', # Triple line break (highest priority) r'\n\s*\n', # Double line break r'(?<=[.!?])\s+(?=[A-Z])', # Sentence boundary with capital letter following r'(?<=[.!?])\s', # Any sentence boundary r'(?<=[,:;])\s' # Phrase boundary (lowest priority) ] chunks = [] remaining_text = text while remaining_text: if len(remaining_text) <= max_chunk_size: chunks.append(remaining_text) break # Start with an initial chunk at max size chunk_candidate = remaining_text[:max_chunk_size] split_position = None # Try each pattern in order of priority for pattern in patterns: # Look for the last occurrence of the pattern matches = list(re.finditer(pattern, chunk_candidate)) if matches: # Use the last match as the split point split_position = matches[-1].end() break # Fallback if no natural breaks found if split_position is None or split_position < max_chunk_size // 2: # Look for the last space after a minimum chunk size min_size = max(max_chunk_size // 2, 1000) last_space = chunk_candidate.rfind(' ', min_size) if last_space 
            if last_space > min_size:
                split_position = last_space
            else:
                # Forced split at max_chunk_size
                split_position = max_chunk_size

        # Create chunk and update remaining text
        chunks.append(remaining_text[:split_position].strip())
        remaining_text = remaining_text[split_position:].strip()

    # Validate chunks
    validated_chunks = []
    for chunk in chunks:
        if chunk and len(chunk) >= 100:  # Minimum viable chunk size
            validated_chunks.append(chunk)
        elif chunk:
            # If chunk is too small, combine with previous or next chunk
            if validated_chunks:
                validated_chunks[-1] += "\n\n" + chunk
            else:
                # This is the first chunk and it's too small - rare case
                validated_chunks.append(chunk)

    return validated_chunks


async def detect_subject_matter(
    text: str,
    provider: str,
    model: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Analyze transcript to determine subject matter, topics, and title."""
    # Braces in the JSON example are doubled so that prompt.format(text=...) below leaves them intact.
    prompt = """Analyze this transcript excerpt for the following:
1. CONTEXT: The primary domain or topic being discussed (e.g., technology, business, healthcare, etc.)
2. SPEAKERS: The likely type and number of speakers (e.g., interview, panel, lecture, etc.)
3. TOPICS: List 2-5 specific topics covered, in order of importance
4. TITLE: A short, descriptive title for this content (under 10 words)

Return your analysis in JSON format ONLY:
{{
  "context": "Brief description of the domain and conversation type",
  "topics": ["Topic 1", "Topic 2", "Topic 3"],
  "title": "Concise descriptive title"
}}

Transcript excerpt:
{text}"""

    # Include metadata if available
    if metadata and metadata.get("language"):
        language = metadata.get("language")
        prompt += f"\n\nMetadata: The transcript language is {language}."

    try:
        result = await chat_completion(
            messages=[{"role": "user", "content": prompt.format(text=text)}],
            provider=provider,
            model=model,
            temperature=0
        )

        if result.get("success") and "message" in result:
            content = result["message"].get("content", "")

            # Try to parse JSON from content
            try:
                # Extract JSON if it's embedded in text
                json_match = re.search(r'({[\s\S]*})', content)
                if json_match:
                    analysis = json.loads(json_match.group(1))
                else:
                    analysis = json.loads(content)

                return {
                    "context": analysis.get("context", ""),
                    "topics": analysis.get("topics", []),
                    "title": analysis.get("title")
                }
            except json.JSONDecodeError:
                # Fallback: extract fields manually
                context_match = re.search(r'context["\s:]+([^"}\n]+)', content, re.IGNORECASE)
                title_match = re.search(r'title["\s:]+([^"}\n]+)', content, re.IGNORECASE)
                topics_match = re.search(r'topics["\s:]+\[(.*?)\]', content, re.IGNORECASE | re.DOTALL)

                context = context_match.group(1).strip() if context_match else ""
                title = title_match.group(1).strip() if title_match else None

                topics = []
                if topics_match:
                    topics_text = topics_match.group(1)
                    topics = [t.strip().strip('"\'') for t in re.findall(r'"([^"]+)"', topics_text)]
                    if not topics:
                        topics = [t.strip() for t in topics_text.split(',') if t.strip()]

                return {
                    "context": context,
                    "topics": topics,
                    "title": title
                }
    except Exception as e:
        logger.warning(f"Subject matter detection failed: {e}", emoji_key="warning")

    # Fallback when the completion fails or returns no usable content
    return {
        "context": "",
        "topics": [],
        "title": None
    }


async def process_chunks_parallel(
    context: ProcessingContext,
    chunks: List[str],
    context_info: str,
    params: TranscriptEnhancementParams
) -> List[Dict[str, Any]]:
    """Process transcript chunks in parallel for better performance."""
    # Limit max workers to CPU count or specified max
    max_workers = min(context.options.max_workers, os.cpu_count() or 4, len(chunks))

    # Create a semaphore to limit concurrency
    sem = asyncio.Semaphore(max_workers)

    # Results collected from the per-chunk tasks
    chunk_results = []

    async def process_chunk(i, chunk):
        """Process an individual chunk."""
        async with sem:  # Use semaphore to limit concurrency
            logger.info(f"Enhancing chunk {i+1}/{len(chunks)}", emoji_key="enhance")
            try:
                result = await enhance_chunk(chunk, context_info, params, i, len(chunks))
                return result
            except Exception as e:
                logger.error(f"Error enhancing chunk {i+1}: {e}", emoji_key="error", exc_info=True)
                return {"text": chunk, "tokens": {"input": 0, "output": 0, "total": 0}, "cost": 0.0}

    # Create tasks for parallel execution
    tasks = [process_chunk(i, chunk) for i, chunk in enumerate(chunks)]
    chunk_results = await asyncio.gather(*tasks)

    # Make sure results are in original order (they should be)
    return chunk_results


async def process_chunks_sequential(
    context: ProcessingContext,
    chunks: List[str],
    context_info: str,
    params: TranscriptEnhancementParams
) -> List[Dict[str, Any]]:
    """Process transcript chunks sequentially to preserve context flow."""
    enhanced_chunks = []
    accumulated_context = context_info

    for i, chunk in enumerate(chunks):
        logger.info(f"Enhancing chunk {i+1}/{len(chunks)}", emoji_key="enhance")

        # Update context with information from previous chunks
        if i > 0 and enhanced_chunks:
            # Add brief summary of what was covered in previous chunk
            previous_text = enhanced_chunks[-1]["text"]
            if len(previous_text) > 500:
                accumulated_context += f"\nPrevious chunk ended with: {previous_text[-500:]}"
            else:
                accumulated_context += f"\nPrevious chunk: {previous_text}"

        try:
            result = await enhance_chunk(chunk, accumulated_context, params, i, len(chunks))
            enhanced_chunks.append(result)
        except Exception as e:
            logger.error(f"Error enhancing chunk {i+1}: {e}", emoji_key="error", exc_info=True)
            # Use original text on error
            enhanced_chunks.append({"text": chunk, "tokens": {"input": 0, "output": 0, "total": 0}, "cost": 0.0})

    return enhanced_chunks


async def enhance_chunk(
    chunk: str,
    context_info: str,
    params: TranscriptEnhancementParams,
    chunk_index: int,
    total_chunks: int
) -> Dict[str, Any]:
    """Enhance a single transcript chunk with LLM."""
    # Build the prompt based on enhancement parameters
    style_instructions = _get_style_instructions(params.style)

    fix_instructions = []
    if params.add_paragraphs:
        fix_instructions.append("- Add paragraph breaks at natural topic transitions")
    if params.fix_spelling:
        fix_instructions.append("- Fix obvious spelling errors while preserving domain-specific terms")
    if params.fix_grammar:
        fix_instructions.append("- Fix basic grammatical errors without changing style or meaning")
    if params.format_numbers:
        fix_instructions.append("- Format numbers consistently (e.g., '25' instead of 'twenty-five')")
    if params.identify_speakers:
        fix_instructions.append("- Try to identify different speakers and label them as Speaker 1, Speaker 2, etc.")
        fix_instructions.append("- Format speaker changes as 'Speaker N: text' on a new line")

    fix_section = "\n".join(fix_instructions) if fix_instructions else "None"

    # Add custom instructions if provided
    custom_section = f"\nADDITIONAL INSTRUCTIONS:\n{params.custom_instructions}" if params.custom_instructions else ""

    # Mention chunk position in context
    position_info = f"This is chunk {chunk_index+1} of {total_chunks}." if total_chunks > 1 else ""

    prompt = f"""You are cleaning up a raw transcript from a recorded conversation. {position_info}

CONTENT CONTEXT:
{context_info}

ENHANCEMENT STYLE:
{style_instructions}

CLEANUP INSTRUCTIONS:
1. Remove filler sounds: "um", "uh", "er", "ah", "hmm"
2. Remove stutters and word repetitions: "the- the", "I- I"
3. Remove meaningless filler phrases when used as pure filler: "you know", "like", "sort of"
4. Fix clear transcription errors and garbled text
5. Add proper punctuation for readability
{fix_section}

STRICT PRESERVATION RULES:
1. DO NOT modify, rephrase, or restructure ANY of the speaker's content
2. DO NOT add ANY new content or explanations
3. DO NOT make the language more formal or technical
4. DO NOT summarize or condense anything
5. PRESERVE ALL technical terms, numbers, and specific details exactly as spoken
6. PRESERVE the speaker's unique speaking style and personality
{custom_section}

Here's the transcript chunk to clean:

{chunk}

Return ONLY the cleaned transcript text with no explanations, comments, or metadata."""

    try:
        result = await chat_completion(
            messages=[{"role": "user", "content": prompt}],
            provider=params.provider,
            model=params.model,
            temperature=0.1,  # Low temperature for consistent results
            max_tokens=min(len(chunk) * 2, 8192)  # Reasonable token limit based on input size
        )

        if result.get("success") and "message" in result:
            enhanced_text = result["message"].get("content", "").strip()

            # Validation: if enhanced text is much shorter, it might have been summarized
            if len(enhanced_text) < len(chunk) * 0.6:
                logger.warning(
                    f"Enhanced text suspiciously short ({len(enhanced_text)} vs {len(chunk)} chars), "
                    f"may have been summarized. Using original.",
                    emoji_key="warning"
                )
                enhanced_text = chunk

            return {
                "text": enhanced_text,
                "tokens": result.get("tokens", {"input": 0, "output": 0, "total": 0}),
                "cost": result.get("cost", 0.0)
            }

        return {"text": chunk, "tokens": {"input": 0, "output": 0, "total": 0}, "cost": 0.0}
    except Exception as e:
        logger.error(f"Chunk enhancement error: {e}", emoji_key="error")
        return {"text": chunk, "tokens": {"input": 0, "output": 0, "total": 0}, "cost": 0.0}


def _get_style_instructions(style: EnhancementStyle) -> str:
    """Get instructions for the specified enhancement style."""
    styles = {
        EnhancementStyle.RAW: (
            "Minimal cleaning only. Preserve all speech patterns and informality. "
            "Focus on removing only transcription errors and unintelligible elements."
        ),
        EnhancementStyle.READABLE: (
            "Basic readability improvements. Light cleanup while preserving natural speech patterns. "
            "Remove only clear disfluencies and maintain conversational flow."
        ),
        EnhancementStyle.POLISHED: (
            "Well-formatted with proper punctuation and clean sentences. "
            "Remove speech disfluencies but preserve the speaker's voice and style. "
            "Create a professional but authentic reading experience."
        ),
        EnhancementStyle.VERBATIM: (
            "Preserve all speech patterns, hesitations, and repetitions. "
            "Format for readability but maintain every verbal quirk and pause. "
            "Indicate hesitations with ellipses [...] and preserve every repeated word or phrase."
        ),
        EnhancementStyle.STRUCTURED: (
            "Add semantic structure with clear paragraphs around topics. "
            "Clean speech for maximum readability while preserving content accuracy. "
            "Organize into logical sections while keeping all original information."
        )
    }

    return styles.get(style, styles[EnhancementStyle.READABLE])


async def add_section_headings(
    transcript: str,
    topics: List[str],
    provider: str,
    model: Optional[str] = None
) -> str:
    """Add section headings to the transcript based on topic changes."""
    if not transcript or not topics:
        return transcript

    prompt = """Add clear section headings to this transcript based on topic changes.

TOPICS COVERED (in approximate order):
{topics}

RULES:
1. Insert section headings as "## [Topic]" on their own line
2. Place headings ONLY where there is a clear topic change
3. Use at most {max_sections} headings total
4. NEVER add content or edit the existing text
5. NEVER remove any original content
6. Base headings on the given topics list, but you can adjust wording for clarity
7. Don't duplicate headings for the same topic
8. Keep headings short and descriptive (2-6 words each)

TRANSCRIPT:
{text}

Return the full transcript with section headings added."""

    # Adjust max sections based on transcript length
    token_estimate = len(transcript) // 4
    max_sections = min(len(topics) + 1, token_estimate // 1000 + 1)

    topics_text = "\n".join([f"- {topic}" for topic in topics])

    try:
        result = await chat_completion(
            messages=[{
                "role": "user",
                "content": prompt.format(topics=topics_text, text=transcript, max_sections=max_sections)
            }],
            provider=provider,
            model=model,
            temperature=0.1,
            max_tokens=min(len(transcript) * 2, 8192)
        )

        if result.get("success") and "message" in result:
            return result["message"].get("content", "").strip()

        return transcript
    except Exception as e:
        logger.warning(f"Failed to add section headings: {e}", emoji_key="warning")
        return transcript


# --- Output File Generation ---

async def generate_output_files(
    context: ProcessingContext,
    raw_transcript: str,
    enhanced_transcript: str,
    segments: List[Dict[str, Any]],
    metadata: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate output files in requested formats."""
    artifact_paths = {
        "output_files": {}
    }

    # Save enhanced audio path if requested
    if context.options.save_enhanced_audio and context.enhanced_audio_path:
        original_dir = os.path.dirname(context.file_path)
        persistent_path = os.path.join(original_dir, f"{context.base_filename}_enhanced.wav")

        # May have already been saved during enhancement
        if not os.path.exists(persistent_path) and os.path.exists(context.enhanced_audio_path):
            shutil.copy2(context.enhanced_audio_path, persistent_path)

        artifact_paths["enhanced_audio"] = persistent_path

    # Generate requested output formats
    output_formats = context.options.output_formats

    # Use Path objects for easier path manipulation
    output_dir = Path(os.path.dirname(context.file_path))
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # Generate JSON output
    if OutputFormat.JSON in output_formats:
        json_path = output_dir / f"{context.base_filename}_transcript_{timestamp}.json"

        # Create JSON data structure
        json_data = {
            "metadata": {
                "filename": context.original_filename,
                "processed_at": timestamp,
                **metadata
            },
            "raw_transcript": raw_transcript,
            "enhanced_transcript": enhanced_transcript,
            "segments": segments
        }

        # Write JSON file
        async with aiofiles.open(str(json_path), 'w') as f:
            await f.write(json.dumps(json_data, indent=2))

        artifact_paths["output_files"]["json"] = json_path

    # Generate TEXT output
    if OutputFormat.TEXT in output_formats:
        text_path = output_dir / f"{context.base_filename}_transcript_{timestamp}.txt"

        # Create plain text file
        async with aiofiles.open(str(text_path), 'w') as f:
            # Add metadata header if available
            if metadata:
                if "title" in metadata and metadata["title"]:
                    await f.write(f"Title: {metadata['title']}\n")
                if "language" in metadata and metadata["language"]:
                    await f.write(f"Language: {metadata['language']}\n")
                if "topics" in metadata and metadata["topics"]:
                    topics_str = ", ".join(metadata["topics"])
                    await f.write(f"Topics: {topics_str}\n")
                await f.write("\n")

            # Write enhanced transcript if available, otherwise use raw transcript
            await f.write(enhanced_transcript or raw_transcript)

        artifact_paths["output_files"]["text"] = text_path

    # Generate SRT output
    if OutputFormat.SRT in output_formats:
        srt_path = output_dir / f"{context.base_filename}_transcript_{timestamp}.srt"

        # Convert segments to SRT format
        srt_content = generate_srt(segments)

        # Write SRT file
        async with aiofiles.open(str(srt_path), 'w') as f:
            await f.write(srt_content)

        artifact_paths["output_files"]["srt"] = srt_path

    # Generate VTT output
    if OutputFormat.VTT in output_formats:
        vtt_path = output_dir / f"{context.base_filename}_transcript_{timestamp}.vtt"

        # Convert segments to VTT format
        vtt_content = generate_vtt(segments)

        # Write VTT file
        async with aiofiles.open(str(vtt_path), 'w') as f:
            await f.write(vtt_content)

        artifact_paths["output_files"]["vtt"] = vtt_path

    # Generate Markdown output
    if OutputFormat.MARKDOWN in output_formats:
        md_path = output_dir / f"{context.base_filename}_transcript_{timestamp}.md"

        # Create markdown content
        md_content = generate_markdown(enhanced_transcript, metadata)

        # Write markdown file
        async with aiofiles.open(str(md_path), 'w') as f:
            await f.write(md_content)

        artifact_paths["output_files"]["markdown"] = md_path

    # Generate DOCX output (if supported)
    if OutputFormat.DOCX in output_formats:
        try:
            docx_path = output_dir / f"{context.base_filename}_transcript_{timestamp}.docx"

            # Generate DOCX in a thread pool to avoid blocking
            with concurrent.futures.ThreadPoolExecutor() as executor:
                await asyncio.get_event_loop().run_in_executor(
                    executor,
                    generate_docx,
                    docx_path,
                    enhanced_transcript,
                    metadata
                )

            artifact_paths["output_files"]["docx"] = docx_path
        except (ImportError, Exception) as e:
            logger.warning(f"Failed to generate DOCX output: {e}", emoji_key="warning")

    return artifact_paths


def generate_srt(segments: List[Dict[str, Any]]) -> str:
    """Generate SRT format from segments."""
    srt_lines = []

    for i, segment in enumerate(segments):
        # Convert times to SRT format (HH:MM:SS,mmm)
        start_time = segment.get("start", 0)
        end_time = segment.get("end", 0)

        start_str = format_srt_time(start_time)
        end_str = format_srt_time(end_time)

        # Format text
        text = segment.get("text", "").replace("\n", " ")

        # Add speaker if available
        if "speaker" in segment and segment["speaker"]:
            text = f"[{segment['speaker']}] {text}"

        # Add to SRT
        srt_lines.append(f"{i+1}")
        srt_lines.append(f"{start_str} --> {end_str}")
        srt_lines.append(f"{text}")
        srt_lines.append("")  # Empty line between entries

    return "\n".join(srt_lines)


def format_srt_time(seconds: float) -> str:
    """Format seconds as SRT time: HH:MM:SS,mmm."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    sec_int = int(seconds % 60)
    ms = int((seconds % 1) * 1000)

    return f"{hours:02d}:{minutes:02d}:{sec_int:02d},{ms:03d}"


def generate_vtt(segments: List[Dict[str, Any]]) -> str:
    """Generate WebVTT format from segments."""
    vtt_lines = ["WEBVTT", ""]

    for segment in segments:
        # Convert times to VTT format (HH:MM:SS.mmm)
        start_time = segment.get("start", 0)
        end_time = segment.get("end", 0)

        start_str = format_vtt_time(start_time)
        end_str = format_vtt_time(end_time)

        # Format text
        text = segment.get("text", "").replace("\n", " ")

        # Add speaker if available
        if "speaker" in segment and segment["speaker"]:
            text = f"<v {segment['speaker']}>{text}</v>"

        # Add to VTT
        vtt_lines.append(f"{start_str} --> {end_str}")
        vtt_lines.append(f"{text}")
        vtt_lines.append("")  # Empty line between entries

    return "\n".join(vtt_lines)


def format_vtt_time(seconds: float) -> str:
    """Format seconds as WebVTT time: HH:MM:SS.mmm."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    sec_fractional = seconds % 60

    return f"{hours:02d}:{minutes:02d}:{sec_fractional:06.3f}"


def generate_markdown(transcript: str, metadata: Dict[str, Any]) -> str:
    """Generate Markdown format for the transcript."""
    md_lines = []

    # Add title
    if "title" in metadata and metadata["title"]:
        md_lines.append(f"# {metadata['title']}")
        md_lines.append("")
    else:
        md_lines.append("# Transcript")
        md_lines.append("")

    # Add metadata section
    md_lines.append("## Metadata")
    md_lines.append("")

    if "language" in metadata and metadata["language"]:
        md_lines.append(f"- **Language:** {metadata['language']}")

    if "duration" in metadata and metadata["duration"]:
        duration_min = int(metadata["duration"] // 60)
        duration_sec = int(metadata["duration"] % 60)
        md_lines.append(f"- **Duration:** {duration_min} min {duration_sec} sec")

    if "topics" in metadata and metadata["topics"]:
        topics_str = ", ".join(metadata["topics"])
        md_lines.append(f"- **Topics:** {topics_str}")

    md_lines.append("")
    md_lines.append("## Transcript")
    md_lines.append("")

    # Add transcript with proper line breaks preserved
    for line in transcript.split("\n"):
        md_lines.append(line)

    return "\n".join(md_lines)


def generate_docx(docx_path: str, transcript: str, metadata: Dict[str, Any]) -> None:
    """Generate DOCX format for the transcript."""
    # Must be run in a ThreadPoolExecutor since python-docx is not async
    doc = Document()

    # Add title
    if "title" in metadata and metadata["title"]:
        title = doc.add_heading(metadata["title"], 0)
    else:
        title = doc.add_heading("Transcript", 0)  # noqa: F841

    # Add metadata section
    doc.add_heading("Metadata", 1)

    if "language" in metadata and metadata["language"]:
        doc.add_paragraph(f"Language: {metadata['language']}")

    if "duration" in metadata and metadata["duration"]:
        duration_min = int(metadata["duration"] // 60)
        duration_sec = int(metadata["duration"] % 60)
        doc.add_paragraph(f"Duration: {duration_min} min {duration_sec} sec")

    if "topics" in metadata and metadata["topics"]:
        topics_str = ", ".join(metadata["topics"])
        doc.add_paragraph(f"Topics: {topics_str}")

    # Add transcript
    doc.add_heading("Transcript", 1)

    # Split into paragraphs and add
    for paragraph in transcript.split("\n\n"):
        if paragraph.strip():
            p = doc.add_paragraph()

            # Check if paragraph starts with a heading marker
            if paragraph.startswith("##"):
                parts = paragraph.split(" ", 1)
                if len(parts) > 1:
                    doc.add_heading(parts[1], 2)
                    continue

            # Regular paragraph
            p.add_run(paragraph)

    # Save the document
    doc.save(docx_path)


async def chat_with_transcript(
    transcript: str,
    query: str,
    provider: str = Provider.ANTHROPIC.value,
    model: Optional[str] = None,
    context: Optional[str] = None
) -> Dict[str, Any]:
    """Chat with a transcript to extract information or answer questions about its content.

    Args:
        transcript: The transcript text to analyze
        query: The question or instruction to process regarding the transcript
        provider: LLM provider to use (default: Anthropic)
        model: Specific model to use (default: provider's default model)
        context: Optional additional context about the audio/transcript

    Returns:
        A dictionary containing the response and related metadata
    """
    if not transcript or not isinstance(transcript, str):
        raise ToolInputError("Transcript must be a non-empty string.")

    if not query or not isinstance(query, str):
        raise ToolInputError("Query must be a non-empty string.")

    # Calculate token count for logging
    try:
        transcript_tokens = count_tokens(transcript, model)
        query_tokens = count_tokens(query, model)
        logger.info(
            f"Transcript: {transcript_tokens} tokens, Query: {query_tokens} tokens",
            emoji_key=TaskType.CHAT.value
        )
    except Exception as e:
        logger.warning(f"Failed to count tokens: {e}", emoji_key="warning")

    # Build the prompt
    system_prompt = """You are an expert at analyzing transcripts and extracting information.
Provide concise, accurate answers based solely on the provided transcript.
If the answer is not in the transcript, say so clearly."""

    if context:
        system_prompt += f"\n\nAdditional context about this transcript: {context}"

    # Get provider instance to ensure it exists and is available
    try:
        provider_instance = await get_provider(provider)

        if model is None:
            # Check if the provider has a default model or use claude-3-7-sonnet as fallback
            default_models = await provider_instance.list_models()
            if default_models and len(default_models) > 0:
                model = default_models[0].get("id")
            else:
                model = "claude-3-7-sonnet-20250219" if provider == Provider.ANTHROPIC.value else None

        logger.info(f"Using model: {provider}/{model}", emoji_key="model")
    except Exception as e:
        raise ProviderError(
            f"Failed to initialize provider '{provider}': {str(e)}",
            provider=provider,
            cause=e
        ) from e

    # Use relative file paths for any file references
    rel_transcript_path = None
    if os.path.exists(transcript):
        rel_transcript_path = Path(transcript).relative_to(Path.cwd())  # noqa: F841

    # Create the message with the transcript and query
    user_message = f"""Here is a transcript to analyze:

---TRANSCRIPT BEGIN---
{transcript}
---TRANSCRIPT END---

{query}"""

    # Send to LLM
    result = await chat_completion(
        messages=[{"role": "user", "content": user_message}],
        provider=provider,
        model=model,
        system_prompt=system_prompt,
        temperature=0.1
    )

    return result


@with_cache(ttl=24 * 60 * 60)  # Cache results for 24 hours
@with_tool_metrics
@with_retry(max_retries=1, retry_delay=1.0)
@with_error_handling
async def extract_audio_transcript_key_points(
    file_path_or_transcript: str,
    is_file: bool = True,
    provider: str = Provider.ANTHROPIC.value,
    model: Optional[str] = None,
    max_points: int = 10
) -> Dict[str, Any]:
    """Extracts the most important key points from an audio transcript.

    This tool can process either an audio file (which it will transcribe first)
    or directly analyze an existing transcript to identify the most important
    information, main topics, and key takeaways.

    Args:
        file_path_or_transcript: Path to audio file or transcript text content
        is_file: Whether the input is a file path (True) or transcript text (False)
        provider: LLM provider to use for analysis
        model: Specific model to use (provider default if None)
        max_points: Maximum number of key points to extract

    Returns:
        A dictionary containing:
        {
            "key_points": ["Point 1", "Point 2", ...],
            "summary": "Brief summary of the content",
            "topics": ["Topic 1", "Topic 2", ...],
            "speakers": ["Speaker 1", "Speaker 2", ...] (if multiple speakers detected),
            "tokens": { statistics about token usage },
            "cost": estimated cost of the operation,
            "processing_time": total processing time in seconds
        }
    """
    start_time = time.time()

    # Get transcript from file or use provided text
    transcript = ""

    if is_file:
        try:
            # Validate file path
            file_path = os.path.abspath(os.path.expanduser(file_path_or_transcript))
            if not os.path.exists(file_path):
                raise ToolInputError(f"File not found: {file_path}")

            # Get file info for logging
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            file_name = Path(file_path).name

            logger.info(
                f"Extracting key points from audio file: {file_name} ({file_size_mb:.2f} MB)",
                emoji_key="audio"
            )

            # Transcribe audio
            transcription_result = await transcribe_audio(file_path, {
                "enhance_audio": True,
                "enhance_transcript": True,
                "output_formats": ["json"]
            })

            transcript = transcription_result.get("enhanced_transcript", "")
            if not transcript:
                transcript = transcription_result.get("raw_transcript", "")

            if not transcript:
                raise ToolError("Failed to generate transcript from audio")
        except Exception as e:
            if isinstance(e, (ToolError, ToolInputError)):
                raise
            raise ToolError(f"Failed to process audio file: {str(e)}") from e
    else:
        # Input is already a transcript
        transcript = file_path_or_transcript

        if not transcript or not isinstance(transcript, str):
            raise ToolInputError("Transcript text must be a non-empty string")

    # Calculate token count for the transcript
    try:
        token_count = count_tokens(transcript, model)
        logger.info(f"Transcript token count: {token_count}", emoji_key="tokens")
    except Exception as e:
        logger.warning(f"Failed to count tokens: {e}")

    # Create prompt for key points extraction
    prompt = f"""Extract the most important key points from this transcript.

Identify:
1. The {max_points} most important key points or takeaways
2. Main topics discussed
3. Any speakers or main entities mentioned (if identifiable)
4. A brief summary (2-3 sentences max)

Format your response as JSON with these fields:
{{
    "key_points": ["Point 1", "Point 2", ...],
    "topics": ["Topic 1", "Topic 2", ...],
    "speakers": ["Speaker 1", "Speaker 2", ...],
    "summary": "Brief summary here"
}}

TRANSCRIPT:
{transcript}
"""

    # Get provider instance
    try:
        provider_instance = await get_provider(provider)  # noqa: F841
    except Exception as e:
        raise ProviderError(
            f"Failed to initialize provider '{provider}': {str(e)}",
            provider=provider,
            cause=e
        ) from e

    # Generate completion
    try:
        completion_result = await generate_completion(
            prompt=prompt,
            provider=provider,
            model=model,
            temperature=0.1,
            max_tokens=1000
        )

        # Parse JSON response
        response_text = completion_result.get("text", "")

        # Find JSON in the response
        json_match = re.search(r'({[\s\S]*})', response_text)
        if json_match:
            try:
                extracted_data = json.loads(json_match.group(1))
            except json.JSONDecodeError:
                # Fallback to regex extraction
                extracted_data = _extract_key_points_with_regex(response_text)
        else:
            # Fallback to regex extraction
            extracted_data = _extract_key_points_with_regex(response_text)

        processing_time = time.time() - start_time

        # Add token and cost info
        result = {
            "key_points": extracted_data.get("key_points", []),
            "topics": extracted_data.get("topics", []),
            "speakers": extracted_data.get("speakers", []),
            "summary": extracted_data.get("summary", ""),
            "tokens": completion_result.get("tokens", {"input": 0, "output": 0, "total": 0}),
            "cost": completion_result.get("cost", 0.0),
            "processing_time": processing_time
        }

        return result
    except Exception as e:
        error_model = model or f"{provider}/default"
        raise ProviderError(
            f"Key points extraction failed for model '{error_model}': {str(e)}",
            provider=provider,
            model=error_model,
            cause=e
        ) from e


def _extract_key_points_with_regex(text: str) -> Dict[str, Any]:
    """Extract key points data using regex when JSON parsing fails."""
    result = {
        "key_points": [],
        "topics": [],
        "speakers": [],
        "summary": ""
    }

    # Extract key points
    key_points_pattern = r'key_points"?\s*:?\s*\[\s*"([^"]+)"(?:\s*,\s*"([^"]+)")*\s*\]'
    key_points_match = re.search(key_points_pattern, text, re.IGNORECASE | re.DOTALL)
    if key_points_match:
        point_list = re.findall(r'"([^"]+)"', key_points_match.group(0))
        # Filter out empty strings
        result["key_points"] = [p for p in point_list if p.strip()]
    else:
        # Try alternative pattern for non-JSON format
        point_list = re.findall(r'(?:^|\n)(?:•|\*|-|[0-9]+\.)\s*([^\n]+?)(?:\n|$)', text)
        # Filter out empty strings
        result["key_points"] = [p.strip() for p in point_list if p.strip()][:10]  # Limit to 10 points

    # Extract topics
    topics_pattern = r'topics"?\s*:?\s*\[\s*"([^"]+)"(?:\s*,\s*"([^"]+)")*\s*\]'
    topics_match = re.search(topics_pattern, text, re.IGNORECASE | re.DOTALL)
    if topics_match:
        topic_list = re.findall(r'"([^"]+)"', topics_match.group(0))
        # Filter out empty strings
        result["topics"] = [t for t in topic_list if t.strip()]

    # Extract speakers
    speakers_pattern = r'speakers"?\s*:?\s*\[\s*"([^"]+)"(?:\s*,\s*"([^"]+)")*\s*\]'
    speakers_match = re.search(speakers_pattern, text, re.IGNORECASE | re.DOTALL)
    if speakers_match:
        speaker_list = re.findall(r'"([^"]+)"', speakers_match.group(0))
        # Filter out empty strings
        result["speakers"] = [s for s in speaker_list if s.strip()]

    # Extract summary
    summary_pattern = r'summary"?\s*:?\s*"([^"]+)"'
    summary_match = re.search(summary_pattern, text, re.IGNORECASE)
    if summary_match and summary_match.group(1).strip():
        result["summary"] = summary_match.group(1).strip()

    return result
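

# --- Example usage (illustrative sketch, not part of the tool API) ---
# A minimal sketch showing how the pure formatting helpers above can be exercised
# without any LLM provider or audio input. The segment timings, speaker labels,
# and sample text below are made-up demonstration data; the block only runs when
# the module is executed directly.
if __name__ == "__main__":
    _sample_segments = [
        {"start": 0.0, "end": 2.5, "text": "Hello and welcome.", "speaker": "Speaker 1"},
        {"start": 2.5, "end": 6.0, "text": "Today we talk about transcription.", "speaker": "Speaker 2"},
    ]

    # Subtitle generation from segment dictionaries
    print(generate_srt(_sample_segments))
    print(generate_vtt(_sample_segments))

    # Chunking a long synthetic transcript at sentence boundaries
    _chunks = asyncio.run(chunk_text("Example sentence. " * 1000, max_chunk_size=6500))
    print(f"chunk_text produced {len(_chunks)} chunks")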
