"""Move head tool for Reachy Mini MCP server."""
from __future__ import annotations
import logging
from typing import Any, Dict, Literal, Tuple
from mcp.server.fastmcp import Context, FastMCP
from reachy_mini.utils import create_head_pose
from reachy_mini_mcp.moves import GotoQueueMove
logger = logging.getLogger(__name__)
Direction = Literal["left", "right", "up", "down", "front"]
# Direction to head pose deltas (x, y, z, roll, pitch, yaw) in degrees
DIRECTION_DELTAS: Dict[str, Tuple[int, int, int, int, int, int]] = {
"left": (0, 0, 0, 0, 0, 40),
"right": (0, 0, 0, 0, 0, -40),
"up": (0, 0, 0, 0, -30, 0),
"down": (0, 0, 0, 0, 30, 0),
"front": (0, 0, 0, 0, 0, 0),
}
def register_move_head_tool(mcp: FastMCP) -> None:
"""Register the move_head tool with the MCP server."""
@mcp.tool()
def move_head(
ctx: Context,
direction: Literal["left", "right", "up", "down", "front"],
duration: float = 1.0,
) -> dict[str, Any]:
"""Move the robot's head in a specific direction.
Args:
direction: Direction to move the head.
- "left": Turn head 40 degrees to the left
- "right": Turn head 40 degrees to the right
- "up": Tilt head 30 degrees upward
- "down": Tilt head 30 degrees downward
- "front": Return head to neutral forward position
duration: How long the movement should take in seconds (default 1.0).
Returns:
Status dict with "status" and "direction" keys.
"""
robot_manager = ctx.request_context.lifespan_context.robot_manager
if not robot_manager.is_connected():
return {"status": "error", "error": "Robot not connected"}
# Get direction deltas
deltas = DIRECTION_DELTAS.get(direction, DIRECTION_DELTAS["front"])
target_pose = create_head_pose(*deltas, degrees=True)
# Get current state for smooth interpolation
try:
robot = robot_manager.robot
if robot is None:
return {"status": "error", "error": "Robot not available"}
current_head_pose = robot.get_current_head_pose()
_, current_antennas = robot.get_current_joint_positions()
# Create goto move
goto_move = GotoQueueMove(
target_head_pose=target_pose,
start_head_pose=current_head_pose,
target_antennas=(0.0, 0.0),
start_antennas=(current_antennas[0], current_antennas[1]),
target_body_yaw=0.0,
start_body_yaw=0.0,
duration=duration,
)
robot_manager.queue_move(goto_move)
logger.info(f"Queued move_head: {direction} ({duration}s)")
return {"status": "queued", "direction": direction, "duration": duration}
except Exception as e:
logger.error(f"Failed to queue move_head: {e}")
return {"status": "error", "error": str(e)}