server.py
import asyncio
import logging
import io
# import base64 # No longer needed for returning audio data
import json
from typing import Sequence
# MCP imports mirroring the time server
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from mcp.shared.exceptions import McpError
# Kokoro and audio processing imports
from kokoro import KPipeline
import soundfile as sf
import torch
import numpy as np
import sounddevice as sd # Added for playback
logger = logging.getLogger(__name__)
# --- Constants ---
TOOL_NAME = "synthesize_and_play_speech" # Renamed tool for clarity
DEFAULT_LANG_CODE = 'a'  # Kokoro language code ('a' selects American English)
DEFAULT_VOICE = 'af_heart'
KOKORO_SAMPLE_RATE = 24000  # Kokoro generates audio at a fixed 24 kHz sample rate
# --- Core TTS Logic Class ---
class KokoroTtsServer:
"""Encapsulates the Kokoro TTS pipeline and synthesis logic."""
def __init__(self, default_lang_code: str = DEFAULT_LANG_CODE):
self.pipeline: KPipeline | None = None
self.default_lang_code = default_lang_code
self._initialize_pipeline()
def _initialize_pipeline(self):
"""Initializes the KPipeline instance."""
if self.pipeline is None:
logger.info(f"Initializing Kokoro KPipeline with lang_code='{self.default_lang_code}'...")
try:
self.pipeline = KPipeline(lang_code=self.default_lang_code)
logger.info("Kokoro KPipeline initialized.")
except Exception as e:
logger.error(f"Failed to initialize Kokoro KPipeline: {e}", exc_info=True)
raise RuntimeError(f"Kokoro KPipeline initialization failed: {e}")
    def synthesize_and_play(
        self,
        text: str,
        voice: str | None = DEFAULT_VOICE,
    ) -> str:
        """
        Synthesizes text to speech and plays it for the user.

        Args:
            text: The text to synthesize.
            voice: The voice to use (e.g., 'af_heart').

        Returns:
            A status message indicating the outcome.

        Raises:
            RuntimeError: If the pipeline is not initialized or synthesis/playback fails.
            ValueError: If input text is empty.
        """
        if self.pipeline is None:
            logger.error("Action failed: Kokoro pipeline is not initialized.")
            raise RuntimeError("TTS Pipeline not available.")
        if not text.strip():
            logger.warning("Action requested for empty text.")
            raise ValueError("Input text cannot be empty.")

        current_voice = voice or DEFAULT_VOICE
        logger.info(f"Synthesizing speech for: '{text[:50]}...' with voice: {current_voice}")
        try:
            # The Kokoro pipeline yields (graphemes, phonemes, audio) tuples, one per
            # generated segment. Collect the audio chunks and normalize them to numpy arrays.
            all_audio_chunks = []
            generator = self.pipeline(text=text, voice=current_voice)
            for i, (_gs, _ps, audio_chunk) in enumerate(generator):
                if isinstance(audio_chunk, torch.Tensor):
                    audio_chunk = audio_chunk.detach().cpu().numpy()
                if not isinstance(audio_chunk, np.ndarray):
                    try:
                        audio_chunk = np.array(audio_chunk)
                    except Exception:
                        logger.warning(f"Audio chunk {i} type {type(audio_chunk)} not convertible to np.array, skipping.")
                        continue  # Skip this chunk
                if audio_chunk is not None and audio_chunk.size > 0:
                    all_audio_chunks.append(audio_chunk)
                else:
                    logger.warning(f"Received empty or invalid audio chunk at index {i}.")

            if not all_audio_chunks:
                logger.error("No valid audio data generated by Kokoro pipeline.")
                raise RuntimeError("TTS generation failed: No audio output received from pipeline")

            # Join the per-segment chunks into one contiguous waveform.
            final_audio: np.ndarray
            if len(all_audio_chunks) > 1:
                if all(isinstance(chunk, np.ndarray) for chunk in all_audio_chunks):
                    try:
                        final_audio = np.concatenate(all_audio_chunks, axis=0)
                    except ValueError as concat_err:
                        logger.error(f"Failed to concatenate audio chunks: {concat_err}", exc_info=True)
                        raise RuntimeError(f"Audio chunk concatenation failed: {concat_err}")
                else:
                    valid_chunks = [chunk for chunk in all_audio_chunks if isinstance(chunk, np.ndarray)]
                    if not valid_chunks:
                        raise RuntimeError("No valid numpy audio chunks found to process after filtering.")
                    final_audio = np.concatenate(valid_chunks, axis=0) if len(valid_chunks) > 1 else valid_chunks[0]
                    logger.warning("Some audio chunks were not numpy arrays and were excluded from concatenation.")
            elif isinstance(all_audio_chunks[0], np.ndarray):
                final_audio = all_audio_chunks[0]
            else:
                raise RuntimeError("Single audio chunk is not a valid numpy array.")

            logger.info("Successfully synthesized audio. Attempting playback...")
            sd.play(final_audio, KOKORO_SAMPLE_RATE, blocking=False)  # Play non-blocking
            # sd.wait() would block until playback finishes, usually not desired for an MCP server response.
            return f"Audio playback initiated for: '{text[:100]}{'...' if len(text) > 100 else ''}'"
        except (ValueError, RuntimeError) as e:
            logger.error(f"Error during synthesis or playback: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error during speech synthesis or playback: {e}", exc_info=True)
            raise RuntimeError(f"TTS synthesis/playback failed unexpectedly: {str(e)}")
# --- MCP Server Setup ---
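# main() wires the synthesizer into an MCP server: it registers a single tool via
# list_tools(), executes requests in call_tool(), and serves the protocol over stdio.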
async def main():
"""Main function to run the MCP server."""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger.info("Starting TTS MCP Server with Kokoro (Audio Playback Mode)...")
try:
kokoro_tts_server = KokoroTtsServer()
except RuntimeError as e:
logger.critical(f"Failed to initialize TTS Server: {e}. Exiting.")
return
server = Server("mcp-server-tts-player") # Slightly changed server name for clarity

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available TTS tools."""
        logger.info("Listing available tools...")
        return [
            Tool(
                name=TOOL_NAME,  # Updated tool name
                description="Generates speech from text using Kokoro and plays it on the server's audio output.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "The text to synthesize and play.",
                        },
                        "voice": {
                            "type": "string",
                            "description": f"Optional voice to use (e.g., 'af_heart'). Defaults to '{DEFAULT_VOICE}'.",
                            "default": DEFAULT_VOICE,
                        },
                        "lang_code": {
                            "type": "string",
                            "description": f"Optional language code. Currently informational only; the pipeline is initialized with '{kokoro_tts_server.default_lang_code}' and is not reinitialized per request.",
                            "default": kokoro_tts_server.default_lang_code,
                        }
                    },
                    "required": ["text"],
                },
            )
        ]

    @server.call_tool()
    async def call_tool(
        name: str, arguments: dict
    ) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        """Handle tool calls for TTS and playback."""
        logger.info(f"Received tool call: {name} with args: {arguments}")
        if name != TOOL_NAME:
            logger.error(f"Unknown tool called: {name}")
            raise ValueError(f"Unknown tool: {name}")

        try:
            text_to_synthesize = arguments.get("text")
            if not text_to_synthesize:
                raise ValueError("Missing required argument: text")
            voice = arguments.get("voice", DEFAULT_VOICE)
            # Note: "lang_code" is accepted by the schema but not used here; the pipeline
            # keeps the language it was initialized with.
            status_message = kokoro_tts_server.synthesize_and_play(
                text=text_to_synthesize,
                voice=voice,
            )
            result_payload = {
                "status": "success",
                "message": status_message,
            }
            return [TextContent(type="text", text=json.dumps(result_payload, indent=2))]
        except (ValueError, RuntimeError) as e:
            logger.warning(f"Tool call failed: {e}")
            raise ValueError(f"Error processing TTS request: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected error handling tool call {name}: {e}", exc_info=True)
            raise Exception(f"Unexpected error during TTS tool execution: {str(e)}")

    options = server.create_initialization_options()
    logger.info("Starting MCP server run loop via stdio...")
    try:
        async with stdio_server() as (read_stream, write_stream):
            await server.run(read_stream, write_stream, options)
    except KeyboardInterrupt:
        logger.info("TTS MCP Server shutting down due to KeyboardInterrupt...")
    except Exception as e:
        logger.error(f"Server run loop encountered an error: {e}", exc_info=True)
    finally:
        logger.info("TTS MCP Server has stopped.")

if __name__ == "__main__":
    asyncio.run(main())
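
# Example client registration (a sketch: the exact file location and schema depend on
# the MCP client; this mirrors the common "mcpServers" layout used by Claude Desktop,
# and the path and server name below are placeholders):
#
#   {
#     "mcpServers": {
#       "tts-player": {
#         "command": "python",
#         "args": ["/path/to/server.py"]
#       }
#     }
#   }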