"""Yahboom ROS 2 robot client - Raspbot-V2 and similar models."""
import asyncio
import json
from typing import Any, Dict, Optional
import structlog
from pydantic import BaseModel
logger = structlog.get_logger(__name__)
class YahboomRobotConfig(BaseModel):
"""Configuration for Yahboom robots."""
robot_id: str
ip_address: str
port: int = 9090 # ROS bridge default
ros_domain_id: int = 0
camera_enabled: bool = True
navigation_enabled: bool = True
arm_enabled: bool = False # Enable when arm addon purchased
mock_mode: bool = True
class YahboomClient:
"""ROS 2 client for Yahboom robots (Raspbot-V2, etc.)."""
def __init__(self, config: YahboomRobotConfig):
"""Initialize Yahboom ROS 2 client.
Args:
config: Robot configuration.
"""
self.config = config
self.connected = False
self.websocket = None
self._status_cache = {}
# ROS 2 topics for Raspbot-V2
self.topics = {
"cmd_vel": "/cmd_vel", # Movement commands
"odom": "/odom", # Odometry
"scan": "/scan", # LiDAR (if equipped)
"battery": "/battery", # Battery status
"camera": "/camera/image_raw", # Camera feed
"imu": "/imu", # IMU data
}
if self.config.arm_enabled:
self.topics.update({
"joint_states": "/joint_states", # Arm joint positions
"arm_cmd": "/arm_controller/command", # Arm commands
"gripper_cmd": "/gripper_controller/command", # Gripper commands
})
async def connect(self) -> bool:
"""Connect to robot via ROS bridge WebSocket.
Returns:
True if connection successful.
"""
if self.config.mock_mode:
logger.info("Running in mock mode", robot_id=self.config.robot_id)
self.connected = True
return True
try:
# TODO: Implement actual ROS bridge WebSocket connection
# For now, simulate connection
logger.info("Connecting to Yahboom robot", robot_id=self.config.robot_id,
address=f"{self.config.ip_address}:{self.config.port}")
await asyncio.sleep(0.1) # Simulate connection delay
self.connected = True
return True
except Exception as e:
logger.error("Failed to connect to Yahboom robot",
robot_id=self.config.robot_id, error=str(e))
return False
async def disconnect(self):
"""Disconnect from robot."""
if self.websocket:
# TODO: Close WebSocket connection
pass
self.connected = False
async def get_status(self) -> Dict[str, Any]:
"""Get comprehensive robot status.
Returns:
Status dictionary with battery, position, sensors, etc.
"""
if not self.connected:
return {"error": "Not connected"}
if self.config.mock_mode:
# Mock status for Raspbot-V2
return {
"robot_id": self.config.robot_id,
"model": "Raspbot-V2",
"connected": True,
"battery": {
"voltage": 11.8,
"percentage": 85,
"charging": False,
},
"position": {
"x": 0.0,
"y": 0.0,
"theta": 0.0,
},
"sensors": {
"camera": self.config.camera_enabled,
"imu": True,
"lidar": False, # No LiDAR on base model
},
"capabilities": {
"navigation": self.config.navigation_enabled,
"arm": self.config.arm_enabled,
"camera_streaming": self.config.camera_enabled,
},
"mock": True,
}
# TODO: Implement actual ROS status queries
return {"error": "ROS integration not implemented"}
async def move(self, linear: float, angular: float, duration: Optional[float] = None) -> Dict[str, Any]:
"""Move robot with velocity commands.
Args:
linear: Linear velocity (m/s)
angular: Angular velocity (rad/s)
duration: Movement duration in seconds
Returns:
Movement result.
"""
if not self.connected:
return {"error": "Not connected"}
if self.config.mock_mode:
logger.info("Mock movement command",
robot_id=self.config.robot_id,
linear=linear,
angular=angular,
duration=duration)
# Simulate movement time
if duration:
await asyncio.sleep(min(duration, 1.0)) # Cap at 1 second for safety
return {
"success": True,
"robot_id": self.config.robot_id,
"command": "move",
"linear": linear,
"angular": angular,
"duration": duration,
"mock": True,
}
# TODO: Publish to /cmd_vel topic
return {"error": "ROS integration not implemented"}
async def stop(self) -> Dict[str, Any]:
"""Emergency stop."""
if not self.connected:
return {"error": "Not connected"}
if self.config.mock_mode:
logger.info("Mock emergency stop", robot_id=self.config.robot_id)
return {
"success": True,
"robot_id": self.config.robot_id,
"command": "stop",
"mock": True,
}
# TODO: Publish zero velocity to /cmd_vel
return {"error": "ROS integration not implemented"}
async def get_camera_frame(self) -> Optional[bytes]:
"""Get latest camera frame.
Returns:
JPEG image data or None if no camera.
"""
if not self.config.camera_enabled:
return None
if self.config.mock_mode:
# Return placeholder for mock mode
return b"mock_camera_data"
# TODO: Subscribe to /camera/image_raw topic
return None
async def control_arm(self, joint_angles: Dict[str, float]) -> Dict[str, Any]:
"""Control robotic arm joints.
Args:
joint_angles: Dictionary of joint names to target angles
Returns:
Arm control result.
"""
if not self.config.arm_enabled:
return {"error": "Arm not enabled/configured"}
if self.config.mock_mode:
logger.info("Mock arm control",
robot_id=self.config.robot_id,
joints=joint_angles)
return {
"success": True,
"robot_id": self.config.robot_id,
"command": "arm_control",
"joint_angles": joint_angles,
"mock": True,
}
# TODO: Publish to /arm_controller/command topic
return {"error": "ROS integration not implemented"}
async def control_gripper(self, action: str, force: Optional[float] = None) -> Dict[str, Any]:
"""Control gripper.
Args:
action: "open", "close", "stop"
force: Grip force (0-1)
Returns:
Gripper control result.
"""
if not self.config.arm_enabled:
return {"error": "Gripper not available (arm not configured)"}
if self.config.mock_mode:
logger.info("Mock gripper control",
robot_id=self.config.robot_id,
action=action,
force=force)
return {
"success": True,
"robot_id": self.config.robot_id,
"command": "gripper_control",
"action": action,
"force": force,
"mock": True,
}
# TODO: Publish to /gripper_controller/command topic
return {"error": "ROS integration not implemented"}
async def navigate_to_pose(self, x: float, y: float, theta: float) -> Dict[str, Any]:
"""Navigate to specific pose using ROS Navigation stack.
Args:
x: Target X position
y: Target Y position
theta: Target orientation
Returns:
Navigation result.
"""
if not self.config.navigation_enabled:
return {"error": "Navigation not enabled"}
if self.config.mock_mode:
logger.info("Mock navigation",
robot_id=self.config.robot_id,
target={"x": x, "y": y, "theta": theta})
return {
"success": True,
"robot_id": self.config.robot_id,
"command": "navigate",
"target_pose": {"x": x, "y": y, "theta": theta},
"mock": True,
}
# TODO: Use ROS Navigation action client
return {"error": "ROS Navigation integration not implemented"}