"""Speaker tool for Reachy Mini MCP server."""
from __future__ import annotations
import logging
from typing import Any
from mcp.server.fastmcp import Context, FastMCP
logger = logging.getLogger(__name__)
# TTS availability check
try:
from pocket_tts import TTSModel
TTS_AVAILABLE = True
except ImportError:
logger.warning("pocket-tts not available. Install with: pip install pocket-tts")
TTSModel = None
TTS_AVAILABLE = False
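# Note: the `speak` tool is still registered when pocket-tts is missing; it
# reports the install hint at call time rather than failing at import time.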
# Global TTS model instance (lazy loaded)
_tts_model = None
_voice_state = None
# Built-in voices (no voice cloning required)
AVAILABLE_VOICES = ["alba", "marius", "javert", "jean", "fantine", "cosette", "eponine", "azelma"]
DEFAULT_VOICE = "alba"
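# The `speak` tool's docstring lists these same names; keep the two in sync
# when the set of built-in voices changes.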
def _get_tts_model(voice: str | None = None):
"""Get or initialize the TTS model and voice state (lazy loading).
Args:
voice: Voice name to use. Can be a built-in voice name (alba, marius, etc.)
or a path for voice cloning (requires HuggingFace access).
If None, uses DEFAULT_VOICE.
    Returns:
        Tuple of (tts_model, voice_state). (None, None) means TTS is
        unavailable; (tts_model, None) means the requested voice failed
        to load.
"""
global _tts_model, _voice_state
if not TTS_AVAILABLE or TTSModel is None:
return None, None
voice_to_use = voice or DEFAULT_VOICE
# Load model if not already loaded
if _tts_model is None:
logger.info("Loading TTS model...")
_tts_model = TTSModel.load_model()
logger.info("TTS model loaded successfully")
# Load voice state (cache default voice, always load custom voices fresh)
if voice is None:
# Use cached default voice state
if _voice_state is None:
logger.info(f"Loading default voice: {voice_to_use}")
_voice_state = _tts_model.get_state_for_audio_prompt(voice_to_use)
if _voice_state is None:
logger.error(f"Failed to load default voice: {voice_to_use}")
return _tts_model, _voice_state
else:
# Load custom voice fresh each time
logger.info(f"Loading voice: {voice_to_use}")
custom_voice_state = _tts_model.get_state_for_audio_prompt(voice_to_use)
if custom_voice_state is None:
logger.error(f"Failed to load voice: {voice_to_use}")
return _tts_model, custom_voice_state
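
# Typical use (sketch): warm the model at startup so the first `speak` call
# doesn't pay the one-time load cost, e.g.:
#
#     _get_tts_model()            # loads the model and caches the default voice
#     _get_tts_model("marius")    # non-default voices are loaded fresh each call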
def register_speaker_tool(mcp: FastMCP) -> None:
"""Register the speaker tools with the MCP server."""
@mcp.tool()
def speak(
ctx: Context,
text: str,
voice: str | None = None,
) -> dict[str, Any]:
"""Make the Reachy Mini robot speak using text-to-speech.
        Uses the pocket-tts library to convert text to speech and streams it
        through the robot's speaker, while animating the head in time with
        the audio.
Args:
text: The text to speak.
voice: Voice to use. Available built-in voices:
alba (default), marius, javert, jean, fantine, cosette, eponine, azelma.
For voice cloning (requires HuggingFace access), provide a path like
"hf://kyutai/tts-voices/alba-mackenna/casual.wav".
        Returns:
            Status dict. Returns immediately with status "speaking" once the
            background stream starts, or status "error" with an "error"
            message if setup fails.
"""
robot_manager = ctx.request_context.lifespan_context.robot_manager
if not robot_manager.is_connected():
return {"status": "error", "error": "Robot not connected"}
if not TTS_AVAILABLE:
return {
"status": "error",
"error": "TTS not available. Install with: pip install pocket-tts",
}
try:
import threading
import numpy as np
# Get TTS model and voice state
tts_model, voice_state = _get_tts_model(voice)
if tts_model is None:
return {"status": "error", "error": "Failed to load TTS model"}
if voice_state is None:
return {
"status": "error",
"error": f"Failed to load voice: {voice or DEFAULT_VOICE}",
}
# Start streaming in background thread to avoid MCP timeout
def _generate_and_stream():
try:
robot = robot_manager.robot
if robot is None:
logger.error("Robot not available for streaming")
return
import time
from scipy.signal import resample
from reachy_mini_mcp.audio.speech_tapper import SwayRollRT
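                    # Deferred imports: scipy and the speech tapper are only
                    # needed while actually streaming speech.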
input_sample_rate = tts_model.sample_rate
output_sample_rate = robot.media.get_output_audio_samplerate()
# Initialize head wobbler for speech animation
sway = SwayRollRT()
# Enable head tracking for speech animation
robot_manager.set_head_tracking(True)
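                    # The tapper's sway results are applied as small offsets on
                    # top of the tracking pose via set_head_tracking_offsets()
                    # in the loop below.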
robot.media.start_playing()
# Track timing to sync push rate with playback rate
start_time = time.monotonic()
total_samples_pushed = 0
                    # Stream audio chunks as they're generated
                    for audio_chunk in tts_model.generate_audio_stream(voice_state, text):
                        # Convert to float32 numpy array
                        chunk_data = audio_chunk.numpy().astype(np.float32).flatten()
                        # Analyze audio for head movement (use original sample rate)
                        results = sway.feed(chunk_data, input_sample_rate)
                        # Resample for audio playback if needed
                        if input_sample_rate != output_sample_rate:
                            num_samples = int(len(chunk_data) * output_sample_rate / input_sample_rate)
                            chunk_data = resample(chunk_data, num_samples).astype(np.float32)
                        robot.media.push_audio_sample(chunk_data)
                        total_samples_pushed += len(chunk_data)
                        # Apply the head movements computed for this chunk
                        for r in results:
                            offsets = (
                                r["x_mm"] / 1000.0,
                                r["y_mm"] / 1000.0,
                                r["z_mm"] / 1000.0,
                                r["roll_rad"],
                                r["pitch_rad"],
                                r["yaw_rad"],
                            )
                            robot_manager.set_head_tracking_offsets(offsets)
# Calculate target time and sleep only the remaining difference
target_time = start_time + (total_samples_pushed / output_sample_rate)
sleep_time = target_time - time.monotonic()
if sleep_time > 0:
time.sleep(sleep_time)
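                        # Worked example: at a 24 kHz output rate, 48_000 pushed
                        # samples give target_time = start_time + 2.0 s; if only
                        # 1.8 s have elapsed we sleep 0.2 s, so generation never
                        # runs ahead of playback.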
# Reset head tracking offsets after speech
time.sleep(1) # brief pause to ensure last movements are applied
robot_manager.set_head_tracking_offsets((0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
robot.media.stop_playing()
except Exception as e:
logger.error(f"TTS streaming error: {e}")
try:
robot_manager.set_head_tracking_offsets((0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
robot_manager.robot.media.stop_playing()
except Exception:
pass
logger.info(f"Starting speech stream for: {text[:50]}...")
stream_thread = threading.Thread(target=_generate_and_stream, daemon=True)
stream_thread.start()
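            # Fire-and-forget: the daemon thread keeps this MCP call from
            # blocking (and timing out) for the duration of the speech; it is
            # never joined and dies with the process.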
return {
"status": "speaking",
"text": text,
"voice": voice or DEFAULT_VOICE,
}
except Exception as e:
logger.error(f"TTS error: {e}")
return {"status": "error", "error": str(e)}