"""ROS node tools for MCP server."""
import logging
from typing import Any, Dict, List
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
logger = logging.getLogger(__name__)
@mcp.tool
def ros_list_nodes() -> List[str]:
"""
List all active ROS nodes.
Returns:
List of node names.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rosnode
try:
nodes = rosnode.get_node_names()
return sorted(nodes)
except Exception as e:
logger.error(f"Failed to list nodes: {e}")
return []
@mcp.tool
def ros_get_node_info(node: str) -> Dict[str, Any]:
"""
Get detailed information about a ROS node.
Args:
node: The node name (e.g., "/rosout", "/robot_state_publisher").
Returns:
Dictionary containing node information including publications,
subscriptions, and services.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
import rosnode
# Normalize node name
if not node.startswith("/"):
node = "/" + node
try:
# Check if node exists
nodes = rosnode.get_node_names()
if node not in nodes:
return {
"node": node,
"exists": False,
"error": f"Node '{node}' not found",
}
# Get node info using master API
master = rospy.get_master()
state = master.getSystemState()
publications = []
subscriptions = []
services = []
if state[0] == 1: # Success
pub_list, sub_list, srv_list = state[2]
# Find publications for this node
for topic, nodes_list in pub_list:
if node in nodes_list:
publications.append(topic)
# Find subscriptions for this node
for topic, nodes_list in sub_list:
if node in nodes_list:
subscriptions.append(topic)
# Find services for this node
for service, nodes_list in srv_list:
if node in nodes_list:
services.append(service)
# Get node URI
try:
node_uri = rosnode.get_api_uri(master, node)
except Exception:
node_uri = None
# Try to get machine info
machine = None
try:
code, msg, machine = master.lookupNode(node)
if code != 1:
machine = None
except Exception:
pass
return {
"node": node,
"exists": True,
"uri": node_uri,
"machine": machine,
"publications": sorted(publications),
"subscriptions": sorted(subscriptions),
"services": sorted(services),
"num_publications": len(publications),
"num_subscriptions": len(subscriptions),
"num_services": len(services),
}
except Exception as e:
logger.error(f"Failed to get node info for {node}: {e}")
return {
"node": node,
"exists": False,
"error": str(e),
}
@mcp.tool
def ros_ping_node(node: str, max_count: int = 3) -> Dict[str, Any]:
"""
Ping a ROS node to check if it's responsive.
Args:
node: The node name to ping.
max_count: Maximum number of ping attempts.
Returns:
Dictionary with ping results.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rosnode
# Normalize node name
if not node.startswith("/"):
node = "/" + node
try:
# Check if node exists first
nodes = rosnode.get_node_names()
if node not in nodes:
return {
"node": node,
"reachable": False,
"error": f"Node '{node}' not found",
}
# Try to ping the node
success = rosnode.rosnode_ping(node, max_count=max_count)
return {
"node": node,
"reachable": success,
"ping_count": max_count,
}
except Exception as e:
logger.error(f"Failed to ping node {node}: {e}")
return {
"node": node,
"reachable": False,
"error": str(e),
}
@mcp.tool
def ros_get_master_info() -> Dict[str, Any]:
"""
Get information about the ROS master.
Returns:
Dictionary containing ROS master information.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
import os
try:
master = rospy.get_master()
# Get master URI
master_uri = os.environ.get("ROS_MASTER_URI", "http://localhost:11311")
# Get system state
state = master.getSystemState()
num_publishers = 0
num_subscribers = 0
num_services = 0
topics = set()
if state[0] == 1:
pub_list, sub_list, srv_list = state[2]
num_publishers = sum(len(nodes) for _, nodes in pub_list)
num_subscribers = sum(len(nodes) for _, nodes in sub_list)
num_services = len(srv_list)
topics = set(topic for topic, _ in pub_list) | set(topic for topic, _ in sub_list)
# Get parameter count
try:
params = rospy.get_param_names()
num_params = len(params)
except Exception:
num_params = 0
return {
"success": True,
"master_uri": master_uri,
"is_online": True,
"num_topics": len(topics),
"num_publishers": num_publishers,
"num_subscribers": num_subscribers,
"num_services": num_services,
"num_parameters": num_params,
}
except Exception as e:
logger.error(f"Failed to get master info: {e}")
return {
"success": False,
"master_uri": os.environ.get("ROS_MASTER_URI", "http://localhost:11311"),
"is_online": False,
"error": str(e),
}