Speech MCP

""" Audio processing module for speech-mcp. This module provides centralized audio processing functionality including: - Audio device selection - Audio recording - Audio playback - Audio level visualization - Silence detection """ import os import time import tempfile import threading import wave import numpy as np import pyaudio from typing import Optional, List, Tuple, Callable, Any, Dict # Import the centralized logger from speech_mcp.utils.logger import get_logger # Get a logger for this module logger = get_logger(__name__, component="server") # Import centralized constants from speech_mcp.constants import ( CHUNK, FORMAT, CHANNELS, RATE, SILENCE_THRESHOLD, MAX_SILENCE_DURATION, SILENCE_CHECK_INTERVAL, START_LISTENING_SOUND, STOP_LISTENING_SOUND ) class AudioProcessor: """ Core audio processing class that handles device selection, recording, and playback. This class provides the shared audio functionality used by both the server and UI components. """ def __init__(self, on_audio_level: Optional[Callable[[float], None]] = None): """ Initialize the audio processor. Args: on_audio_level: Optional callback function that receives audio level updates (0.0 to 1.0) """ self.pyaudio = None self.stream = None self.selected_device_index = None self.is_listening = False self.audio_frames = [] self.on_audio_level = on_audio_level self._setup_audio() def _setup_audio(self) -> None: """Set up audio capture and processing.""" try: logger.info("Setting up audio processing") self.pyaudio = pyaudio.PyAudio() # Log audio device information logger.info(f"PyAudio version: {pyaudio.get_portaudio_version()}") # Get all available audio devices info = self.pyaudio.get_host_api_info_by_index(0) numdevices = info.get('deviceCount') logger.info(f"Found {numdevices} audio devices:") # Find the best input device for i in range(numdevices): try: device_info = self.pyaudio.get_device_info_by_host_api_device_index(0, i) device_name = device_info.get('name') max_input_channels = device_info.get('maxInputChannels') logger.info(f"Device {i}: {device_name}") logger.info(f" Max Input Channels: {max_input_channels}") logger.info(f" Default Sample Rate: {device_info.get('defaultSampleRate')}") # Only consider input devices if max_input_channels > 0: logger.info(f"Found input device: {device_name}") # Prefer non-default devices as they're often external mics if self.selected_device_index is None or 'default' not in device_name.lower(): self.selected_device_index = i logger.info(f"Selected input device: {device_name} (index {i})") except Exception as e: logger.warning(f"Error checking device {i}: {e}") if self.selected_device_index is None: logger.warning("No suitable input device found, using default") except Exception as e: logger.error(f"Error setting up audio: {e}") def start_listening(self, callback: Optional[Callable] = None) -> bool: """ Start listening for audio input. Args: callback: Optional callback function to call when audio data is received Returns: bool: True if listening started successfully, False otherwise """ if self.is_listening: logger.info("Already listening, ignoring start_listening call") return True self.is_listening = True self.audio_frames = [] # Play start listening notification sound threading.Thread(target=self.play_audio_file, args=(START_LISTENING_SOUND,), daemon=True).start() try: logger.info("Starting audio recording") def audio_callback(in_data, frame_count, time_info, status): try: # Check for audio status flags if status: status_flags = [] if status & pyaudio.paInputUnderflow: status_flags.append("Input Underflow") if status & pyaudio.paInputOverflow: status_flags.append("Input Overflow") if status & pyaudio.paOutputUnderflow: status_flags.append("Output Underflow") if status & pyaudio.paOutputOverflow: status_flags.append("Output Overflow") if status & pyaudio.paPrimingOutput: status_flags.append("Priming Output") if status_flags: logger.warning(f"Audio callback status flags: {', '.join(status_flags)}") # Store audio data for processing self.audio_frames.append(in_data) # Process audio for visualization self._process_audio_for_visualization(in_data) # Call user-provided callback if available if callback: callback(in_data) return (in_data, pyaudio.paContinue) except Exception as e: logger.error(f"Error in audio callback: {e}") return (in_data, pyaudio.paContinue) # Try to continue despite errors # Start the audio stream with the selected device logger.debug(f"Opening audio stream with FORMAT={FORMAT}, CHANNELS={CHANNELS}, RATE={RATE}, CHUNK={CHUNK}, DEVICE={self.selected_device_index}") self.stream = self.pyaudio.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, input_device_index=self.selected_device_index, frames_per_buffer=CHUNK, stream_callback=audio_callback ) # Verify stream is active and receiving audio if not self.stream.is_active(): logger.error("Stream created but not active") self.is_listening = False return False logger.info("Audio stream initialized and receiving data") # Start a thread to detect silence and stop recording threading.Thread(target=self._detect_silence, daemon=True).start() return True except Exception as e: logger.error(f"Error starting audio stream: {e}") self.is_listening = False return False def _process_audio_for_visualization(self, audio_data: bytes) -> None: """ Process audio data for visualization. Args: audio_data: Raw audio data from PyAudio """ try: # Convert to numpy array data = np.frombuffer(audio_data, dtype=np.int16) # Normalize the data to range [-1, 1] normalized = data.astype(float) / 32768.0 # Take absolute value to get amplitude amplitude = np.abs(normalized).mean() # Apply amplification factor to make the visualization more prominent # Increase the factor from 1.0 to 5.0 to make the visualization more visible amplification_factor = 5.0 amplified_amplitude = min(amplitude * amplification_factor, 1.0) # Clamp to 1.0 max # Call the audio level callback if provided if self.on_audio_level: self.on_audio_level(amplified_amplitude) except Exception: pass def _detect_silence(self) -> None: """ Detect when the user stops speaking and end recording. This method runs in a separate thread and monitors audio levels to detect when the user has stopped speaking. """ try: # Wait for initial audio to accumulate time.sleep(0.5) # Initialize silence detection parameters silence_duration = 0 while self.is_listening and self.stream and silence_duration < MAX_SILENCE_DURATION: if not self.audio_frames or len(self.audio_frames) < 2: time.sleep(SILENCE_CHECK_INTERVAL) continue # Get the latest audio frame latest_frame = self.audio_frames[-1] audio_data = np.frombuffer(latest_frame, dtype=np.int16) normalized = audio_data.astype(float) / 32768.0 current_amplitude = np.abs(normalized).mean() if current_amplitude < SILENCE_THRESHOLD: silence_duration += SILENCE_CHECK_INTERVAL else: silence_duration = 0 time.sleep(SILENCE_CHECK_INTERVAL) # If we exited because of silence detection if self.is_listening and self.stream: self.stop_listening() except Exception: pass def stop_listening(self) -> None: """ Stop listening for audio input. Returns: None """ try: if self.stream: self.stream.stop_stream() self.stream.close() self.stream = None # Play stop listening notification sound threading.Thread(target=self.play_audio_file, args=(STOP_LISTENING_SOUND,), daemon=True).start() self.is_listening = False except Exception: self.is_listening = False def get_recorded_audio_path(self) -> Optional[str]: """ Save the recorded audio to a temporary WAV file and return the path. Returns: str: Path to the temporary WAV file, or None if an error occurred """ if not self.audio_frames: return None try: # Check if we have enough audio data total_audio_time = len(self.audio_frames) * (CHUNK / RATE) # Save the recorded audio to a temporary WAV file with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio: temp_audio_path = temp_audio.name # Create a WAV file from the recorded frames wf = wave.open(temp_audio_path, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(self.pyaudio.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(self.audio_frames)) wf.close() return temp_audio_path except Exception: return None def record_audio(self) -> Optional[str]: """ Record audio from the microphone and return the path to the audio file. This is a blocking method that handles the entire recording process including starting recording, detecting silence, and stopping recording. Returns: str: Path to the recorded audio file, or None if an error occurred """ if not self.start_listening(): return None # Wait for recording to complete (silence detection will stop it) while self.is_listening: time.sleep(0.1) # Get the recorded audio file path return self.get_recorded_audio_path() def play_audio_file(self, file_path: str) -> bool: """ Play an audio file using PyAudio. Args: file_path: Path to the audio file to play Returns: bool: True if the file was played successfully, False otherwise """ try: if not os.path.exists(file_path): return False # Open the wave file with wave.open(file_path, 'rb') as wf: # Create PyAudio instance p = pyaudio.PyAudio() # Open stream stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True) # Read data in chunks and play chunk_size = 1024 data = wf.readframes(chunk_size) while data: stream.write(data) data = wf.readframes(chunk_size) # Close stream and PyAudio stream.stop_stream() stream.close() p.terminate() return True except Exception: return False def get_available_devices(self) -> List[Dict[str, Any]]: """ Get a list of available audio input devices. Returns: List of dictionaries containing device information """ devices = [] try: if not self.pyaudio: self._setup_audio() if not self.pyaudio: return devices # Get all available audio devices info = self.pyaudio.get_host_api_info_by_index(0) numdevices = info.get('deviceCount') for i in range(numdevices): try: device_info = self.pyaudio.get_device_info_by_host_api_device_index(0, i) max_input_channels = device_info.get('maxInputChannels') # Only include input devices if max_input_channels > 0: devices.append({ 'index': i, 'name': device_info.get('name'), 'channels': max_input_channels, 'sample_rate': device_info.get('defaultSampleRate') }) except Exception: pass return devices except Exception: return devices def set_device_index(self, device_index: int) -> bool: """ Set the audio input device by index. Args: device_index: Index of the audio device to use Returns: bool: True if the device was set successfully, False otherwise """ try: # Check if the device exists if not self.pyaudio: self._setup_audio() if not self.pyaudio: return False try: device_info = self.pyaudio.get_device_info_by_host_api_device_index(0, device_index) if device_info.get('maxInputChannels') > 0: self.selected_device_index = device_index return True else: return False except Exception: return False except Exception: return False def cleanup(self) -> None: """ Clean up resources used by the audio processor. This should be called when the audio processor is no longer needed. """ try: if self.stream: self.stream.stop_stream() self.stream.close() self.stream = None if self.pyaudio: self.pyaudio.terminate() self.pyaudio = None except Exception: pass