Skip to main content
Glama
robot_control.py11.3 kB
"""Robot control portmanteau tool - Unified bot + vbot control.""" from typing import Any, Dict, Literal, Optional import structlog from ..utils.error_handler import format_error_response, format_success_response, handle_tool_error logger = structlog.get_logger(__name__) class RobotControlTool: """Portmanteau tool for unified robot control (bot + vbot).""" def __init__(self, mcp: Any, state_manager: Any): """Initialize robot control tool. Args: mcp: FastMCP server instance. state_manager: Robot state manager instance. """ self.mcp = mcp self.state_manager = state_manager def register(self): """Register robot control tool with MCP server.""" @self.mcp.tool() async def robot_control( robot_id: str, action: Literal[ "get_status", "move", "stop", "return_to_dock", "stand", "sit", "walk", "sync_vbot", ], linear: Optional[float] = None, angular: Optional[float] = None, duration: Optional[float] = None, ) -> Dict[str, Any]: """Unified robot control (works for both physical bot and virtual bot). This portmanteau tool provides a unified interface for controlling both physical robots (via ROS) and virtual robots (via Unity/VRChat). The tool automatically routes commands to the appropriate handler based on robot type. Args: robot_id: Robot identifier (e.g., "scout_01", "vbot_scout_01"). action: Operation to perform: - "get_status": Get robot status (battery, position, state) - "move": Control movement (linear/angular velocity) - "stop": Emergency stop - "return_to_dock": Return to charging dock (physical bot only) - "stand": Stand up (Unitree G1, physical bot only) - "sit": Sit down (Unitree G1, physical bot only) - "walk": Walking gait (Unitree, physical bot only) - "sync_vbot": Sync virtual bot with physical bot state linear: Linear velocity (m/s) for move action. angular: Angular velocity (rad/s) for move action. duration: Movement duration (seconds). **kwargs: Additional action-specific parameters. Returns: Dictionary containing operation result. Examples: Get robot status: result = await robot_control(robot_id="scout_01", action="get_status") Move robot forward: result = await robot_control( robot_id="scout_01", action="move", linear=0.2, angular=0.0 ) Stop robot: result = await robot_control(robot_id="scout_01", action="stop") """ try: robot = self.state_manager.get_robot(robot_id) if not robot: return format_error_response( f"Robot {robot_id} not found", error_type="not_found", robot_id=robot_id, action=action, ) # Route to appropriate handler if robot.is_virtual: return await self._handle_virtual_robot(robot, action, linear, angular, duration) else: return await self._handle_physical_robot(robot, action, linear, angular, duration) except Exception as e: return handle_tool_error("robot_control", e, robot_id=robot_id, action=action) async def _handle_physical_robot( self, robot: Any, action: str, linear: Optional[float], angular: Optional[float], duration: Optional[float], ) -> Dict[str, Any]: """Handle physical robot commands. Args: robot: Robot state. action: Action to perform. linear: Linear velocity. angular: Angular velocity. duration: Movement duration. **kwargs: Additional parameters. Returns: Operation result. """ try: # TODO: Implement ROS bridge integration logger.info("Physical robot command", robot_id=robot.robot_id, action=action) return format_success_response( f"Physical robot {action} command sent (mock - ROS integration pending)", robot_id=robot.robot_id, action=action, data={ "note": "ROS bridge integration not yet implemented", "mock": True, }, ) except Exception as e: return handle_tool_error("_handle_physical_robot", e, robot_id=robot.robot_id, action=action) async def _handle_virtual_robot( self, robot: Any, action: str, linear: Optional[float], angular: Optional[float], duration: Optional[float], ) -> Dict[str, Any]: """Handle virtual robot commands. Args: robot: Robot state. action: Action to perform. linear: Linear velocity. angular: Angular velocity. duration: Movement duration. **kwargs: Additional parameters. Returns: Operation result. """ from fastmcp import Client logger.info("Virtual robot command", robot_id=robot.robot_id, action=action, platform=robot.platform) try: if action == "move": if robot.platform == "unity": # Use avatar-mcp or unity3d-mcp for movement async with Client(self.mcp) as client: # Try avatar-mcp first for smooth locomotion try: await client.call_tool( "avatar_movement_walk", avatar_id=robot.robot_id, direction="forward", speed=linear or 0.0, ) if angular: await client.call_tool( "avatar_movement_turn", avatar_id=robot.robot_id, angle=angular, ) return { "status": "success", "message": f"Virtual robot moved via avatar-mcp", "robot_id": robot.robot_id, "action": action, "linear": linear, "angular": angular, } except Exception: # Fallback to Unity direct control await client.call_tool( "unity_execute_method", class_name="RobotController", method_name="Move", parameters={ "robotId": robot.robot_id, "linear": linear or 0.0, "angular": angular or 0.0, }, ) return { "status": "success", "message": f"Virtual robot moved via Unity", "robot_id": robot.robot_id, "action": action, } elif robot.platform == "vrchat": # Use VRChat OSC for movement async with Client(self.mcp) as client: await client.call_tool( "vrchat_send_osc_message", address=f"/robot/{robot.robot_id}/move", args=[linear or 0.0, angular or 0.0], ) return { "status": "success", "message": f"Virtual robot moved via VRChat OSC", "robot_id": robot.robot_id, "action": action, } elif action == "stop": async with Client(self.mcp) as client: if robot.platform == "vrchat": await client.call_tool( "vrchat_send_osc_message", address=f"/robot/{robot.robot_id}/stop", args=[1], ) else: await client.call_tool( "avatar_movement_walk", avatar_id=robot.robot_id, direction="forward", speed=0.0, ) return { "status": "success", "message": f"Virtual robot stopped", "robot_id": robot.robot_id, "action": action, } elif action == "get_status": return { "status": "success", "robot": robot.to_dict(), "action": action, } else: return { "status": "success", "message": f"Virtual robot {action} command sent", "robot_id": robot.robot_id, "action": action, } except Exception as e: return handle_tool_error("_handle_virtual_robot", e, robot_id=robot.robot_id, action=action) async def handle_action(self, robot_id: str, action: str, params: Dict[str, Any]) -> Dict[str, Any]: """Handle robot action (for HTTP API). Args: robot_id: Robot identifier. action: Action to perform. params: Action parameters. Returns: Operation result. """ robot = self.state_manager.get_robot(robot_id) if not robot: return {"status": "error", "message": f"Robot {robot_id} not found"} linear = params.get("linear") angular = params.get("angular") duration = params.get("duration") if robot.is_virtual: return await self._handle_virtual_robot(robot, action, linear, angular, duration, **params) else: return await self._handle_physical_robot(robot, action, linear, angular, duration, **params)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/robotics-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server