Skip to main content
Glama

Voice Mode

by mbailey
streaming.py•21.7 kB
"""
Streaming audio playback for voice-mode.

This module provides progressive audio playback to reduce latency
by playing audio chunks as they arrive from the TTS service.
"""

import asyncio
import io
import logging
import time
import queue
import threading
import wave
from typing import Optional, Tuple, AsyncIterator
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import sounddevice as sd
from pydub import AudioSegment

from .config import (
    STREAM_CHUNK_SIZE,
    STREAM_BUFFER_MS,
    STREAM_MAX_BUFFER,
    SAMPLE_RATE,
    logger
)
from .utils import get_event_logger

# Opus decoder support (optional)
try:
    import opuslib
    OPUS_AVAILABLE = True
except ImportError:
    OPUS_AVAILABLE = False
    logger.info("opuslib not available - Opus streaming will use fallback method")


def _decode_encoded_audio(
    data: bytes,
    audio_format: str,
    sample_rate: int,
    channels: int = 1,
) -> np.ndarray:
    """Decode encoded audio bytes with PyDub into float32 samples.

    The decoded segment is converted to the requested sample rate, channel
    count and 16-bit width before scaling, so the samples match the layout of
    the output stream they are written to.

    Raises whatever PyDub raises when ``data`` cannot be decoded; callers use
    that to detect "not enough data yet".
    """
    audio = AudioSegment.from_file(io.BytesIO(data), format=audio_format)
    audio = (
        audio.set_frame_rate(sample_rate)
        .set_channels(channels)
        .set_sample_width(2)
    )
    return np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0


def _pcm_to_wav_bytes(pcm: bytes, sample_rate: int, channels: int = 1) -> bytes:
    """Wrap raw 16-bit PCM in a WAV container, entirely in memory.

    A trailing partial frame (e.g. a dangling odd byte) is dropped so the
    data chunk always holds whole frames.
    """
    frame_bytes = 2 * channels
    usable = len(pcm) - len(pcm) % frame_bytes
    out = io.BytesIO()
    with wave.open(out, 'wb') as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(2)  # 16-bit
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm[:usable])
    return out.getvalue()


@dataclass
class StreamMetrics:
    """Metrics for streaming playback performance."""
    ttfa: float = 0.0  # Time to first audio
    generation_time: float = 0.0
    playback_time: float = 0.0
    buffer_underruns: int = 0
    chunks_received: int = 0
    chunks_played: int = 0
    audio_path: Optional[str] = None  # Path to saved audio file


class AudioStreamPlayer:
    """Manages streaming audio playback with buffering.

    Encoded chunks are decoded to float32 samples and pushed onto a bounded
    queue; a sounddevice callback pulls samples from that queue to fill the
    output buffer.
    """

    def __init__(self, format: str, sample_rate: int = SAMPLE_RATE, channels: int = 1):
        self.format = format
        self.sample_rate = sample_rate
        self.channels = channels
        self.metrics = StreamMetrics()

        # Buffering: the queue holds individual samples
        self.audio_queue = queue.Queue(maxsize=int(STREAM_MAX_BUFFER * sample_rate))
        self.min_buffer_samples = int((STREAM_BUFFER_MS / 1000.0) * sample_rate)

        # State
        self.playing = False
        self.finished_downloading = False
        self.playback_started = False
        self.start_time = time.perf_counter()

        # Partial data buffer for format-specific decoding
        self.partial_data = b''

        # Initialize decoder based on format
        self.decoder = self._get_decoder()

        # Sounddevice stream
        self.stream = None
        self._lock = threading.Lock()

    def _get_decoder(self):
        """Get appropriate decoder for the audio format.

        Returns an ``opuslib.Decoder`` for Opus when opuslib is importable,
        ``None`` for PCM, and the marker string ``"pydub"`` otherwise.
        """
        if self.format == "opus" and OPUS_AVAILABLE:
            # Opus decoder initialization
            return opuslib.Decoder(self.sample_rate, self.channels)
        elif self.format == "pcm":
            # PCM needs no decoding
            return None
        else:
            # For MP3, AAC, etc. we'll use PyDub
            return "pydub"

    def _audio_callback(self, outdata, frames, time_info, status):
        """Sounddevice callback for audio playback.

        Fills ``outdata`` one frame at a time from the sample queue and pads
        with silence when the queue runs dry.
        """
        if status:
            logger.debug(f"Sounddevice status: {status}")

        try:
            # Fill output buffer from queue
            for i in range(frames):
                try:
                    sample = self.audio_queue.get_nowait()
                    outdata[i] = sample
                except queue.Empty:
                    # Buffer underrun (counted per missing sample)
                    outdata[i] = 0
                    if self.playing:
                        self.metrics.buffer_underruns += 1

            # Track playback progress
            if self.playing:
                self.metrics.chunks_played += 1

        except Exception as e:
            logger.error(f"Error in audio callback: {e}")
            outdata.fill(0)

    async def start(self):
        """Start the audio stream."""
        self.stream = sd.OutputStream(
            samplerate=self.sample_rate,
            channels=self.channels,
            callback=self._audio_callback,
            blocksize=1024,
            dtype='float32'
        )
        self.stream.start()
        logger.debug("Audio stream started")

    async def add_chunk(self, chunk: bytes) -> bool:
        """Add an audio chunk for playback.

        Returns True when this chunk is the one that pushes the queue past
        the minimum buffer and starts playback (the TTFA moment).
        """
        first_chunk = self.metrics.chunks_received == 0
        self.metrics.chunks_received += 1

        # Combine with any partial data
        data = self.partial_data + chunk

        # For PCM only the dangling odd byte has to wait for the next chunk;
        # holding back the whole chunk would delay it (or lose it at the end).
        carry = b''
        if self.format == "pcm" and len(data) % 2 != 0:
            data, carry = data[:-1], data[-1:]

        try:
            # Decode chunk based on format
            samples = await self._decode_chunk(data)

            if samples is not None:
                # Successfully decoded - keep only the undecoded remainder
                self.partial_data = carry

                # Add samples to playback queue
                await self._queue_samples(samples)

                # Check if we should start playback
                if not self.playback_started and self.audio_queue.qsize() >= self.min_buffer_samples:
                    self.playback_started = True
                    self.playing = True
                    self.metrics.ttfa = time.perf_counter() - self.start_time
                    logger.info(f"Starting playback - TTFA: {self.metrics.ttfa:.3f}s")
                    return True
            else:
                # Partial data - save for next chunk
                self.partial_data = data + carry

        except Exception as e:
            logger.error(f"Error decoding chunk: {e}")
            # Skip this chunk but try to continue
            self.partial_data = b''

        return first_chunk and self.playback_started

    async def _decode_chunk(self, data: bytes) -> Optional[np.ndarray]:
        """Decode audio chunk to samples.

        Returns float32 samples, or None when ``data`` does not yet form
        something decodable and more bytes are needed.
        """
        if self.format == "pcm":
            # PCM is raw samples - just convert
            if len(data) % 2 != 0:
                # Incomplete sample - save for next chunk
                return None

            samples = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
            return samples

        # Only touch the opuslib name when the import succeeded; otherwise
        # Opus falls through to the PyDub path chosen by _get_decoder().
        elif (
            self.format == "opus"
            and OPUS_AVAILABLE
            and isinstance(self.decoder, opuslib.Decoder)
        ):
            # Opus decoding
            try:
                # Opus decoder needs complete frames
                pcm = self.decoder.decode(data, frame_size=960)
                samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
                return samples
            except Exception:
                # Incomplete frame - wait for more data
                return None

        elif self.decoder == "pydub":
            # Use PyDub for MP3, AAC, etc.
            # This is tricky because we need complete frames
            try:
                # Try to decode what we have, normalised to the stream layout
                return _decode_encoded_audio(
                    data, self.format, self.sample_rate, self.channels
                )
            except Exception:
                # Need more data for a complete frame
                return None

        return None

    async def _queue_samples(self, samples: np.ndarray):
        """Add samples to the playback queue, dropping the oldest on overflow."""
        for sample in samples:
            try:
                self.audio_queue.put_nowait(sample)
            except queue.Full:
                # Buffer overflow - drop oldest samples
                try:
                    self.audio_queue.get_nowait()
                    self.audio_queue.put_nowait(sample)
                except queue.Empty:
                    pass

    async def finish(self):
        """Signal that downloading is complete and wait for the queue to drain."""
        self.finished_downloading = True
        self.metrics.generation_time = time.perf_counter() - self.start_time

        # Process any remaining partial data
        if self.partial_data:
            # For formats like MP3, we might have a complete frame now
            samples = await self._decode_chunk(self.partial_data)
            if samples is not None:
                self.partial_data = b''
                await self._queue_samples(samples)

        # Wait for playback to complete. Nothing but stop() ever cleared
        # `playing`, so waiting on that flag would never return; wait for the
        # queue to drain instead, and give up if no active stream is
        # consuming it.
        while not self.audio_queue.empty():
            if self.stream is None or not self.stream.active:
                break
            await asyncio.sleep(0.1)

        self.playing = False
        self.metrics.playback_time = time.perf_counter() - self.start_time

    async def stop(self):
        """Stop playback and cleanup."""
        self.playing = False
        if self.stream:
            self.stream.stop()
            self.stream.close()
        logger.debug("Audio stream stopped")


async def stream_pcm_audio(
    text: str,
    openai_client,
    request_params: dict,
    debug: bool = False,
    save_audio: bool = False,
    audio_dir: Optional[Path] = None,
    conversation_id: Optional[str] = None
) -> Tuple[bool, StreamMetrics]:
    """Stream PCM audio with true HTTP streaming for minimal latency.

    Uses the OpenAI SDK's streaming response with iter_bytes() for real-time
    playback: each chunk is written to a blocking sounddevice stream as soon
    as it arrives.

    Args:
        text: Text being spoken (not used here; the request is fully
            described by ``request_params``)
        openai_client: OpenAI client instance
        request_params: Parameters for the TTS request
        debug: Enable periodic progress logging
        save_audio: Keep the received audio and save it as a WAV file
        audio_dir: Directory for the saved audio
        conversation_id: Passed through to ``save_debug_file``

    Returns:
        Tuple of (success, metrics)
    """
    metrics = StreamMetrics()
    start_time = time.perf_counter()
    stream = None
    first_chunk_time = None
    save_buffer = io.BytesIO() if save_audio else None

    try:
        # Setup sounddevice stream for PCM playback
        # PCM parameters: 16-bit, mono, 24kHz (standard for TTS)
        stream = sd.OutputStream(
            samplerate=SAMPLE_RATE,  # Standard TTS sample rate (24kHz)
            channels=1,
            dtype='int16'  # PCM is 16-bit integers
            # Note: Can't use callback and write() together
        )
        stream.start()

        # Log TTS playback start when we start the stream
        event_logger = get_event_logger()
        if event_logger:
            event_logger.log_event(event_logger.TTS_PLAYBACK_START)

        # Don't add stream parameter - Kokoro defaults to true, OpenAI doesn't support it
        logger.info("Starting true HTTP streaming with iter_bytes()")

        # Use the streaming response API
        async with openai_client.audio.speech.with_streaming_response.create(
            **request_params
        ) as response:
            chunk_count = 0
            bytes_received = 0
            # HTTP chunk boundaries need not fall on 16-bit sample
            # boundaries; a dangling byte is carried into the next chunk.
            leftover = b''

            # Stream chunks as they arrive
            async for chunk in response.iter_bytes(chunk_size=STREAM_CHUNK_SIZE):
                if not chunk:
                    continue

                # Track first chunk received
                if first_chunk_time is None:
                    first_chunk_time = time.perf_counter()
                    chunk_receive_time = first_chunk_time - start_time
                    logger.info(f"First audio chunk received after {chunk_receive_time:.3f}s")

                    # Log TTS first audio event
                    event_logger = get_event_logger()
                    if event_logger:
                        event_logger.log_event(event_logger.TTS_FIRST_AUDIO)

                # Convert bytes to numpy array for sounddevice
                # PCM data is already in the right format
                data = leftover + chunk
                whole_samples = len(data) // 2
                leftover = data[whole_samples * 2:]

                # Play the chunk immediately
                if whole_samples:
                    audio_array = np.frombuffer(data, dtype=np.int16, count=whole_samples)
                    stream.write(audio_array)

                # Save chunk if enabled
                if save_buffer is not None:
                    save_buffer.write(chunk)

                chunk_count += 1
                bytes_received += len(chunk)
                metrics.chunks_received = chunk_count
                metrics.chunks_played = chunk_count

                if debug and chunk_count % 10 == 0:
                    logger.debug(f"Streamed {chunk_count} chunks, {bytes_received} bytes")

        # Wait for playback to finish
        stream.stop()

        # Log TTS playback end
        if event_logger:
            event_logger.log_event(event_logger.TTS_PLAYBACK_END)

        end_time = time.perf_counter()
        metrics.generation_time = first_chunk_time - start_time if first_chunk_time else 0
        metrics.playback_time = end_time - start_time

        # TTFA is measured at first chunk receipt: the blocking write() API
        # gives no callback to observe when audio actually leaves the device.
        if first_chunk_time:
            metrics.ttfa = first_chunk_time - start_time
            logger.info(f"TTFA (first chunk): {metrics.ttfa:.3f}s")

        logger.info(f"Streaming complete - TTFA: {metrics.ttfa:.3f}s, "
                    f"Total: {metrics.playback_time:.3f}s, "
                    f"Chunks: {metrics.chunks_received}")

        # Save audio if enabled
        if save_audio and save_buffer is not None and audio_dir:
            try:
                from .core import save_debug_file
                audio_data = save_buffer.getvalue()

                # PCM format needs special handling - save as WAV
                if audio_data:
                    # Build the WAV in memory; no temp file to reopen or leak
                    wav_data = _pcm_to_wav_bytes(audio_data, SAMPLE_RATE)

                    audio_path = save_debug_file(wav_data, "tts", "wav", audio_dir, True, conversation_id)
                    if audio_path:
                        logger.info(f"TTS audio saved to: {audio_path}")
                        # Store audio path in metrics for the caller
                        metrics.audio_path = audio_path
            except Exception as e:
                logger.error(f"Failed to save TTS audio: {e}")

        return True, metrics

    except Exception as e:
        logger.error(f"PCM streaming failed: {e}")
        return False, metrics

    finally:
        if stream:
            stream.close()


async def stream_tts_audio(
    text: str,
    openai_client,
    request_params: dict,
    debug: bool = False,
    save_audio: bool = False,
    audio_dir: Optional[Path] = None,
    conversation_id: Optional[str] = None
) -> Tuple[bool, StreamMetrics]:
    """Stream TTS audio with progressive playback.

    Dispatches on ``request_params['response_format']`` (default ``'pcm'``).

    Args:
        text: Text to convert to speech
        openai_client: OpenAI client instance
        request_params: Parameters for TTS request
        debug: Enable debug logging
        save_audio: Save the received audio to ``audio_dir``
        audio_dir: Directory for the saved audio
        conversation_id: Passed through to the save helper

    Returns:
        Tuple of (success, metrics)
    """
    audio_format = request_params.get('response_format', 'pcm')

    logger.info(f"Starting streaming TTS with format: {audio_format}")

    # PCM is best for streaming (no decoding needed)
    # For other formats, we may need buffering
    if audio_format == 'pcm':
        return await stream_pcm_audio(
            text=text,
            openai_client=openai_client,
            request_params=request_params,
            debug=debug,
            save_audio=save_audio,
            audio_dir=audio_dir,
            conversation_id=conversation_id
        )
    else:
        # Use buffered streaming for formats that need decoding
        return await stream_with_buffering(
            text=text,
            openai_client=openai_client,
            request_params=request_params,
            debug=debug,
            save_audio=save_audio,
            audio_dir=audio_dir,
            conversation_id=conversation_id
        )


# Fallback for complex formats - buffer and decode complete file
async def stream_with_buffering(
    text: str,
    openai_client,
    request_params: dict,
    sample_rate: int = 24000,  # TTS standard sample rate
    debug: bool = False,
    save_audio: bool = False,
    audio_dir: Optional[Path] = None,
    conversation_id: Optional[str] = None
) -> Tuple[bool, StreamMetrics]:
    """Fallback streaming that buffers enough data to decode reliably.

    This is used for formats like MP3, Opus, etc where frame boundaries are
    critical. Once more than 32KB has arrived, one early decode of everything
    received so far is attempted and played. When the download completes the
    whole payload is decoded and only the samples not yet played are written.
    Every decode starts from the beginning of the payload, so formats that
    need their header keep working.

    Args:
        text: Text being spoken (not used here)
        openai_client: OpenAI client instance
        request_params: Parameters for the TTS request
        sample_rate: Output sample rate; decoded audio is resampled to it
        debug: Accepted for signature parity with the PCM path
        save_audio: Save the received audio to ``audio_dir``
        audio_dir: Directory for the saved audio
        conversation_id: Passed through to ``save_debug_file``

    Returns:
        Tuple of (success, metrics)
    """
    audio_format = request_params.get('response_format', 'pcm')
    logger.info(f"Using buffered streaming for format: {audio_format}")

    metrics = StreamMetrics()
    start_time = time.perf_counter()

    # Every byte received so far; used for decoding and for saving
    received = io.BytesIO()
    audio_started = False
    samples_played = 0
    stream = None

    try:
        # Setup sounddevice stream
        stream = sd.OutputStream(
            samplerate=sample_rate,
            channels=1,
            dtype='float32'
        )
        stream.start()

        # Don't add stream parameter - Kokoro defaults to true, OpenAI doesn't support it
        # Use the streaming response API for true HTTP streaming
        async with openai_client.audio.speech.with_streaming_response.create(
            **request_params
        ) as response:
            first_chunk_time = None

            # Stream chunks as they arrive
            async for chunk in response.iter_bytes(chunk_size=STREAM_CHUNK_SIZE):
                if not chunk:
                    continue

                # Track first chunk for TTFA
                if first_chunk_time is None:
                    first_chunk_time = time.perf_counter()
                    metrics.ttfa = first_chunk_time - start_time
                    logger.info(f"First chunk received - TTFA: {metrics.ttfa:.3f}s")

                received.write(chunk)
                metrics.chunks_received += 1

                # Try to decode when we have enough data (e.g., 32KB)
                if received.tell() > 32768 and not audio_started:
                    try:
                        # Attempt to decode what we have
                        samples = _decode_encoded_audio(
                            received.getvalue(), audio_format, sample_rate
                        )
                    except Exception:
                        # Not enough valid data yet
                        continue

                    # Start playback
                    metrics.ttfa = time.perf_counter() - start_time
                    audio_started = True
                    logger.info(f"Buffered streaming started - TTFA: {metrics.ttfa:.3f}s")

                    # Play audio
                    stream.write(samples)
                    samples_played = len(samples)
                    metrics.chunks_played += len(samples) // 1024

            # Process any remaining data: decode the full payload and play
            # only what the early decode has not already played.
            if received.tell() > 0:
                try:
                    samples = _decode_encoded_audio(
                        received.getvalue(), audio_format, sample_rate
                    )
                    remaining = samples[samples_played:]

                    if not audio_started:
                        metrics.ttfa = time.perf_counter() - start_time

                    if len(remaining):
                        stream.write(remaining)
                        metrics.chunks_played += len(remaining) // 1024

                except Exception as e:
                    logger.error(f"Failed to decode final buffer: {e}")

        metrics.generation_time = time.perf_counter() - start_time
        metrics.playback_time = metrics.generation_time  # Approximate

        # Save audio if enabled
        if save_audio and audio_dir:
            try:
                from .core import save_debug_file
                audio_data = received.getvalue()
                audio_path = save_debug_file(audio_data, "tts", audio_format, audio_dir, True, conversation_id)
                if audio_path:
                    logger.info(f"TTS audio saved to: {audio_path}")
                    # Store audio path in metrics for the caller
                    metrics.audio_path = audio_path
            except Exception as e:
                logger.error(f"Failed to save TTS audio: {e}")

        return True, metrics

    except Exception as e:
        logger.error(f"Buffered streaming failed: {e}")
        return False, metrics

    finally:
        if stream:
            stream.stop()
            stream.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mbailey/voicemode'

If you have feedback or need assistance with the MCP directory API, please join our Discord server