"""Emotion tool for Reachy Mini MCP server."""
from __future__ import annotations
import logging
from typing import Any
from mcp.server.fastmcp import Context, FastMCP
from reachy_mini_mcp.moves import EmotionQueueMove
logger = logging.getLogger(__name__)
# Available emotions
# Start from the "unavailable" state; the globals are only overwritten once the
# library has been imported AND loaded successfully, so they can never end up
# half-initialised (e.g. RECORDED_MOVES set but EMOTION_LIST missing).
RECORDED_MOVES = None
EMOTION_AVAILABLE = False
EMOTION_LIST = []
try:
    from reachy_mini.motion.recorded_move import RecordedMoves

    _loaded_moves = RecordedMoves("pollen-robotics/reachy-mini-emotions-library")
    _loaded_names = _loaded_moves.list_moves()
except ImportError:
    # The optional reachy_mini package (or one of its imports) is missing.
    logger.warning("Emotion library not available")
except Exception:
    # Loading can fail for reasons other than a missing package.
    # NOTE(review): presumably the library is fetched/read from disk here —
    # confirm. Either way, degrade to "emotions unavailable" instead of making
    # this module unimportable; the tool reports the error at call time.
    logger.exception("Failed to load emotion library")
else:
    RECORDED_MOVES = _loaded_moves
    EMOTION_LIST = _loaded_names
    EMOTION_AVAILABLE = True
def get_emotions_description() -> str:
    """Get formatted description of available emotions.

    Returns:
        ``"Emotions not available"`` when the emotion library is not loaded.
        Otherwise one ``- name: description`` entry per name in
        ``EMOTION_LIST``, separated by a newline followed by a space. If any
        description lookup fails, falls back to the bare emotion names joined
        with ``", "``.
    """
    if not EMOTION_AVAILABLE or RECORDED_MOVES is None:
        return "Emotions not available"

    try:
        entries = [
            f"- {name}: {RECORDED_MOVES.get(name).description}"
            for name in EMOTION_LIST
        ]
    except Exception:
        # Deliberately best-effort: a broken description must not prevent the
        # names from being listed. Log the cause instead of discarding it.
        logger.warning(
            "Could not read emotion descriptions; listing names only",
            exc_info=True,
        )
        return ", ".join(EMOTION_LIST)
    return "\n ".join(entries)
def register_emotion_tool(mcp: FastMCP) -> None:
    """Register the play_emotion tool with the MCP server.

    The tool description (including the list of available emotions) is built
    once here and handed to ``mcp.tool`` explicitly. An f-string written in
    docstring position is just an expression statement: it never becomes
    ``__doc__`` and would be re-evaluated on every call.

    Args:
        mcp: Server instance on which the tool is registered.
    """
    # Normalise each line of the emotion listing to a uniform indent so the
    # description reads cleanly regardless of the separator used upstream.
    emotion_lines = "\n".join(
        f"    {line.strip()}"
        for line in get_emotions_description().splitlines()
    )
    tool_description = (
        "Play a pre-recorded emotion on the Reachy Mini robot.\n"
        "\n"
        "Args:\n"
        "    emotion: Name of the emotion to play.\n"
        "\n"
        "Available emotions:\n"
        f"{emotion_lines}\n"
        "\n"
        "Returns:\n"
        '    Status dict with "status" and "emotion" keys.'
    )

    @mcp.tool(description=tool_description)
    def play_emotion(
        ctx: Context,
        emotion: str,
    ) -> dict[str, Any]:
        """Queue a pre-recorded emotion on the Reachy Mini robot.

        Args:
            ctx: Request context; the robot manager is read from its
                lifespan context.
            emotion: Name of the emotion to play; must be in ``EMOTION_LIST``.

        Returns:
            ``{"status": "queued", "emotion": emotion}`` when the move was
            queued, otherwise ``{"status": "error", "error": <message>}``.
        """
        if not EMOTION_AVAILABLE or RECORDED_MOVES is None:
            return {"status": "error", "error": "Emotion library not available"}

        robot_manager = ctx.request_context.lifespan_context.robot_manager
        if not robot_manager.is_connected():
            return {"status": "error", "error": "Robot not connected"}

        # Validate emotion
        if emotion not in EMOTION_LIST:
            return {
                "status": "error",
                "error": f"Unknown emotion '{emotion}'. Available: {EMOTION_LIST}",
            }

        # Queue the emotion
        emotion_move = EmotionQueueMove(emotion, RECORDED_MOVES)
        robot_manager.queue_move(emotion_move)
        logger.info("Queued emotion: %s", emotion)
        return {"status": "queued", "emotion": emotion}