"""Robot manager for Reachy Mini MCP server.
This module provides a simplified RobotManager that handles:
- Robot SDK connection
- 100Hz movement control loop
- Move queue management
- Face tracking offsets (when enabled)
"""
from __future__ import annotations
import base64
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from queue import Empty, Queue
from typing import Any, Dict, Tuple
import numpy as np
from numpy.typing import NDArray
from reachy_mini import ReachyMini
from reachy_mini.motion.move import Move
from reachy_mini.utils import create_head_pose
from reachy_mini_mcp.config import Config
from reachy_mini_mcp.moves import BreathingMove
logger = logging.getLogger(__name__)
# Control loop frequency
CONTROL_LOOP_FREQUENCY_HZ = 100.0
# Type definitions
FullBodyPose = Tuple[NDArray[np.float32], Tuple[float, float], float]
def clone_full_body_pose(pose: FullBodyPose) -> FullBodyPose:
"""Create a deep copy of a full body pose tuple."""
head, antennas, body_yaw = pose
return (head.copy(), (float(antennas[0]), float(antennas[1])), float(body_yaw))
@dataclass
class RobotState:
"""State tracking for the robot."""
current_move: Move | None = None
move_start_time: float | None = None
last_activity_time: float = 0.0
last_primary_pose: FullBodyPose | None = None
head_tracking_enabled: bool = False
head_tracking_offsets: Tuple[float, float, float, float, float, float] = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
@dataclass
class RobotStatus:
"""Status information for external queries."""
connected: bool = False
queue_size: int = 0
current_move: str | None = None
breathing_active: bool = False
head_tracking_enabled: bool = False
last_pose: Dict[str, Any] = field(default_factory=dict)
class RobotManager:
"""Manages Reachy Mini robot connection and movement control.
Provides a simplified 100Hz control loop for executing queued moves.
"""
def __init__(self, config: Config, robot: ReachyMini | None = None):
"""Initialize the robot manager.
Args:
config: Configuration settings.
robot: Optional pre-connected ReachyMini instance (e.g., from ReachyMiniApp).
If provided, connect() will be skipped and disconnect() won't close it.
"""
self.config = config
self.robot: ReachyMini | None = robot
self._connected = robot is not None
self._externally_managed = robot is not None # Don't close robot if externally provided
self._now = time.monotonic
# Movement state
self.state = RobotState()
self.move_queue: deque[Move] = deque()
# Configuration
self.idle_inactivity_delay = 0.5 # seconds
self.target_frequency = CONTROL_LOOP_FREQUENCY_HZ
self.target_period = 1.0 / self.target_frequency
# Threading
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._command_queue: Queue[Tuple[str, Any]] = Queue()
self._status_lock = threading.Lock()
self._breathing_active = False
# Last commanded pose for status
neutral_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
self._last_commanded_pose: FullBodyPose = (neutral_pose, (0.0, 0.0), 0.0)
self.state.last_primary_pose = clone_full_body_pose(self._last_commanded_pose)
self.state.last_activity_time = self._now()
# Error throttling
self._last_set_target_err = 0.0
self._set_target_err_interval = 1.0
def connect(self) -> bool:
"""Connect to the Reachy Mini robot.
If a robot instance was provided in __init__, this is a no-op.
"""
# Already connected via constructor
if self._connected and self.robot is not None:
logger.info("Robot already connected (provided via constructor)")
return True
try:
logger.info(f"Connecting to robot: {self.config.robot_name}")
self.robot = ReachyMini()
self._connected = True
logger.info("Robot connected successfully")
return True
except Exception as e:
logger.error(f"Failed to connect to robot: {e}")
self._connected = False
return False
def disconnect(self) -> None:
"""Disconnect from the robot.
If the robot was externally provided (e.g., from ReachyMiniApp),
this only resets to neutral but doesn't close the connection.
"""
if self.robot is not None:
try:
# Reset to neutral position
neutral_head_pose = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
self.robot.goto_target(
head=neutral_head_pose,
antennas=[0.0, 0.0],
duration=1.0,
body_yaw=0.0,
)
except Exception as e:
logger.warning(f"Failed to reset to neutral: {e}")
# Clear robot reference if we created it (SDK handles cleanup internally)
if not self._externally_managed:
self.robot = None
self._connected = False
logger.info("Robot disconnected")
def is_connected(self) -> bool:
"""Check if robot is connected."""
return self._connected and self.robot is not None
def queue_move(self, move: Move) -> None:
"""Queue a move to be executed."""
self._command_queue.put(("queue_move", move))
def clear_queue(self) -> int:
"""Clear the move queue and stop current move. Returns number of cleared moves."""
self._command_queue.put(("clear_queue", None))
return len(self.move_queue)
def set_head_tracking(self, enabled: bool) -> None:
"""Enable or disable head tracking."""
self._command_queue.put(("set_head_tracking", enabled))
def set_head_tracking_offsets(self, offsets: Tuple[float, float, float, float, float, float]) -> None:
"""Set face tracking offsets (x, y, z, roll, pitch, yaw)."""
self._command_queue.put(("set_head_tracking_offsets", offsets))
def get_status(self) -> RobotStatus:
"""Get current robot status."""
with self._status_lock:
pose_snapshot = clone_full_body_pose(self._last_commanded_pose)
current_move_name = None
if self.state.current_move is not None:
current_move_name = getattr(self.state.current_move, "move_name", None) or getattr(
self.state.current_move, "emotion_name", None
)
if current_move_name is None:
current_move_name = type(self.state.current_move).__name__
return RobotStatus(
connected=self._connected,
queue_size=len(self.move_queue),
current_move=current_move_name,
breathing_active=self._breathing_active,
head_tracking_enabled=self.state.head_tracking_enabled,
last_pose={
"head": pose_snapshot[0].tolist(),
"antennas": list(pose_snapshot[1]),
"body_yaw": pose_snapshot[2],
},
)
def capture_image(self) -> Tuple[str, str] | None:
"""Capture an image from the robot's camera.
Returns:
Tuple of (base64_data, format) or None if camera not available.
"""
if not self.config.enable_camera:
return None
if self.robot is None:
return None
try:
frame = self.robot.media.get_frame()
if frame is None:
return None
# Import cv2 only when needed
import cv2
success, buffer = cv2.imencode(".jpg", frame)
if not success:
return None
b64_data = base64.b64encode(buffer.tobytes()).decode("utf-8")
return (b64_data, "jpeg")
except Exception as e:
logger.error(f"Failed to capture image: {e}")
return None
def play_sound(self, sound_file: str) -> bool:
"""Play a sound file on the robot's speaker.
Args:
sound_file: Path to the sound file to play.
Returns:
True if playback started successfully, False otherwise.
"""
if self.robot is None:
return False
try:
self.robot.media.play_sound(sound_file)
return True
except Exception as e:
logger.error(f"Failed to play sound: {e}")
return False
def stream_audio(
self, audio_data: NDArray[np.float32], sample_rate: int, blocking: bool = False
) -> bool:
"""Stream audio data to the robot's speaker.
Args:
audio_data: Audio samples as float32 numpy array (mono).
sample_rate: Sample rate of the input audio.
blocking: If True, wait for audio to finish. If False, play in background.
Returns:
True if streaming started successfully, False otherwise.
"""
if self.robot is None:
return False
try:
from scipy.signal import resample
# Get robot's output sample rate
output_sample_rate = self.robot.media.get_output_audio_samplerate()
# Ensure mono audio
if audio_data.ndim == 2:
if audio_data.shape[1] > audio_data.shape[0]:
audio_data = audio_data.T
if audio_data.shape[1] > 1:
audio_data = audio_data[:, 0]
audio_data = audio_data.flatten()
# Ensure float32
if audio_data.dtype != np.float32:
audio_data = audio_data.astype(np.float32)
# Resample if needed
if sample_rate != output_sample_rate:
num_samples = int(len(audio_data) * output_sample_rate / sample_rate)
audio_data = resample(audio_data, num_samples).astype(np.float32)
def _play_audio():
try:
# Calculate audio duration for waiting
audio_duration = len(audio_data) / output_sample_rate
# Stream to robot in chunks
chunk_size = output_sample_rate // 10 # 100ms chunks
self.robot.media.start_playing()
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
self.robot.media.push_audio_sample(chunk)
# Wait for audio to finish playing before stopping
time.sleep(audio_duration + 0.1) # Add small buffer
self.robot.media.stop_playing()
except Exception as e:
logger.error(f"Failed during audio playback: {e}")
try:
self.robot.media.stop_playing()
except Exception:
pass
if blocking:
_play_audio()
else:
# Play in background thread
audio_thread = threading.Thread(target=_play_audio, daemon=True)
audio_thread.start()
return True
except Exception as e:
logger.error(f"Failed to stream audio: {e}")
return False
def start(self) -> None:
"""Start the movement control loop."""
if self._thread is not None and self._thread.is_alive():
logger.warning("Movement loop already running")
return
if not self.is_connected():
logger.error("Cannot start movement loop: robot not connected")
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._control_loop, daemon=True)
self._thread.start()
logger.info("Movement control loop started")
def stop(self) -> None:
"""Stop the movement control loop."""
if self._thread is None or not self._thread.is_alive():
return
logger.info("Stopping movement control loop...")
self.clear_queue()
self._stop_event.set()
self._thread.join(timeout=2.0)
self._thread = None
logger.info("Movement control loop stopped")
def _control_loop(self) -> None:
"""Main 100Hz control loop."""
logger.debug("Starting control loop")
while not self._stop_event.is_set():
loop_start = self._now()
# Process commands
self._process_commands()
# Manage move queue
self._manage_queue(loop_start)
# Manage idle breathing
self._manage_breathing(loop_start)
# Get current pose
head, antennas, body_yaw = self._get_current_pose(loop_start)
# Apply head tracking offsets if enabled
if self.state.head_tracking_enabled:
head = self._apply_tracking_offsets(head)
# Send to robot
self._send_to_robot(head, antennas, body_yaw)
# Sleep to maintain frequency
elapsed = self._now() - loop_start
sleep_time = max(0.0, self.target_period - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
logger.debug("Control loop stopped")
def _process_commands(self) -> None:
"""Process queued commands."""
while True:
try:
command, payload = self._command_queue.get_nowait()
except Empty:
break
if command == "queue_move":
if isinstance(payload, Move):
self.move_queue.append(payload)
self.state.last_activity_time = self._now()
logger.debug(f"Queued move, queue size: {len(self.move_queue)}")
elif command == "clear_queue":
self.move_queue.clear()
self.state.current_move = None
self.state.move_start_time = None
self._breathing_active = False
logger.info("Cleared move queue")
elif command == "set_head_tracking":
self.state.head_tracking_enabled = bool(payload)
logger.info(f"Head tracking: {'enabled' if payload else 'disabled'}")
elif command == "set_head_tracking_offsets":
if isinstance(payload, tuple) and len(payload) == 6:
self.state.head_tracking_offsets = payload
def _manage_queue(self, current_time: float) -> None:
"""Manage the move queue."""
# Check if current move is finished
if self.state.current_move is not None and self.state.move_start_time is not None:
elapsed = current_time - self.state.move_start_time
if elapsed >= self.state.current_move.duration:
self.state.current_move = None
self.state.move_start_time = None
# Start next move if available
if self.state.current_move is None and self.move_queue:
self.state.current_move = self.move_queue.popleft()
self.state.move_start_time = current_time
self._breathing_active = isinstance(self.state.current_move, BreathingMove)
logger.debug(f"Starting move: {type(self.state.current_move).__name__}")
def _manage_breathing(self, current_time: float) -> None:
"""Start breathing when idle."""
if (
self.state.current_move is None
and not self.move_queue
and not self._breathing_active
and self.robot is not None
):
idle_time = current_time - self.state.last_activity_time
if idle_time >= self.idle_inactivity_delay:
try:
_, current_antennas = self.robot.get_current_joint_positions()
current_head_pose = self.robot.get_current_head_pose()
breathing_move = BreathingMove(
interpolation_start_pose=current_head_pose,
interpolation_start_antennas=current_antennas,
interpolation_duration=1.0,
)
self.move_queue.append(breathing_move)
self._breathing_active = True
self.state.last_activity_time = current_time
logger.debug("Started breathing")
except Exception as e:
logger.error(f"Failed to start breathing: {e}")
# Stop breathing if new moves queued
if isinstance(self.state.current_move, BreathingMove) and self.move_queue:
self.state.current_move = None
self.state.move_start_time = None
self._breathing_active = False
logger.debug("Stopped breathing for new move")
def _get_current_pose(self, current_time: float) -> FullBodyPose:
"""Get the current pose from active move or last pose."""
if self.state.current_move is not None and self.state.move_start_time is not None:
move_time = current_time - self.state.move_start_time
head, antennas, body_yaw = self.state.current_move.evaluate(move_time)
if head is None:
head = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
if antennas is None:
antennas = np.array([0.0, 0.0])
if body_yaw is None:
body_yaw = 0.0
antennas_tuple = (float(antennas[0]), float(antennas[1]))
pose = (head.copy(), antennas_tuple, float(body_yaw))
self.state.last_primary_pose = clone_full_body_pose(pose)
return pose
elif self.state.last_primary_pose is not None:
return clone_full_body_pose(self.state.last_primary_pose)
else:
neutral = create_head_pose(0, 0, 0, 0, 0, 0, degrees=True)
return (neutral, (0.0, 0.0), 0.0)
def _apply_tracking_offsets(self, head: NDArray[np.float32]) -> NDArray[np.float32]:
"""Apply face tracking offsets to head pose."""
from reachy_mini.utils.interpolation import compose_world_offset
offsets = self.state.head_tracking_offsets
offset_pose = create_head_pose(
x=offsets[0],
y=offsets[1],
z=offsets[2],
roll=offsets[3],
pitch=offsets[4],
yaw=offsets[5],
degrees=False,
mm=False,
)
return compose_world_offset(head, offset_pose, reorthonormalize=True)
def _send_to_robot(self, head: NDArray[np.float32], antennas: Tuple[float, float], body_yaw: float) -> None:
"""Send pose to robot."""
if self.robot is None:
return
try:
self.robot.set_target(head=head, antennas=antennas, body_yaw=body_yaw)
with self._status_lock:
self._last_commanded_pose = clone_full_body_pose((head, antennas, body_yaw))
except Exception as e:
now = self._now()
if now - self._last_set_target_err >= self._set_target_err_interval:
logger.error(f"Failed to set robot target: {e}")
self._last_set_target_err = now