"""ROS MCP resources for exposing ROS system state."""
import json
import logging
from typing import Any, Dict, List, Optional
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
from ..utils.message_utils import message_to_dict, get_topic_type, get_message_class
logger = logging.getLogger(__name__)
@mcp.resource("ros://system/status")
def ros_system_status() -> str:
"""
Get the current ROS system status.
Returns connectivity information and basic statistics about the ROS system.
"""
bridge = ROSBridge()
try:
import rospy
import os
master_uri = os.environ.get("ROS_MASTER_URI", "http://localhost:11311")
is_connected = bridge.is_connected
if not is_connected:
return json.dumps({
"connected": False,
"master_uri": master_uri,
"error": "Not connected to ROS master",
}, indent=2)
# Get basic stats
try:
topics = rospy.get_published_topics()
params = rospy.get_param_names()
import rosnode
nodes = rosnode.get_node_names()
return json.dumps({
"connected": True,
"master_uri": master_uri,
"node_name": rospy.get_name(),
"stats": {
"topics": len(topics),
"nodes": len(nodes),
"parameters": len(params),
},
}, indent=2)
except Exception as e:
return json.dumps({
"connected": True,
"master_uri": master_uri,
"error": f"Failed to get stats: {e}",
}, indent=2)
except Exception as e:
return json.dumps({
"connected": False,
"error": str(e),
}, indent=2)
@mcp.resource("ros://topics")
def ros_topics_resource() -> str:
"""
List of all active ROS topics with their message types.
"""
bridge = ROSBridge()
try:
bridge.ensure_connected()
import rospy
topics = rospy.get_published_topics()
result = [{"name": name, "type": msg_type} for name, msg_type in sorted(topics)]
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource("ros://nodes")
def ros_nodes_resource() -> str:
"""
List of all active ROS nodes.
"""
bridge = ROSBridge()
try:
bridge.ensure_connected()
import rosnode
nodes = rosnode.get_node_names()
return json.dumps(sorted(nodes), indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource("ros://services")
def ros_services_resource() -> str:
"""
List of all available ROS services with their types.
"""
bridge = ROSBridge()
try:
bridge.ensure_connected()
import rosservice
services = rosservice.get_service_list()
result = []
for service in sorted(services):
try:
srv_type = rosservice.get_service_type(service)
result.append({"name": service, "type": srv_type or "unknown"})
except Exception:
result.append({"name": service, "type": "unknown"})
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource("ros://params")
def ros_params_resource() -> str:
"""
List of all ROS parameters.
"""
bridge = ROSBridge()
try:
bridge.ensure_connected()
import rospy
params = rospy.get_param_names()
return json.dumps(sorted(params), indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp.resource("ros://topic/{topic_path}")
def ros_topic_info_resource(topic_path: str) -> str:
"""
Get information about a specific topic including latest message.
The topic_path should not include the leading slash (e.g., "cmd_vel" not "/cmd_vel").
"""
bridge = ROSBridge()
# Reconstruct topic name with leading slash
topic = "/" + topic_path
try:
bridge.ensure_connected()
import rospy
import rostopic
# Get topic type
topic_type = get_topic_type(topic)
if topic_type is None:
return json.dumps({
"topic": topic,
"exists": False,
"error": "Topic not found or has no publishers",
}, indent=2)
# Get publisher/subscriber info
master = rospy.get_master()
state = master.getSystemState()
publishers = []
subscribers = []
if state[0] == 1:
pub_list, sub_list, _ = state[2]
for t, nodes in pub_list:
if t == topic:
publishers = nodes
break
for t, nodes in sub_list:
if t == topic:
subscribers = nodes
break
# Try to get the latest message
latest_message = None
try:
msg_class = get_message_class(topic_type)
msg = rospy.wait_for_message(topic, msg_class, timeout=1.0)
latest_message = message_to_dict(msg)
except Exception:
latest_message = None
return json.dumps({
"topic": topic,
"exists": True,
"type": topic_type,
"publishers": publishers,
"subscribers": subscribers,
"latest_message": latest_message,
}, indent=2)
except Exception as e:
return json.dumps({
"topic": topic,
"error": str(e),
}, indent=2)
@mcp.resource("ros://node/{node_path}")
def ros_node_info_resource(node_path: str) -> str:
"""
Get information about a specific ROS node.
The node_path should not include the leading slash.
"""
bridge = ROSBridge()
# Reconstruct node name with leading slash
node = "/" + node_path
try:
bridge.ensure_connected()
import rospy
import rosnode
# Check if node exists
nodes = rosnode.get_node_names()
if node not in nodes:
return json.dumps({
"node": node,
"exists": False,
"error": "Node not found",
}, indent=2)
# Get node info
master = rospy.get_master()
state = master.getSystemState()
publications = []
subscriptions = []
services = []
if state[0] == 1:
pub_list, sub_list, srv_list = state[2]
for topic, nodes_list in pub_list:
if node in nodes_list:
publications.append(topic)
for topic, nodes_list in sub_list:
if node in nodes_list:
subscriptions.append(topic)
for service, nodes_list in srv_list:
if node in nodes_list:
services.append(service)
return json.dumps({
"node": node,
"exists": True,
"publications": sorted(publications),
"subscriptions": sorted(subscriptions),
"services": sorted(services),
}, indent=2)
except Exception as e:
return json.dumps({
"node": node,
"error": str(e),
}, indent=2)
@mcp.resource("ros://param/{param_path}")
def ros_param_value_resource(param_path: str) -> str:
"""
Get the value of a specific ROS parameter.
The param_path should not include the leading slash.
"""
bridge = ROSBridge()
# Reconstruct parameter name with leading slash
param = "/" + param_path
try:
bridge.ensure_connected()
import rospy
if rospy.has_param(param):
value = rospy.get_param(param)
return json.dumps({
"name": param,
"exists": True,
"value": value,
}, indent=2)
else:
return json.dumps({
"name": param,
"exists": False,
"error": "Parameter not found",
}, indent=2)
except Exception as e:
return json.dumps({
"name": param,
"error": str(e),
}, indent=2)
@mcp.resource("ros://tf/frames")
def ros_tf_frames_resource() -> str:
"""
List of all TF coordinate frames.
"""
bridge = ROSBridge()
try:
bridge.ensure_connected()
import tf2_ros
import rospy
tf_buffer = tf2_ros.Buffer()
tf_listener = tf2_ros.TransformListener(tf_buffer)
# Wait for transforms
rospy.sleep(0.5)
# Get frames
frames_yaml = tf_buffer.all_frames_as_yaml()
# Parse frame names
frames = []
for line in frames_yaml.split("\n"):
line = line.strip()
if line.endswith(":"):
frame_name = line[:-1].strip()
if frame_name:
frames.append(frame_name)
return json.dumps({
"frames": sorted(set(frames)),
"count": len(set(frames)),
}, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)