"""ROS TF transform tools for MCP server."""
import logging
from typing import Any, Dict, List, Optional
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
logger = logging.getLogger(__name__)
# Global TF buffer and listener (lazy initialization)
_tf_buffer = None
_tf_listener = None
def _get_tf_buffer():
"""Get or create the TF2 buffer and listener."""
global _tf_buffer, _tf_listener
if _tf_buffer is None:
import tf2_ros
import rospy
_tf_buffer = tf2_ros.Buffer()
_tf_listener = tf2_ros.TransformListener(_tf_buffer)
# Wait a moment for transforms to be received
rospy.sleep(0.5)
return _tf_buffer
@mcp.tool
def ros_list_frames() -> Dict[str, Any]:
"""
List all TF frames currently available.
Returns:
Dictionary containing list of frame names.
"""
bridge = ROSBridge()
bridge.ensure_connected()
try:
tf_buffer = _get_tf_buffer()
# Get all frames as YAML
frames_yaml = tf_buffer.all_frames_as_yaml()
# Parse frame names from YAML
frames = []
for line in frames_yaml.split("\n"):
line = line.strip()
if line.endswith(":"):
frame_name = line[:-1].strip()
if frame_name:
frames.append(frame_name)
return {
"success": True,
"frames": sorted(set(frames)),
"count": len(set(frames)),
}
except Exception as e:
logger.error(f"Failed to list TF frames: {e}")
return {
"success": False,
"error": str(e),
}
@mcp.tool
def ros_lookup_transform(
target_frame: str,
source_frame: str,
time: float = 0.0,
timeout: float = 1.0,
) -> Dict[str, Any]:
"""
Look up the transform between two coordinate frames.
Args:
target_frame: The target frame to transform into.
source_frame: The source frame to transform from.
time: Time at which to get the transform (0 = latest available).
timeout: Maximum time to wait for the transform in seconds.
Returns:
Dictionary containing the transform (translation and rotation).
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
try:
tf_buffer = _get_tf_buffer()
# Convert time to rospy.Time
if time == 0.0:
ros_time = rospy.Time(0) # Latest available
else:
ros_time = rospy.Time.from_sec(time)
# Look up transform
transform = tf_buffer.lookup_transform(
target_frame,
source_frame,
ros_time,
rospy.Duration(timeout),
)
# Extract transform components
trans = transform.transform.translation
rot = transform.transform.rotation
return {
"success": True,
"target_frame": target_frame,
"source_frame": source_frame,
"stamp": {
"secs": transform.header.stamp.secs,
"nsecs": transform.header.stamp.nsecs,
},
"translation": {
"x": trans.x,
"y": trans.y,
"z": trans.z,
},
"rotation": {
"x": rot.x,
"y": rot.y,
"z": rot.z,
"w": rot.w,
},
}
except Exception as e:
error_msg = str(e)
logger.error(f"Failed to lookup transform from {source_frame} to {target_frame}: {e}")
return {
"success": False,
"target_frame": target_frame,
"source_frame": source_frame,
"error": error_msg,
}
@mcp.tool
def ros_can_transform(
target_frame: str,
source_frame: str,
time: float = 0.0,
timeout: float = 0.1,
) -> Dict[str, Any]:
"""
Check if a transform is available between two frames.
Args:
target_frame: The target frame.
source_frame: The source frame.
time: Time at which to check (0 = latest available).
timeout: Maximum time to wait in seconds.
Returns:
Dictionary with availability status and any error reason.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
try:
tf_buffer = _get_tf_buffer()
# Convert time to rospy.Time
if time == 0.0:
ros_time = rospy.Time(0)
else:
ros_time = rospy.Time.from_sec(time)
# Check if transform can be performed
can_transform = tf_buffer.can_transform(
target_frame,
source_frame,
ros_time,
rospy.Duration(timeout),
)
return {
"success": True,
"target_frame": target_frame,
"source_frame": source_frame,
"can_transform": can_transform,
}
except Exception as e:
logger.error(f"Failed to check transform availability: {e}")
return {
"success": False,
"target_frame": target_frame,
"source_frame": source_frame,
"can_transform": False,
"error": str(e),
}
@mcp.tool
def ros_get_frame_tree() -> Dict[str, Any]:
"""
Get the TF frame tree structure.
Returns:
Dictionary containing the frame tree with parent-child relationships.
"""
bridge = ROSBridge()
bridge.ensure_connected()
try:
tf_buffer = _get_tf_buffer()
# Get all frames as YAML and parse
frames_yaml = tf_buffer.all_frames_as_yaml()
# Parse the YAML to build tree structure
tree = {}
current_frame = None
for line in frames_yaml.split("\n"):
if not line.strip():
continue
# Frame definitions start without indentation
if line.startswith("Frame ") or (not line.startswith(" ") and line.endswith(":")):
# Extract frame name
if line.startswith("Frame "):
current_frame = line.split()[1].rstrip(":")
else:
current_frame = line.rstrip(":").strip()
if current_frame:
tree[current_frame] = {
"parent": None,
"broadcaster": None,
"average_rate": None,
}
elif current_frame and line.strip().startswith("parent:"):
parent = line.split(":", 1)[1].strip()
tree[current_frame]["parent"] = parent
elif current_frame and "broadcaster:" in line.lower():
broadcaster = line.split(":", 1)[1].strip()
tree[current_frame]["broadcaster"] = broadcaster
elif current_frame and "average rate:" in line.lower():
try:
rate = float(line.split(":", 1)[1].strip().split()[0])
tree[current_frame]["average_rate"] = rate
except (ValueError, IndexError):
pass
# Build parent-child relationships
children = {frame: [] for frame in tree}
roots = []
for frame, info in tree.items():
parent = info.get("parent")
if parent and parent in children:
children[parent].append(frame)
elif parent is None:
roots.append(frame)
return {
"success": True,
"frames": tree,
"roots": roots,
"children": children,
"count": len(tree),
}
except Exception as e:
logger.error(f"Failed to get TF frame tree: {e}")
return {
"success": False,
"error": str(e),
}
@mcp.tool
def ros_transform_point(
point: Dict[str, float],
source_frame: str,
target_frame: str,
time: float = 0.0,
timeout: float = 1.0,
) -> Dict[str, Any]:
"""
Transform a point from one frame to another.
Args:
point: Dictionary with 'x', 'y', 'z' coordinates.
source_frame: The frame the point is currently in.
target_frame: The frame to transform the point into.
time: Time at which to transform (0 = latest available).
timeout: Maximum time to wait for the transform.
Returns:
Dictionary containing the transformed point.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
from geometry_msgs.msg import PointStamped
import tf2_geometry_msgs
try:
tf_buffer = _get_tf_buffer()
# Create PointStamped message
point_stamped = PointStamped()
point_stamped.header.frame_id = source_frame
if time == 0.0:
point_stamped.header.stamp = rospy.Time(0)
else:
point_stamped.header.stamp = rospy.Time.from_sec(time)
point_stamped.point.x = point.get("x", 0.0)
point_stamped.point.y = point.get("y", 0.0)
point_stamped.point.z = point.get("z", 0.0)
# Transform the point
transformed = tf_buffer.transform(
point_stamped,
target_frame,
rospy.Duration(timeout),
)
return {
"success": True,
"source_frame": source_frame,
"target_frame": target_frame,
"original_point": point,
"transformed_point": {
"x": transformed.point.x,
"y": transformed.point.y,
"z": transformed.point.z,
},
}
except Exception as e:
logger.error(f"Failed to transform point: {e}")
return {
"success": False,
"source_frame": source_frame,
"target_frame": target_frame,
"error": str(e),
}
@mcp.tool
def ros_transform_pose(
pose: Dict[str, Any],
source_frame: str,
target_frame: str,
time: float = 0.0,
timeout: float = 1.0,
) -> Dict[str, Any]:
"""
Transform a pose from one frame to another.
Args:
pose: Dictionary with 'position' (x, y, z) and 'orientation' (x, y, z, w).
source_frame: The frame the pose is currently in.
target_frame: The frame to transform the pose into.
time: Time at which to transform (0 = latest available).
timeout: Maximum time to wait for the transform.
Returns:
Dictionary containing the transformed pose.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
from geometry_msgs.msg import PoseStamped
import tf2_geometry_msgs
try:
tf_buffer = _get_tf_buffer()
# Create PoseStamped message
pose_stamped = PoseStamped()
pose_stamped.header.frame_id = source_frame
if time == 0.0:
pose_stamped.header.stamp = rospy.Time(0)
else:
pose_stamped.header.stamp = rospy.Time.from_sec(time)
# Set position
pos = pose.get("position", {})
pose_stamped.pose.position.x = pos.get("x", 0.0)
pose_stamped.pose.position.y = pos.get("y", 0.0)
pose_stamped.pose.position.z = pos.get("z", 0.0)
# Set orientation
orient = pose.get("orientation", {"w": 1.0})
pose_stamped.pose.orientation.x = orient.get("x", 0.0)
pose_stamped.pose.orientation.y = orient.get("y", 0.0)
pose_stamped.pose.orientation.z = orient.get("z", 0.0)
pose_stamped.pose.orientation.w = orient.get("w", 1.0)
# Transform the pose
transformed = tf_buffer.transform(
pose_stamped,
target_frame,
rospy.Duration(timeout),
)
return {
"success": True,
"source_frame": source_frame,
"target_frame": target_frame,
"original_pose": pose,
"transformed_pose": {
"position": {
"x": transformed.pose.position.x,
"y": transformed.pose.position.y,
"z": transformed.pose.position.z,
},
"orientation": {
"x": transformed.pose.orientation.x,
"y": transformed.pose.orientation.y,
"z": transformed.pose.orientation.z,
"w": transformed.pose.orientation.w,
},
},
}
except Exception as e:
logger.error(f"Failed to transform pose: {e}")
return {
"success": False,
"source_frame": source_frame,
"target_frame": target_frame,
"error": str(e),
}