Skip to main content
Glama
robot_behavior.py31.5 kB
"""Robot behavior portmanteau tool - Animation, camera, navigation, and manipulation control. Consolidates animation, camera, navigation, and manipulation (arms/grippers) operations into a single unified tool. """ from typing import Any, Dict, List, Literal, Optional import structlog from ..utils.error_handler import format_error_response, format_success_response, handle_tool_error from ..utils.mcp_client_helper import call_mounted_server_tool logger = structlog.get_logger(__name__) class RobotBehaviorTool: """Portmanteau tool for robot behavior: animation, camera, and navigation.""" def __init__(self, mcp: Any, state_manager: Any, mounted_servers: Optional[Dict[str, Any]] = None): """Initialize robot behavior tool. Args: mcp: FastMCP server instance. state_manager: Robot state manager instance. mounted_servers: Dictionary of mounted MCP servers. """ self.mcp = mcp self.state_manager = state_manager self.mounted_servers = mounted_servers or {} def register(self): """Register robot behavior tool with MCP server.""" @self.mcp.tool() async def robot_behavior( robot_id: str, category: Literal["animation", "camera", "navigation", "manipulation"], action: str, # Animation parameters wheel_speeds: Optional[Dict[str, float]] = None, animation_name: Optional[str] = None, pose: Optional[str] = None, animation_speed: Optional[float] = None, loop: bool = False, # Camera parameters angle_x: Optional[float] = None, angle_y: Optional[float] = None, output_path: Optional[str] = None, stream_url: Optional[str] = None, # Navigation parameters start_position: Optional[Dict[str, float]] = None, goal_position: Optional[Dict[str, float]] = None, waypoint: Optional[Dict[str, float]] = None, obstacle_position: Optional[Dict[str, float]] = None, path_id: Optional[str] = None, # Manipulation parameters joint_positions: Optional[Dict[str, float]] = None, end_effector_pose: Optional[Dict[str, Any]] = None, gripper_position: Optional[float] = None, arm_id: Optional[str] = None, force_limit: Optional[float] = None, manipulation_speed: Optional[float] = None, ) -> Dict[str, Any]: """Robot behavior control portmanteau - Animation, camera, navigation, and manipulation. PORTMANTEAU PATTERN: Consolidates animation, camera, navigation, and manipulation operations into a single unified tool. This reduces tool explosion while maintaining full functionality across all behavior categories. CATEGORIES: - animation: Animation and pose control - camera: Camera feed and visual control - navigation: Path planning and navigation - manipulation: Arm and gripper control ANIMATION ACTIONS: - animate_wheels: Rotate wheels during movement (Scout mecanum wheels) - animate_movement: Play movement animations (walk, turn, etc.) - set_pose: Set robot pose (sitting, standing, etc. for Unitree) - play_animation: Play custom animations - stop_animation: Stop current animation - get_animation_state: Get current animation state CAMERA ACTIONS: - get_camera_feed: Get live camera feed (physical Scout camera) - get_virtual_camera: Get Unity camera view from robot perspective - set_camera_angle: Adjust camera angle - capture_image: Capture still image - start_streaming: Start video stream - stop_streaming: Stop video stream - get_camera_status: Get camera status and settings NAVIGATION ACTIONS: - plan_path: Plan path from A to B (A* or RRT) - follow_path: Execute planned path - set_waypoint: Set navigation waypoint - clear_waypoints: Clear waypoint list - get_path_status: Check path execution status - avoid_obstacle: Dynamic obstacle avoidance - get_current_path: Get current path being followed MANIPULATION ACTIONS: - move_arm: Move arm to target joint positions or end-effector pose - set_joint_positions: Set individual joint positions (dict of joint_name: angle) - set_end_effector_pose: Move end-effector to target pose (position + orientation) - get_arm_state: Get current arm joint positions and end-effector pose - open_gripper: Open gripper fully - close_gripper: Close gripper fully - set_gripper_position: Set gripper position (0.0 = open, 1.0 = closed) - get_gripper_state: Get current gripper position and force feedback - move_to_pose: Move arm to target pose with IK (inverse kinematics) - home_arm: Return arm to home/rest position Args: robot_id: Robot identifier (e.g., "scout_01", "vbot_scout_01"). category: Behavior category: "animation", "camera", or "navigation". action: Action to perform (see category-specific actions above). # Animation parameters (used when category="animation") wheel_speeds: Wheel speeds for animate_wheels. animation_name: Animation name for play_animation. pose: Pose name for set_pose. animation_speed: Animation speed multiplier. loop: Whether to loop animation. # Camera parameters (used when category="camera") angle_x: Camera angle X (pitch) in degrees. angle_y: Camera angle Y (yaw) in degrees. output_path: Output file path for capture_image. stream_url: Stream URL for start_streaming. # Navigation parameters (used when category="navigation") start_position: Start position (x, y, z) for plan_path. goal_position: Goal position (x, y, z) for plan_path. waypoint: Waypoint position (x, y, z) for set_waypoint. obstacle_position: Obstacle position (x, y, z) for avoid_obstacle. path_id: Path identifier for follow_path or get_path_status. # Manipulation parameters (used when category="manipulation") joint_positions: Joint positions dict (e.g., {"shoulder": 45.0, "elbow": 90.0}). end_effector_pose: End-effector pose dict with position and orientation. gripper_position: Gripper position (0.0 = open, 1.0 = closed). arm_id: Arm identifier for multi-arm robots (e.g., "left", "right"). force_limit: Maximum force/torque limit for movement. manipulation_speed: Movement speed (0.0-1.0) for arm/gripper motion. Returns: Dictionary containing operation result. Examples: # Animation result = await robot_behavior( robot_id="scout_01", category="animation", action="play_animation", animation_name="walk", animation_speed=1.0, loop=True ) # Camera result = await robot_behavior( robot_id="scout_01", category="camera", action="capture_image", output_path="C:/Images/capture.jpg" ) # Navigation result = await robot_behavior( robot_id="scout_01", category="navigation", action="plan_path", start_position={"x": 0, "y": 0, "z": 0}, goal_position={"x": 5, "y": 0, "z": 0} ) # Manipulation - Move arm result = await robot_behavior( robot_id="g1_01", category="manipulation", action="move_arm", joint_positions={"shoulder": 45.0, "elbow": 90.0, "wrist": 0.0} ) # Manipulation - Open gripper result = await robot_behavior( robot_id="g1_01", category="manipulation", action="open_gripper" ) """ try: robot = self.state_manager.get_robot(robot_id) if not robot: return format_error_response( f"Robot {robot_id} not found", error_type="not_found", robot_id=robot_id, category=category, action=action, ) # Route to category handler if category == "animation": return await self._handle_animation( robot, action, wheel_speeds, animation_name, pose, speed, loop ) elif category == "camera": return await self._handle_camera(robot, action, angle_x, angle_y, output_path, stream_url) elif category == "navigation": return await self._handle_navigation( robot, action, start_position, goal_position, waypoint, obstacle_position, path_id ) elif category == "manipulation": return await self._handle_manipulation( robot, action, joint_positions, end_effector_pose, gripper_position, arm_id, force_limit, manipulation_speed ) else: return format_error_response( f"Unknown category: {category}", error_type="validation_error", valid_categories=["animation", "camera", "navigation", "manipulation"], ) except Exception as e: return handle_tool_error("robot_behavior", e, robot_id=robot_id, category=category, action=action) async def _handle_animation( self, robot: Any, action: str, wheel_speeds: Optional[Dict[str, float]], animation_name: Optional[str], pose: Optional[str], animation_speed: Optional[float], loop: bool, ) -> Dict[str, Any]: """Handle animation operations.""" if robot.is_virtual: return await self._handle_virtual_animation(robot, action, wheel_speeds, animation_name, pose, animation_speed, loop) else: return await self._handle_physical_animation(robot, action, wheel_speeds, animation_name, pose, animation_speed, loop) async def _handle_camera( self, robot: Any, action: str, angle_x: Optional[float], angle_y: Optional[float], output_path: Optional[str], stream_url: Optional[str], ) -> Dict[str, Any]: """Handle camera operations.""" if robot.is_virtual: return await self._handle_virtual_camera(robot, action, angle_x, angle_y, output_path, stream_url) else: return await self._handle_physical_camera(robot, action, angle_x, angle_y, output_path, stream_url) async def _handle_navigation( self, robot: Any, action: str, start_position: Optional[Dict[str, float]], goal_position: Optional[Dict[str, float]], waypoint: Optional[Dict[str, float]], obstacle_position: Optional[Dict[str, float]], path_id: Optional[str], ) -> Dict[str, Any]: """Handle navigation operations.""" if robot.is_virtual: return await self._handle_virtual_navigation( robot, action, start_position, goal_position, waypoint, obstacle_position, path_id ) else: return await self._handle_physical_navigation( robot, action, start_position, goal_position, waypoint, obstacle_position, path_id ) async def _handle_manipulation( self, robot: Any, action: str, joint_positions: Optional[Dict[str, float]], end_effector_pose: Optional[Dict[str, Any]], gripper_position: Optional[float], arm_id: Optional[str], force_limit: Optional[float], manipulation_speed: Optional[float], ) -> Dict[str, Any]: """Handle manipulation operations (arms and grippers).""" if robot.is_virtual: return await self._handle_virtual_manipulation( robot, action, joint_positions, end_effector_pose, gripper_position, arm_id, force_limit, manipulation_speed ) else: return await self._handle_physical_manipulation( robot, action, joint_positions, end_effector_pose, gripper_position, arm_id, force_limit, manipulation_speed ) # Virtual animation handlers (from robot_animation.py) async def _handle_virtual_animation( self, robot: Any, action: str, wheel_speeds: Optional[Dict[str, float]], animation_name: Optional[str], pose: Optional[str], animation_speed: Optional[float], loop: bool, ) -> Dict[str, Any]: """Handle virtual robot animation.""" try: if robot.platform == "unity" and "unity" in self.mounted_servers: method_map = { "animate_wheels": ("RobotAnimator", "AnimateWheels", {"robotId": robot.robot_id, "wheelSpeeds": wheel_speeds or {}}), "animate_movement": ("RobotAnimator", "AnimateMovement", {"robotId": robot.robot_id, "animationName": animation_name or "walk", "speed": animation_speed or 1.0, "loop": loop}), "set_pose": ("RobotAnimator", "SetPose", {"robotId": robot.robot_id, "pose": pose or "idle"}), "play_animation": ("RobotAnimator", "PlayAnimation", {"robotId": robot.robot_id, "animationName": animation_name or "idle", "speed": animation_speed or 1.0, "loop": loop}), "stop_animation": ("RobotAnimator", "StopAnimation", {"robotId": robot.robot_id}), "get_animation_state": ("RobotAnimator", "GetAnimationState", {"robotId": robot.robot_id}), } if action not in method_map: return format_error_response(f"Unknown animation action: {action}", error_type="validation_error") class_name, method_name, params = method_map[action] result = await call_mounted_server_tool( self.mounted_servers, "unity", "execute_unity_method", { "class_name": class_name, "method_name": method_name, "parameters": params, }, ) return format_success_response( f"Animation {action} executed for {robot.robot_id}", robot_id=robot.robot_id, category="animation", action=action, data=result, ) else: logger.info("Mock animation", robot_id=robot.robot_id, action=action) return format_success_response( f"Mock animation: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="animation", action=action, data={"note": "Mock mode - Unity/VRChat not available"}, ) except Exception as e: return handle_tool_error("_handle_virtual_animation", e, robot_id=robot.robot_id, action=action) async def _handle_physical_animation( self, robot: Any, action: str, wheel_speeds: Optional[Dict[str, float]], animation_name: Optional[str], pose: Optional[str], animation_speed: Optional[float], loop: bool, ) -> Dict[str, Any]: """Handle physical robot animation.""" logger.info("Physical robot animation", robot_id=robot.robot_id, action=action) return format_success_response( f"Physical robot animation: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="animation", action=action, data={"note": "Physical robot animation not yet implemented (requires ROS integration)"}, ) # Virtual camera handlers (from robot_camera.py) async def _handle_virtual_camera( self, robot: Any, action: str, angle_x: Optional[float], angle_y: Optional[float], output_path: Optional[str], stream_url: Optional[str], ) -> Dict[str, Any]: """Handle virtual robot camera.""" try: if robot.platform == "unity" and "unity" in self.mounted_servers: method_map = { "get_camera_feed": ("RobotCamera", "GetCameraFeed", {"robotId": robot.robot_id}), "get_virtual_camera": ("RobotCamera", "GetCameraFeed", {"robotId": robot.robot_id}), "set_camera_angle": ("RobotCamera", "SetCameraAngle", {"robotId": robot.robot_id, "angleX": angle_x or 0.0, "angleY": angle_y or 0.0}), "capture_image": ("RobotCamera", "CaptureImage", {"robotId": robot.robot_id, "outputPath": output_path or ""}), "start_streaming": ("RobotCamera", "StartStreaming", {"robotId": robot.robot_id, "streamUrl": stream_url or ""}), "stop_streaming": ("RobotCamera", "StopStreaming", {"robotId": robot.robot_id}), "get_camera_status": ("RobotCamera", "GetCameraStatus", {"robotId": robot.robot_id}), } if action not in method_map: return format_error_response(f"Unknown camera action: {action}", error_type="validation_error") class_name, method_name, params = method_map[action] result = await call_mounted_server_tool( self.mounted_servers, "unity", "execute_unity_method", { "class_name": class_name, "method_name": method_name, "parameters": params, }, ) return format_success_response( f"Camera {action} executed for {robot.robot_id}", robot_id=robot.robot_id, category="camera", action=action, data=result, ) else: logger.info("Mock camera", robot_id=robot.robot_id, action=action) return format_success_response( f"Mock camera: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="camera", action=action, data={"note": "Mock mode - Unity not available"}, ) except Exception as e: return handle_tool_error("_handle_virtual_camera", e, robot_id=robot.robot_id, action=action) async def _handle_physical_camera( self, robot: Any, action: str, angle_x: Optional[float], angle_y: Optional[float], output_path: Optional[str], stream_url: Optional[str], ) -> Dict[str, Any]: """Handle physical robot camera.""" logger.info("Physical robot camera", robot_id=robot.robot_id, action=action) return format_success_response( f"Physical robot camera: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="camera", action=action, data={"note": "Physical robot camera not yet implemented (requires ROS integration)"}, ) # Virtual navigation handlers (from robot_navigation.py) async def _handle_virtual_navigation( self, robot: Any, action: str, start_position: Optional[Dict[str, float]], goal_position: Optional[Dict[str, float]], waypoint: Optional[Dict[str, float]], obstacle_position: Optional[Dict[str, float]], path_id: Optional[str], ) -> Dict[str, Any]: """Handle virtual robot navigation.""" try: if robot.platform == "unity" and "unity" in self.mounted_servers: method_map = { "plan_path": ("RobotNavigator", "PlanPath", {"robotId": robot.robot_id, "startPosition": start_position or {}, "goalPosition": goal_position or {}}), "follow_path": ("RobotNavigator", "FollowPath", {"robotId": robot.robot_id, "pathId": path_id or ""}), "set_waypoint": ("RobotNavigator", "SetWaypoint", {"robotId": robot.robot_id, "waypoint": waypoint or {}}), "clear_waypoints": ("RobotNavigator", "ClearWaypoints", {"robotId": robot.robot_id}), "get_path_status": ("RobotNavigator", "GetPathStatus", {"robotId": robot.robot_id, "pathId": path_id or ""}), "avoid_obstacle": ("RobotNavigator", "AvoidObstacle", {"robotId": robot.robot_id, "obstaclePosition": obstacle_position or {}}), "get_current_path": ("RobotNavigator", "GetCurrentPath", {"robotId": robot.robot_id}), } if action not in method_map: return format_error_response(f"Unknown navigation action: {action}", error_type="validation_error") class_name, method_name, params = method_map[action] result = await call_mounted_server_tool( self.mounted_servers, "unity", "execute_unity_method", { "class_name": class_name, "method_name": method_name, "parameters": params, }, ) return format_success_response( f"Navigation {action} executed for {robot.robot_id}", robot_id=robot.robot_id, category="navigation", action=action, data=result, ) else: logger.info("Mock navigation", robot_id=robot.robot_id, action=action) mock_path = { "path_id": path_id or "mock_path_123", "waypoints": [ start_position or {"x": 0, "y": 0, "z": 0}, goal_position or {"x": 5, "y": 0, "z": 0}, ], "status": "planned", } return format_success_response( f"Mock navigation: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="navigation", action=action, data={"note": "Mock mode - Unity not available", "path": mock_path}, ) except Exception as e: return handle_tool_error("_handle_virtual_navigation", e, robot_id=robot.robot_id, action=action) async def _handle_physical_navigation( self, robot: Any, action: str, start_position: Optional[Dict[str, float]], goal_position: Optional[Dict[str, float]], waypoint: Optional[Dict[str, float]], obstacle_position: Optional[Dict[str, float]], path_id: Optional[str], ) -> Dict[str, Any]: """Handle physical robot navigation.""" logger.info("Physical robot navigation", robot_id=robot.robot_id, action=action) return format_success_response( f"Physical robot navigation: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="navigation", action=action, data={"note": "Physical robot navigation not yet implemented (requires ROS navigation stack)"}, ) # Virtual manipulation handlers async def _handle_virtual_manipulation( self, robot: Any, action: str, joint_positions: Optional[Dict[str, float]], end_effector_pose: Optional[Dict[str, Any]], gripper_position: Optional[float], arm_id: Optional[str], force_limit: Optional[float], manipulation_speed: Optional[float], ) -> Dict[str, Any]: """Handle virtual robot manipulation (arms and grippers).""" try: if robot.platform == "unity" and "unity" in self.mounted_servers: method_map = { "move_arm": ( "RobotManipulator", "MoveArm", { "robotId": robot.robot_id, "jointPositions": joint_positions or {}, "endEffectorPose": end_effector_pose or {}, "armId": arm_id or "default", "speed": manipulation_speed or 0.5, }, ), "set_joint_positions": ( "RobotManipulator", "SetJointPositions", {"robotId": robot.robot_id, "jointPositions": joint_positions or {}, "armId": arm_id or "default"}, ), "set_end_effector_pose": ( "RobotManipulator", "SetEndEffectorPose", { "robotId": robot.robot_id, "pose": end_effector_pose or {}, "armId": arm_id or "default", "speed": manipulation_speed or 0.5, }, ), "get_arm_state": ("RobotManipulator", "GetArmState", {"robotId": robot.robot_id, "armId": arm_id or "default"}), "open_gripper": ("RobotGripper", "OpenGripper", {"robotId": robot.robot_id, "armId": arm_id or "default", "speed": manipulation_speed or 0.5}), "close_gripper": ( "RobotGripper", "CloseGripper", {"robotId": robot.robot_id, "armId": arm_id or "default", "speed": manipulation_speed or 0.5, "forceLimit": force_limit}, ), "set_gripper_position": ( "RobotGripper", "SetGripperPosition", {"robotId": robot.robot_id, "position": gripper_position or 0.0, "armId": arm_id or "default", "speed": manipulation_speed or 0.5}, ), "get_gripper_state": ("RobotGripper", "GetGripperState", {"robotId": robot.robot_id, "armId": arm_id or "default"}), "move_to_pose": ( "RobotManipulator", "MoveToPose", { "robotId": robot.robot_id, "pose": end_effector_pose or {}, "armId": arm_id or "default", "speed": manipulation_speed or 0.5, }, ), "home_arm": ("RobotManipulator", "HomeArm", {"robotId": robot.robot_id, "armId": arm_id or "default", "speed": manipulation_speed or 0.5}), } if action not in method_map: return format_error_response(f"Unknown manipulation action: {action}", error_type="validation_error") class_name, method_name, params = method_map[action] result = await call_mounted_server_tool( self.mounted_servers, "unity", "execute_unity_method", { "class_name": class_name, "method_name": method_name, "parameters": params, }, ) return format_success_response( f"Manipulation {action} executed for {robot.robot_id}", robot_id=robot.robot_id, category="manipulation", action=action, data=result, ) else: logger.info("Mock manipulation", robot_id=robot.robot_id, action=action) mock_data = {} if action in ["get_arm_state", "get_gripper_state"]: mock_data = { "joint_positions": joint_positions or {"shoulder": 0.0, "elbow": 0.0, "wrist": 0.0}, "end_effector_pose": end_effector_pose or {"position": {"x": 0.0, "y": 0.5, "z": 0.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}}, "gripper_position": gripper_position or 0.0, } return format_success_response( f"Mock manipulation: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="manipulation", action=action, data={"note": "Mock mode - Unity not available", **mock_data}, ) except Exception as e: return handle_tool_error("_handle_virtual_manipulation", e, robot_id=robot.robot_id, action=action) async def _handle_physical_manipulation( self, robot: Any, action: str, joint_positions: Optional[Dict[str, float]], end_effector_pose: Optional[Dict[str, Any]], gripper_position: Optional[float], arm_id: Optional[str], force_limit: Optional[float], manipulation_speed: Optional[float], ) -> Dict[str, Any]: """Handle physical robot manipulation (arms and grippers).""" logger.info("Physical robot manipulation", robot_id=robot.robot_id, action=action) return format_success_response( f"Physical robot manipulation: {action} for {robot.robot_id}", robot_id=robot.robot_id, category="manipulation", action=action, data={ "note": "Physical robot manipulation not yet implemented (requires ROS MoveIt! or similar arm control stack)", "ros_topics": { "joint_states": "/joint_states", "arm_command": "/arm_controller/command", "gripper_command": "/gripper_controller/command", }, }, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/robotics-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server