Text to Speech MCP Server

by JosefGold
tts_engine.py (9.96 kB)
from contextlib import contextmanager
import os
import time
import uuid
import threading
import queue
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Dict, Any
import logging

import pyaudio
from openai import OpenAI

# Audio constants for OpenAI "pcm" response format (16-bit little-endian, mono, 24 kHz).
# At 24 kHz mono 16-bit (48,000 bytes/s), one 4096-byte chunk is ~85 ms of audio.
PA_SAMPLE_FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 24_000
CHUNK_SIZE = 4096

logger = logging.getLogger(__name__)


class AudioStatus(Enum):
    """Status of an audio message in the queue"""
    QUEUED = "queued"
    PLAYING = "playing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class AudioMessage:
    """Message containing TTS request and status"""
    text: str
    instructions: Optional[str] = None
    voice: str = "alloy"
    model: str = "gpt-4o-mini-tts"
    status: AudioStatus = AudioStatus.QUEUED
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    error_message: Optional[str] = None
    first_chunk_time_ms: Optional[float] = None
    completion_event: threading.Event = field(default_factory=threading.Event)


class TTSEngineError(Exception):
    """Base exception for TTS engine errors"""
    pass


class TTSEngine:
    """OpenAI Text-to-Speech engine with queue-based non-blocking audio playback"""

    def __init__(self, api_key: Optional[str] = None):
        """Initialize TTS engine with OpenAI client, PyAudio, and audio worker thread

        Args:
            api_key: OpenAI API key. If None, loads from OPENAI_API_KEY env var
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise TTSEngineError("OPENAI_API_KEY environment variable not set")

        self.client = OpenAI(api_key=self.api_key)

        # Initialize PyAudio once
        try:
            self._pa = pyaudio.PyAudio()
            logger.info("TTS Engine initialized with PyAudio")
        except Exception as e:
            raise TTSEngineError(f"Failed to initialize PyAudio: {e}")

        # Initialize queue and worker thread
        self._audio_queue = queue.Queue()
        self._worker_thread = None
        self._stop_worker = threading.Event()
        self._start_worker_thread()

    @contextmanager
    def _audio_stream(self):
        """Context manager for audio stream with proper cleanup"""
        stream = None
        try:
            stream = self._pa.open(
                format=PA_SAMPLE_FORMAT,
                channels=CHANNELS,
                rate=RATE,
                output=True,
                frames_per_buffer=CHUNK_SIZE,
            )
            yield stream
        except Exception as e:
            raise TTSEngineError(f"Failed to open audio stream: {e}")
        finally:
            if stream is not None:
                try:
                    stream.stop_stream()
                    stream.close()
                except Exception as e:
                    logger.warning(f"Error closing audio stream: {e}")

    def _start_worker_thread(self):
        """Start the dedicated audio worker thread"""
        if self._worker_thread is None or not self._worker_thread.is_alive():
            self._stop_worker.clear()
            self._worker_thread = threading.Thread(target=self._audio_player_worker, daemon=True)
            self._worker_thread.start()
            logger.info("Audio worker thread started")

    def _audio_player_worker(self):
        """Worker thread that continuously processes audio messages from the queue"""
        logger.info("Audio worker thread running")

        while not self._stop_worker.is_set():
            try:
                # Wait for a message with timeout to allow for shutdown
                try:
                    message = self._audio_queue.get(block=True, timeout=0.01)
                except queue.Empty:
                    continue

                # Process the audio message
                self._process_audio_message(message)

                # Mark task as done
                self._audio_queue.task_done()

            except Exception as e:
                logger.error(f"Error in audio worker thread: {e}")
                # Continue running even if one message fails

        logger.info("Audio worker thread stopped")

    def _process_audio_message(self, message: AudioMessage):
        """Process a single audio message (convert to speech and play)"""
        try:
            logger.info(f"Processing audio message {message.id}: '{message.text[:50]}...'")

            # Update status to playing
            message.status = AudioStatus.PLAYING
            start_time = time.time()
            first_chunk_time = None

            # Request streaming PCM audio from OpenAI
            create_params = {
                "model": message.model,
                "voice": message.voice,
                "input": message.text,
                "response_format": "pcm",
            }
            if message.instructions:
                create_params["instructions"] = message.instructions

            with self.client.audio.speech.with_streaming_response.create(**create_params) as response:
                # Use context manager for audio stream
                with self._audio_stream() as stream:
                    # Stream audio chunks directly to playback
                    for chunk in response.iter_bytes(CHUNK_SIZE):
                        if first_chunk_time is None:
                            first_chunk_time = (time.time() - start_time) * 1000
                            message.first_chunk_time_ms = first_chunk_time
                            logger.debug(f"First chunk latency for {message.id}: {first_chunk_time:.1f}ms")
                        stream.write(chunk)

            # Mark as completed
            message.status = AudioStatus.COMPLETED
            total_time = (time.time() - start_time) * 1000
            logger.info(f"Completed audio message {message.id} in {total_time:.1f}ms")

        except Exception as e:
            # Mark as failed and log error
            message.status = AudioStatus.FAILED
            message.error_message = str(e)
            logger.error(f"Failed to process audio message {message.id}: {e}")
        finally:
            # Signal completion regardless of success/failure
            message.completion_event.set()

    def speak(self, text: str, voice: str = "alloy", model: str = "gpt-4o-mini-tts",
              instructions: Optional[str] = None, blocking: bool = False) -> Dict[str, Any]:
        """Add text to speech queue and optionally wait for completion

        Args:
            text: Text to convert to speech
            voice: OpenAI voice to use
            model: OpenAI TTS model to use
            instructions: Optional delivery instructions passed to the model
            blocking: If True, wait for audio to complete playing before returning

        Returns:
            Dict with success status, message ID, and queue position
            (returns immediately if non-blocking)
        """
        try:
            # Create audio message
            message = AudioMessage(
                text=text,
                instructions=instructions,
                voice=voice,
                model=model
            )

            # Add to queue
            self._audio_queue.put(message)
            queue_size = self._audio_queue.qsize()

            # Truncate text for response
            display_text = text[:50] + "..." if len(text) > 50 else text

            # If blocking mode, wait for completion
            if blocking:
                logger.info(f"Waiting for audio message {message.id} to complete...")
                message.completion_event.wait()

                if message.status == AudioStatus.FAILED:
                    return {
                        "success": False,
                        "message": f"Failed to play audio: {message.error_message}",
                        "error": message.error_message
                    }

                return {
                    "success": True,
                    "message": f"Completed: {display_text}",
                    "message_id": message.id,
                    "status": message.status.value,
                    "voice": voice,
                    "model": model
                }

            logger.info(f"Queued audio message '{display_text}' (queue size: {queue_size})")
            return {
                "success": True,
                "message": f"Queued: {display_text}",
                "message_id": message.id,
                "queue_position": queue_size,
                "voice": voice,
                "model": model
            }

        except Exception as e:
            logger.error(f"Error queuing TTS message: {e}")
            return {
                "success": False,
                "message": f"Error: {str(e)}",
                "error": str(e)
            }

    def __del__(self):
        """Cleanup on destruction"""
        try:
            logger.info("Shutting down TTS engine")

            # Stop the worker thread
            self._stop_worker.set()
            if self._worker_thread and self._worker_thread.is_alive():
                self._worker_thread.join(timeout=5.0)

            # Clean up PyAudio
            if hasattr(self, '_pa') and self._pa:
                try:
                    self._pa.terminate()
                    logger.debug("PyAudio terminated")
                except Exception as e:
                    logger.warning(f"Error terminating PyAudio: {e}")

        except Exception as e:
            logger.warning(f"Error during TTS engine cleanup: {e}")

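This file is only the engine; the repository presumably wires it into an MCP server elsewhere. As a hypothetical sketch of that wiring (the tool name, parameters, and FastMCP usage here are assumptions based on the official `mcp` Python SDK, not taken from this repo):

# Hypothetical MCP wiring (not from the repository): exposes TTSEngine.speak
# as a tool via the FastMCP helper in the official `mcp` Python SDK.
from typing import Any, Dict, Optional

from mcp.server.fastmcp import FastMCP

from tts_engine import TTSEngine

mcp = FastMCP("tts")
engine = TTSEngine()

@mcp.tool()
def speak(text: str, voice: str = "alloy", instructions: Optional[str] = None,
          blocking: bool = False) -> Dict[str, Any]:
    """Queue text for speech playback and report queue status."""
    return engine.speak(text, voice=voice, instructions=instructions, blocking=blocking)

if __name__ == "__main__":
    mcp.run()  # defaults to stdio transport
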
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/JosefGold/tts-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.