"""Move classes for the Reachy Mini MCP server.
This module provides Move implementations that can be queued and executed
by the RobotManager's movement loop.
"""
from __future__ import annotations
import logging
from typing import Tuple
import numpy as np
from numpy.typing import NDArray
from reachy_mini.motion.move import Move
from reachy_mini.motion.recorded_move import RecordedMoves
from reachy_mini.utils import create_head_pose
from reachy_mini.utils.interpolation import linear_pose_interpolation
from reachy_mini_dances_library.dance_move import DanceMove
logger = logging.getLogger(__name__)
class DanceQueueMove(Move): # type: ignore[misc]
"""Wrapper for dance moves to work with the movement queue system."""
def __init__(self, move_name: str):
"""Initialize a DanceQueueMove."""
self.dance_move = DanceMove(move_name)
self.move_name = move_name
@property
def duration(self) -> float:
"""Duration property required by official Move interface."""
return float(self.dance_move.duration)
def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
"""Evaluate dance move at time t."""
try:
head_pose, antennas, body_yaw = self.dance_move.evaluate(t)
if isinstance(antennas, tuple):
antennas = np.array([antennas[0], antennas[1]])
return (head_pose, antennas, body_yaw)
except Exception as e:
logger.error(f"Error evaluating dance move '{self.move_name}' at t={t}: {e}")
neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
return (neutral_head_pose, np.array([0.0, 0.0], dtype=np.float64), 0.0)
class EmotionQueueMove(Move): # type: ignore[misc]
"""Wrapper for emotion moves to work with the movement queue system."""
def __init__(self, emotion_name: str, recorded_moves: RecordedMoves):
"""Initialize an EmotionQueueMove."""
self.emotion_move = recorded_moves.get(emotion_name)
self.emotion_name = emotion_name
@property
def duration(self) -> float:
"""Duration property required by official Move interface."""
return float(self.emotion_move.duration)
def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
"""Evaluate emotion move at time t."""
try:
head_pose, antennas, body_yaw = self.emotion_move.evaluate(t)
if isinstance(antennas, tuple):
antennas = np.array([antennas[0], antennas[1]])
return (head_pose, antennas, body_yaw)
except Exception as e:
logger.error(f"Error evaluating emotion '{self.emotion_name}' at t={t}: {e}")
neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
return (neutral_head_pose, np.array([0.0, 0.0], dtype=np.float64), 0.0)
class GotoQueueMove(Move): # type: ignore[misc]
"""Wrapper for goto moves to work with the movement queue system."""
def __init__(
self,
target_head_pose: NDArray[np.float32],
start_head_pose: NDArray[np.float32] | None = None,
target_antennas: Tuple[float, float] = (0, 0),
start_antennas: Tuple[float, float] | None = None,
target_body_yaw: float = 0,
start_body_yaw: float | None = None,
duration: float = 1.0,
):
"""Initialize a GotoQueueMove."""
self._duration = duration
self.target_head_pose = target_head_pose
self.start_head_pose = start_head_pose
self.target_antennas = target_antennas
self.start_antennas = start_antennas or (0, 0)
self.target_body_yaw = target_body_yaw
self.start_body_yaw = start_body_yaw or 0
@property
def duration(self) -> float:
"""Duration property required by official Move interface."""
return self._duration
def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
"""Evaluate goto move at time t using linear interpolation."""
try:
t_clamped = max(0, min(1, t / self.duration))
if self.start_head_pose is not None:
start_pose = self.start_head_pose
else:
start_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
head_pose = linear_pose_interpolation(start_pose, self.target_head_pose, t_clamped)
antennas = np.array(
[
self.start_antennas[0] + (self.target_antennas[0] - self.start_antennas[0]) * t_clamped,
self.start_antennas[1] + (self.target_antennas[1] - self.start_antennas[1]) * t_clamped,
],
dtype=np.float64,
)
body_yaw = self.start_body_yaw + (self.target_body_yaw - self.start_body_yaw) * t_clamped
return (head_pose, antennas, body_yaw)
except Exception as e:
logger.error(f"Error evaluating goto move at t={t}: {e}")
target_head_pose_f64 = self.target_head_pose.astype(np.float64)
target_antennas_array = np.array([self.target_antennas[0], self.target_antennas[1]], dtype=np.float64)
return (target_head_pose_f64, target_antennas_array, self.target_body_yaw)
class BreathingMove(Move): # type: ignore[misc]
"""Breathing move with interpolation to neutral and then continuous breathing patterns."""
def __init__(
self,
interpolation_start_pose: NDArray[np.float32],
interpolation_start_antennas: Tuple[float, float],
interpolation_duration: float = 1.0,
):
"""Initialize breathing move."""
self.interpolation_start_pose = interpolation_start_pose
self.interpolation_start_antennas = np.array(interpolation_start_antennas)
self.interpolation_duration = interpolation_duration
self.neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
self.neutral_antennas = np.array([0.0, 0.0])
self.breathing_z_amplitude = 0.005 # 5mm gentle breathing
self.breathing_frequency = 0.1 # Hz (6 breaths per minute)
self.antenna_sway_amplitude = np.deg2rad(15) # 15 degrees
self.antenna_frequency = 0.5 # Hz
@property
def duration(self) -> float:
"""Duration property required by official Move interface."""
return float("inf") # Continuous breathing
def evaluate(self, t: float) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, float | None]:
"""Evaluate breathing move at time t."""
if t < self.interpolation_duration:
interpolation_t = t / self.interpolation_duration
head_pose = linear_pose_interpolation(
self.interpolation_start_pose,
self.neutral_head_pose,
interpolation_t,
)
antennas_interp = (1 - interpolation_t) * self.interpolation_start_antennas + (
interpolation_t * self.neutral_antennas
)
antennas = antennas_interp.astype(np.float64)
else:
breathing_time = t - self.interpolation_duration
z_offset = self.breathing_z_amplitude * np.sin(2 * np.pi * self.breathing_frequency * breathing_time)
head_pose = create_head_pose(x=0, y=0, z=z_offset, roll=0, pitch=0, yaw=0, degrees=True, mm=False)
antenna_sway = self.antenna_sway_amplitude * np.sin(2 * np.pi * self.antenna_frequency * breathing_time)
antennas = np.array([antenna_sway, -antenna_sway], dtype=np.float64)
return (head_pose, antennas, 0.0)