# tts_engine.py
from contextlib import contextmanager
import os
import time
import uuid
import threading
import queue
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Dict, Any
import logging
import pyaudio
from openai import OpenAI
# Audio constants for OpenAI "pcm" response format (16-bit little-endian, mono, 24 kHz)
PA_SAMPLE_FORMAT = pyaudio.paInt16  # 16-bit signed integer samples
CHANNELS = 1  # mono output
RATE = 24_000  # sample rate in Hz matching OpenAI's pcm output
CHUNK_SIZE = 4096  # used both as bytes per network read and frames_per_buffer for PyAudio
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
class AudioStatus(Enum):
    """Status of an audio message in the queue"""
    QUEUED = "queued"        # waiting in the queue, not yet picked up by the worker
    PLAYING = "playing"      # worker is currently streaming/playing this message
    COMPLETED = "completed"  # playback finished successfully
    FAILED = "failed"        # TTS request or playback raised; details in error_message
@dataclass
class AudioMessage:
    """Message containing TTS request and status"""
    text: str                                    # text to synthesize
    instructions: Optional[str] = None           # optional delivery/style instructions for the API
    voice: str = "alloy"                         # OpenAI voice name
    model: str = "gpt-4o-mini-tts"               # OpenAI TTS model name
    status: AudioStatus = AudioStatus.QUEUED     # lifecycle state, updated by the worker thread
    # Short random id used to correlate a message across log lines and threads.
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    error_message: Optional[str] = None          # populated when status becomes FAILED
    first_chunk_time_ms: Optional[float] = None  # latency from request start to first audio chunk
    # Set by the worker when processing ends (success or failure); callers wait() on it.
    completion_event: threading.Event = field(default_factory=threading.Event)
class TTSEngineError(Exception):
    """Raised for any TTS-engine-level failure (initialization or audio I/O)."""
class TTSEngine:
    """OpenAI Text-to-Speech engine with queue-based non-blocking audio playback.

    Speech requests are queued as AudioMessage objects and drained by a single
    daemon worker thread, which streams PCM audio from the OpenAI TTS endpoint
    directly into a PyAudio output stream.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize TTS engine with OpenAI client, PyAudio, and audio worker thread.

        Args:
            api_key: OpenAI API key. If None, loads from OPENAI_API_KEY env var.

        Raises:
            TTSEngineError: If no API key is available or PyAudio fails to start.
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise TTSEngineError("OPENAI_API_KEY environment variable not set")
        self.client = OpenAI(api_key=self.api_key)
        # Initialize PyAudio once; output streams are opened per message.
        try:
            self._pa = pyaudio.PyAudio()
            logger.info("TTS Engine initialized with PyAudio")
        except Exception as e:
            # Chain the cause so the underlying PortAudio error is preserved.
            raise TTSEngineError(f"Failed to initialize PyAudio: {e}") from e
        # Initialize queue and worker thread
        self._audio_queue = queue.Queue()
        self._worker_thread: Optional[threading.Thread] = None
        self._stop_worker = threading.Event()
        self._start_worker_thread()

    @contextmanager
    def _audio_stream(self):
        """Yield an open PyAudio output stream, always closing it on exit.

        Only the open itself is wrapped in TTSEngineError; exceptions raised
        while the stream is in use (i.e. during playback) propagate unchanged
        instead of being mislabeled as open failures.

        Raises:
            TTSEngineError: If the audio stream cannot be opened.
        """
        try:
            stream = self._pa.open(
                format=PA_SAMPLE_FORMAT,
                channels=CHANNELS,
                rate=RATE,
                output=True,
                frames_per_buffer=CHUNK_SIZE,
            )
        except Exception as e:
            raise TTSEngineError(f"Failed to open audio stream: {e}") from e
        try:
            yield stream
        finally:
            try:
                stream.stop_stream()
                stream.close()
            except Exception as e:
                logger.warning(f"Error closing audio stream: {e}")

    def _start_worker_thread(self):
        """Start the dedicated audio worker thread if it is not already running."""
        if self._worker_thread is None or not self._worker_thread.is_alive():
            self._stop_worker.clear()
            self._worker_thread = threading.Thread(target=self._audio_player_worker, daemon=True)
            self._worker_thread.start()
            logger.info("Audio worker thread started")

    def _audio_player_worker(self):
        """Worker loop: continuously process audio messages until asked to stop."""
        logger.info("Audio worker thread running")
        while not self._stop_worker.is_set():
            try:
                # Poll with a short timeout so the stop event is noticed promptly
                # without busy-spinning the thread (0.01s woke up 100x/sec).
                try:
                    message = self._audio_queue.get(block=True, timeout=0.1)
                except queue.Empty:
                    continue
                # Process the audio message, then mark the queue task as done.
                self._process_audio_message(message)
                self._audio_queue.task_done()
            except Exception as e:
                # Keep the worker alive even if one message fails unexpectedly.
                logger.error(f"Error in audio worker thread: {e}")
        logger.info("Audio worker thread stopped")

    def _process_audio_message(self, message: AudioMessage):
        """Convert a single message to speech and play it, updating its status.

        Sets message.status to PLAYING/COMPLETED/FAILED, records first-chunk
        latency, and always fires message.completion_event when finished.
        """
        try:
            logger.info(f"Processing audio message {message.id}: '{message.text[:50]}...'")
            message.status = AudioStatus.PLAYING
            start_time = time.time()
            first_chunk_time = None
            # Request streaming PCM audio from OpenAI; instructions is optional
            # and only forwarded when present.
            create_params = {
                "model": message.model,
                "voice": message.voice,
                "input": message.text,
                "response_format": "pcm",
            }
            if message.instructions:
                create_params["instructions"] = message.instructions
            with self.client.audio.speech.with_streaming_response.create(**create_params) as response:
                with self._audio_stream() as stream:
                    # Stream audio chunks directly to playback.
                    for chunk in response.iter_bytes(CHUNK_SIZE):
                        if first_chunk_time is None:
                            first_chunk_time = (time.time() - start_time) * 1000
                            message.first_chunk_time_ms = first_chunk_time
                            logger.debug(f"First chunk latency for {message.id}: {first_chunk_time:.1f}ms")
                        stream.write(chunk)
            message.status = AudioStatus.COMPLETED
            total_time = (time.time() - start_time) * 1000
            logger.info(f"Completed audio message {message.id} in {total_time:.1f}ms")
        except Exception as e:
            # Mark as failed and record the error for blocking callers.
            message.status = AudioStatus.FAILED
            message.error_message = str(e)
            logger.error(f"Failed to process audio message {message.id}: {e}")
        finally:
            # Signal completion regardless of success/failure so blocking
            # callers in speak() are never left waiting.
            message.completion_event.set()

    def speak(self, text: str, voice: str = "alloy", model: str = "gpt-4o-mini-tts", instructions: Optional[str] = None, blocking: bool = False) -> Dict[str, Any]:
        """Add text to the speech queue and optionally wait for completion.

        Args:
            text: Text to convert to speech.
            voice: OpenAI voice to use.
            model: OpenAI TTS model to use.
            instructions: Optional delivery/style instructions passed to the API.
            blocking: If True, wait for audio to complete playing before returning.

        Returns:
            Dict with success status, message ID, and queue position (returns
            immediately if non-blocking).
        """
        try:
            # Create audio message
            message = AudioMessage(
                text=text,
                instructions=instructions,
                voice=voice,
                model=model
            )
            # Add to queue
            self._audio_queue.put(message)
            queue_size = self._audio_queue.qsize()
            # Truncate text for human-readable responses and log lines.
            display_text = text[:50] + "..." if len(text) > 50 else text
            # If blocking mode, wait for completion
            if blocking:
                logger.info(f"Waiting for audio message {message.id} to complete...")
                message.completion_event.wait()
                if message.status == AudioStatus.FAILED:
                    return {
                        "success": False,
                        "message": f"Failed to play audio: {message.error_message}",
                        "error": message.error_message
                    }
                return {
                    "success": True,
                    "message": f"Completed: {display_text}",
                    "message_id": message.id,
                    "status": message.status.value,
                    "voice": voice,
                    "model": model
                }
            logger.info(f"Queued audio message '{display_text}' (queue size: {queue_size})")
            return {
                "success": True,
                "message": f"Queued: {display_text}",
                "message_id": message.id,
                "queue_position": queue_size,
                "voice": voice,
                "model": model
            }
        except Exception as e:
            logger.error(f"Error queuing TTS message: {e}")
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "error": str(e)
            }

    def shutdown(self, timeout: float = 5.0) -> None:
        """Stop the worker thread and release PyAudio resources.

        Idempotent, and safe to call on a partially-constructed instance
        (e.g. if __init__ raised before all attributes were assigned).

        Args:
            timeout: Maximum seconds to wait for the worker thread to exit.
        """
        logger.info("Shutting down TTS engine")
        stop_event = getattr(self, "_stop_worker", None)
        if stop_event is not None:
            stop_event.set()
        worker = getattr(self, "_worker_thread", None)
        if worker is not None and worker.is_alive():
            worker.join(timeout=timeout)
        pa = getattr(self, "_pa", None)
        if pa is not None:
            try:
                pa.terminate()
                logger.debug("PyAudio terminated")
            except Exception as e:
                logger.warning(f"Error terminating PyAudio: {e}")
            # Prevent a second terminate on repeat shutdown()/__del__ calls.
            self._pa = None

    def __del__(self):
        """Best-effort cleanup on garbage collection; prefer calling shutdown()."""
        try:
            self.shutdown()
        except Exception as e:
            logger.warning(f"Error during TTS engine cleanup: {e}")