"""ROS topic tools for MCP server."""
import logging
from typing import Any, Dict, List, Optional
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
from ..utils.message_utils import (
message_to_dict,
dict_to_message,
get_topic_type,
get_message_class,
)
logger = logging.getLogger(__name__)
@mcp.tool
def ros_list_topics() -> List[Dict[str, str]]:
"""
List all active ROS topics.
Returns a list of topics with their message types.
Returns:
List of dictionaries containing 'name' and 'type' for each topic.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
# Get list of published topics
topics = rospy.get_published_topics()
return [{"name": name, "type": msg_type} for name, msg_type in sorted(topics)]
@mcp.tool
def ros_get_topic_info(topic: str) -> Dict[str, Any]:
"""
Get detailed information about a ROS topic.
Args:
topic: The topic name (e.g., "/cmd_vel", "/odom").
Returns:
Dictionary containing topic information including message type,
publishers, and subscribers.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
import rostopic
# Normalize topic name
if not topic.startswith("/"):
topic = "/" + topic
# Get topic type
topic_type = get_topic_type(topic)
if topic_type is None:
return {
"topic": topic,
"exists": False,
"error": f"Topic '{topic}' not found or has no publishers",
}
# Get publisher and subscriber info
try:
master = rospy.get_master()
state = master.getSystemState()
publishers = []
subscribers = []
if state[0] == 1: # Success
pub_list, sub_list, _ = state[2]
for t, nodes in pub_list:
if t == topic:
publishers = nodes
break
for t, nodes in sub_list:
if t == topic:
subscribers = nodes
break
return {
"topic": topic,
"exists": True,
"type": topic_type,
"publishers": publishers,
"subscribers": subscribers,
"num_publishers": len(publishers),
"num_subscribers": len(subscribers),
}
except Exception as e:
logger.warning(f"Error getting topic info: {e}")
return {
"topic": topic,
"exists": True,
"type": topic_type,
"error": str(e),
}
@mcp.tool
def ros_publish(
topic: str,
message: Dict[str, Any],
msg_type: Optional[str] = None,
latch: bool = False,
queue_size: int = 10,
) -> Dict[str, Any]:
"""
Publish a message to a ROS topic.
Args:
topic: The topic name to publish to.
message: The message data as a dictionary.
msg_type: Message type (e.g., "std_msgs/String"). If not provided,
will attempt to auto-detect from existing publishers.
latch: If True, the last message will be saved and sent to new subscribers.
queue_size: Size of the outgoing message queue.
Returns:
Dictionary with status of the publish operation.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
# Normalize topic name
if not topic.startswith("/"):
topic = "/" + topic
# Auto-detect message type if not provided
if msg_type is None:
msg_type = get_topic_type(topic)
if msg_type is None:
return {
"success": False,
"error": f"Could not determine message type for topic '{topic}'. "
"Please provide msg_type parameter.",
}
try:
# Get message class and create message
msg_class = get_message_class(msg_type)
ros_msg = dict_to_message(message, msg_type)
# Create publisher and publish
pub = rospy.Publisher(topic, msg_class, queue_size=queue_size, latch=latch)
# Wait a moment for publisher to connect
rospy.sleep(0.1)
pub.publish(ros_msg)
return {
"success": True,
"topic": topic,
"msg_type": msg_type,
"message": message,
}
except Exception as e:
logger.error(f"Failed to publish to {topic}: {e}")
return {
"success": False,
"topic": topic,
"error": str(e),
}
@mcp.tool
def ros_subscribe_once(
topic: str,
timeout: float = 5.0,
msg_type: Optional[str] = None,
) -> Dict[str, Any]:
"""
Subscribe to a topic and get a single message.
Args:
topic: The topic name to subscribe to.
timeout: Maximum time to wait for a message in seconds.
msg_type: Message type (auto-detected if not provided).
Returns:
Dictionary containing the received message or error.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
# Normalize topic name
if not topic.startswith("/"):
topic = "/" + topic
# Auto-detect message type if not provided
if msg_type is None:
msg_type = get_topic_type(topic)
if msg_type is None:
return {
"success": False,
"error": f"Could not determine message type for topic '{topic}'. "
"Topic may not exist or have no publishers.",
}
try:
msg_class = get_message_class(msg_type)
# Use wait_for_message
msg = rospy.wait_for_message(topic, msg_class, timeout=timeout)
return {
"success": True,
"topic": topic,
"msg_type": msg_type,
"message": message_to_dict(msg),
}
except rospy.ROSException as e:
return {
"success": False,
"topic": topic,
"error": f"Timeout waiting for message: {e}",
}
except Exception as e:
logger.error(f"Failed to subscribe to {topic}: {e}")
return {
"success": False,
"topic": topic,
"error": str(e),
}
@mcp.tool
def ros_echo_topic(
topic: str,
count: int = 5,
timeout: float = 10.0,
msg_type: Optional[str] = None,
) -> Dict[str, Any]:
"""
Echo multiple messages from a topic.
Args:
topic: The topic name to subscribe to.
count: Number of messages to collect (default: 5).
timeout: Maximum time to wait for all messages in seconds.
msg_type: Message type (auto-detected if not provided).
Returns:
Dictionary containing the list of received messages or error.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
import threading
# Normalize topic name
if not topic.startswith("/"):
topic = "/" + topic
# Auto-detect message type if not provided
if msg_type is None:
msg_type = get_topic_type(topic)
if msg_type is None:
return {
"success": False,
"error": f"Could not determine message type for topic '{topic}'.",
}
try:
msg_class = get_message_class(msg_type)
messages = []
lock = threading.Lock()
done = threading.Event()
def callback(msg):
with lock:
if len(messages) < count:
messages.append(message_to_dict(msg))
if len(messages) >= count:
done.set()
sub = rospy.Subscriber(topic, msg_class, callback)
try:
# Wait for messages or timeout
done.wait(timeout=timeout)
finally:
sub.unregister()
return {
"success": True,
"topic": topic,
"msg_type": msg_type,
"count_requested": count,
"count_received": len(messages),
"messages": messages,
}
except Exception as e:
logger.error(f"Failed to echo topic {topic}: {e}")
return {
"success": False,
"topic": topic,
"error": str(e),
}