"""ROS parameter tools for MCP server."""
import logging
from typing import Any, Dict, List, Optional
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
logger = logging.getLogger(__name__)
@mcp.tool
def ros_list_params(namespace: str = "") -> List[str]:
    """
    List all ROS parameters.

    Args:
        namespace: Optional namespace to filter parameters (e.g., "/robot").
            A leading slash is added if missing and trailing slashes are
            ignored. If empty, lists all parameters.

    Returns:
        Sorted list of parameter names. An empty list is returned when the
        lookup fails; the failure is logged rather than raised.
    """
    bridge = ROSBridge()
    bridge.ensure_connected()

    import rospy

    try:
        # Get all parameter names
        params = rospy.get_param_names()

        # Filter by namespace if provided
        if namespace:
            if not namespace.startswith("/"):
                namespace = "/" + namespace
            # Match on a namespace boundary: a plain prefix test would make
            # "/robot" also select "/robot2/...". Stripping trailing slashes
            # first lets "/robot/" and "/" behave the same as before.
            base = namespace.rstrip("/")
            prefix = base + "/"
            # An exact-name match is kept so a parameter whose full name
            # equals the filter is still reported.
            params = [p for p in params if p == base or p.startswith(prefix)]

        return sorted(params)
    except Exception as e:
        logger.error(f"Failed to list parameters: {e}")
        return []
@mcp.tool
def ros_get_param(name: str, default: Optional[Any] = None) -> Dict[str, Any]:
    """
    Get a ROS parameter value.

    Args:
        name: The parameter name (e.g., "/robot_description", "/use_sim_time").
            A leading slash is added if missing.
        default: Default value to return if parameter doesn't exist. A
            default of None is treated as "no default supplied".

    Returns:
        Dictionary containing the parameter value or error. "success" is
        True when the parameter was read or a default was substituted;
        "exists" tells the two cases apart.
    """
    bridge = ROSBridge()
    bridge.ensure_connected()

    import rospy

    # Normalize parameter name
    if not name.startswith("/"):
        name = "/" + name

    try:
        # Fetch directly instead of has_param() followed by get_param():
        # a single call cannot race with a concurrent delete and saves a
        # round trip. rospy signals a missing parameter with KeyError.
        value = rospy.get_param(name)
    except KeyError:
        if default is not None:
            return {
                "success": True,
                "name": name,
                "value": default,
                "exists": False,
                "used_default": True,
            }
        return {
            "success": False,
            "name": name,
            "exists": False,
            "error": f"Parameter '{name}' does not exist",
        }
    except Exception as e:
        logger.error(f"Failed to get parameter {name}: {e}")
        return {
            "success": False,
            "name": name,
            "error": str(e),
        }

    return {
        "success": True,
        "name": name,
        "value": value,
        "exists": True,
    }
@mcp.tool
def ros_set_param(name: str, value: Any) -> Dict[str, Any]:
    """
    Store a value under a ROS parameter name.

    Args:
        name: Parameter to write; a leading "/" is prepended when absent.
        value: Value to store (string, number, boolean, list, or dict).

    Returns:
        On success, a dictionary with "success" True plus the normalized
        name and the value written. On failure, "success" False with the
        error text; the failure is also logged.
    """
    ros = ROSBridge()
    ros.ensure_connected()

    import rospy

    # Treat every name as absolute.
    name = name if name.startswith("/") else "/" + name

    try:
        rospy.set_param(name, value)
    except Exception as e:
        logger.error(f"Failed to set parameter {name}: {e}")
        return {"success": False, "name": name, "error": str(e)}

    return {"success": True, "name": name, "value": value}
@mcp.tool
def ros_delete_param(name: str) -> Dict[str, Any]:
    """
    Delete a ROS parameter.

    Args:
        name: The parameter name to delete. A leading slash is added if
            missing.

    Returns:
        Dictionary with status of the delete operation. "success" is False
        with an explanatory "error" when the parameter does not exist or
        the delete fails.
    """
    bridge = ROSBridge()
    bridge.ensure_connected()

    import rospy

    # Normalize parameter name
    if not name.startswith("/"):
        name = "/" + name

    try:
        # Delete directly rather than checking has_param() first: the
        # check-then-delete sequence can race with another client, and
        # rospy already reports a missing parameter via KeyError.
        rospy.delete_param(name)
    except KeyError:
        return {
            "success": False,
            "name": name,
            "error": f"Parameter '{name}' does not exist",
        }
    except Exception as e:
        logger.error(f"Failed to delete parameter {name}: {e}")
        return {
            "success": False,
            "name": name,
            "error": str(e),
        }

    return {
        "success": True,
        "name": name,
        "deleted": True,
    }
@mcp.tool
def ros_has_param(name: str) -> Dict[str, Any]:
    """
    Report whether a ROS parameter is currently set.

    Args:
        name: Parameter to look up; a leading "/" is prepended when absent.

    Returns:
        Dictionary with "success", the normalized "name", and "exists"
        holding the result of the check. If the check itself fails,
        "success" is False and "error" carries the exception text.
    """
    ros = ROSBridge()
    ros.ensure_connected()

    import rospy

    # Treat every name as absolute.
    name = name if name.startswith("/") else "/" + name

    try:
        found = rospy.has_param(name)
    except Exception as e:
        logger.error(f"Failed to check parameter {name}: {e}")
        return {"success": False, "name": name, "error": str(e)}

    return {"success": True, "name": name, "exists": found}
@mcp.tool
def ros_search_param(name: str) -> Dict[str, Any]:
    """
    Locate a ROS parameter via rospy's parameter search.

    The name is handed to ``rospy.search_param`` unchanged (no leading
    slash is added), which looks for the parameter up the namespace
    hierarchy, similar to how ROS nodes resolve parameters.

    Args:
        name: The parameter name to search for (without leading slash).

    Returns:
        When found, a dictionary with "success" True, the resolved path in
        "found_at", and the value read from that path. Otherwise "success"
        is False with an "error" message.
    """
    ros = ROSBridge()
    ros.ensure_connected()

    import rospy

    try:
        location = rospy.search_param(name)

        # Guard clause: a falsy result means nothing matched.
        if not location:
            return {
                "success": False,
                "searched_for": name,
                "found": False,
                "error": f"Parameter '{name}' not found in parameter tree",
            }

        # The value lookup stays inside the try so a failure here is
        # reported through the same error path as the search.
        return {
            "success": True,
            "searched_for": name,
            "found_at": location,
            "value": rospy.get_param(location),
        }
    except Exception as e:
        logger.error(f"Failed to search for parameter {name}: {e}")
        return {"success": False, "searched_for": name, "error": str(e)}