"""MCP server for Reachy Mini robot control.
This module provides the main MCP server implementation using FastMCP.
It initializes the robot on startup and exposes tools for controlling it.
Supports two modes:
1. Standalone: Run directly via `reachy-mini-mcp` command (connects to robot itself)
2. App mode: Run as a ReachyMiniApp (receives pre-connected robot from dashboard)
"""
from __future__ import annotations
import asyncio
import logging
import threading
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from reachy_mini import ReachyMini, ReachyMiniApp
from reachy_mini_mcp.config import Config
from reachy_mini_mcp.context import AppContext
from reachy_mini_mcp.robot_manager import RobotManager
from reachy_mini_mcp.tools.camera import register_camera_tool
from reachy_mini_mcp.tools.dance import register_dance_tool
from reachy_mini_mcp.tools.emotion import register_emotion_tool
from reachy_mini_mcp.tools.get_status import register_get_status_tool
from reachy_mini_mcp.tools.head_tracking import register_head_tracking_tool
from reachy_mini_mcp.tools.move_head import register_move_head_tool
from reachy_mini_mcp.tools.speaker import register_speaker_tool
from reachy_mini_mcp.tools.stop_motion import register_stop_motion_tool
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def register_all_tools(server: FastMCP) -> None:
"""Register all MCP tools on the given server instance."""
register_dance_tool(server)
register_emotion_tool(server)
register_move_head_tool(server)
register_camera_tool(server)
register_head_tracking_tool(server)
register_speaker_tool(server)
register_stop_motion_tool(server)
register_get_status_tool(server)
def create_lifespan(
robot: Optional[ReachyMini] = None,
stop_event: Optional[threading.Event] = None,
):
"""Create a lifespan context manager with optional pre-connected robot.
Args:
robot: Pre-connected ReachyMini instance (from ReachyMiniApp mode).
stop_event: Threading event for graceful shutdown (from ReachyMiniApp mode).
"""
@asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[AppContext]:
"""Lifespan context manager for the MCP server.
Initializes the robot connection on startup and cleans up on shutdown.
"""
# Load configuration
config = Config.from_env()
logger.info(f"Starting Reachy Mini MCP server with config: {config}")
# Initialize robot manager (with pre-connected robot if provided)
robot_manager = RobotManager(config, robot=robot)
# Connect to robot (no-op if robot was already provided)
if robot_manager.connect():
# Start the movement control loop
robot_manager.start()
logger.info("Robot initialized and movement loop started")
else:
logger.warning("Failed to connect to robot - running in disconnected mode")
# Create context
ctx = AppContext(robot_manager=robot_manager, config=config)
# If we have a stop event, monitor it for shutdown
shutdown_task = None
if stop_event is not None:
async def monitor_stop_event():
while not stop_event.is_set():
await asyncio.sleep(0.1)
logger.info("Stop event received, initiating shutdown...")
shutdown_task = asyncio.create_task(monitor_stop_event())
try:
yield ctx
finally:
# Cleanup
logger.info("Shutting down Reachy Mini MCP server...")
if shutdown_task is not None:
shutdown_task.cancel()
try:
await shutdown_task
except asyncio.CancelledError:
pass
robot_manager.stop()
robot_manager.disconnect()
logger.info("Cleanup complete")
return lifespan
# Default lifespan for standalone mode
@asynccontextmanager
async def default_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""Default lifespan for standalone mode (connects to robot itself)."""
async with create_lifespan()(server) as ctx:
yield ctx
# Create the MCP server for standalone mode
mcp = FastMCP(
"Reachy Mini",
instructions="Control a Reachy Mini robot - dance, express emotions, move head, and more",
lifespan=default_lifespan,
)
# Register tools on the standalone server
register_all_tools(mcp)
def create_mcp_server(
robot: Optional[ReachyMini] = None,
stop_event: Optional[threading.Event] = None,
) -> FastMCP:
"""Create an MCP server instance with optional pre-connected robot.
Args:
robot: Pre-connected ReachyMini instance.
stop_event: Threading event for graceful shutdown.
Returns:
Configured FastMCP server instance.
"""
server = FastMCP(
"Reachy Mini",
instructions="Control a Reachy Mini robot - dance, express emotions, move head, and more",
lifespan=create_lifespan(robot=robot, stop_event=stop_event),
)
# Register tools on the new server instance
register_all_tools(server)
return server
def main() -> None:
"""Entry point for the MCP server (standalone mode)."""
config = Config.from_env()
if config.transport == "sse":
logger.info(f"Starting Reachy Mini MCP server (SSE mode) at {config.sse_url}")
mcp.run(transport="sse",)
else:
logger.info("Starting Reachy Mini MCP server (stdio mode)...")
mcp.run()
class ReachyMiniMCPApp(ReachyMiniApp): # type: ignore[misc]
"""Reachy Mini Apps entry point for the MCP server.
This allows the MCP server to be launched from the Reachy Mini dashboard
with automatic robot connection management.
"""
# These will be set dynamically based on transport config
custom_app_url: str | None = None
dont_start_webserver: bool = True
def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
"""Run the MCP server with a pre-connected robot.
Args:
reachy_mini: Pre-connected ReachyMini instance from the dashboard.
stop_event: Threading event signaled when the app should stop.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
config = Config.from_env()
# Create a new MCP server with the pre-connected robot
server = create_mcp_server(robot=reachy_mini, stop_event=stop_event)
try:
if config.transport == "sse":
logger.info(f"Starting Reachy Mini MCP server (app+SSE mode) at {config.sse_url}")
server.run(transport="sse", host=config.sse_host, port=config.sse_port)
else:
logger.info("Starting Reachy Mini MCP server (app+stdio mode)...")
server.run()
except KeyboardInterrupt:
logger.info("Keyboard interrupt received, shutting down...")
finally:
logger.info("MCP server stopped")
if __name__ == "__main__":
# When run directly, use the ReachyMiniMCPApp entry point
app = ReachyMiniMCPApp()
try:
app.wrapped_run()
except KeyboardInterrupt:
app.stop()