"""ROS message introspection tools for MCP server."""
import logging
from typing import Any, Dict, List, Optional
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
from ..utils.message_utils import get_message_definition, get_service_definition
# Module-level logger named after this module; the tools below use it to
# report failures before returning an error dictionary.
logger = logging.getLogger(__name__)
def _parse_msg_fields(definition: str) -> List[Dict[str, Any]]:
    """Extract field/constant declarations from message definition text.

    Every non-blank line that is not a comment, a ``=`` separator line or a
    ``MSG:`` header is treated as a declaration, whichever section of the text
    it appears in (declarations are returned as one flat list).

    Args:
        definition: Message definition text, one declaration per line.

    Returns:
        A list of ``{"type", "name", "default"}`` dictionaries. ``default`` is
        the constant value as a string, or ``None`` for plain fields.
    """
    fields: List[Dict[str, Any]] = []
    for raw_line in definition.split("\n"):
        line = raw_line.strip()
        # "MSG: pkg/Type" lines look like headers of embedded definitions in
        # concatenated message text; they are never field declarations.
        if not line or line.startswith(("#", "=", "MSG:")):
            continue

        # Drop a trailing comment so an "=" inside it is not mistaken for a
        # constant assignment.
        code = line.split("#", 1)[0].strip()
        tokens = code.split(None, 1)
        if len(tokens) < 2:
            continue
        field_type, declaration = tokens

        default: Optional[str] = None
        if "=" in declaration:
            # Constant: split on "=" so "FOO=1", "FOO =1" and "FOO = 1" all
            # produce the name "FOO".
            name_part, value_part = declaration.split("=", 1)
            if field_type == "string":
                # String constants keep everything right of "=", including
                # any "#" (mirrors how genmsg reads string constants).
                value_part = line.split("=", 1)[1]
            default = value_part.strip()
        else:
            name_part = declaration

        name_tokens = name_part.split()
        if not name_tokens:
            continue
        fields.append({
            "type": field_type,
            "name": name_tokens[0],
            "default": default,
        })
    return fields


@mcp.tool
def ros_show_msg(msg_type: str) -> Dict[str, Any]:
    """
    Show the definition of a ROS message type.

    Args:
        msg_type: Full message type string (e.g., "std_msgs/String",
            "geometry_msgs/Pose", "sensor_msgs/Image").

    Returns:
        Dictionary containing the message definition and field information.
        On success: ``success=True``, ``msg_type``, ``definition`` and
        ``fields`` (list of ``{"type", "name", "default"}``). On failure:
        ``success=False``, ``msg_type`` and ``error`` (exception text).
    """
    bridge = ROSBridge()
    # Called outside the try block: a connection failure propagates to the
    # caller instead of being turned into an error dictionary.
    bridge.ensure_connected()
    try:
        definition = get_message_definition(msg_type)
        return {
            "success": True,
            "msg_type": msg_type,
            "definition": definition,
            "fields": _parse_msg_fields(definition),
        }
    except Exception as e:
        logger.error(f"Failed to get message definition for {msg_type}: {e}")
        return {
            "success": False,
            "msg_type": msg_type,
            "error": str(e),
        }
def _split_srv_sections(definition: str) -> List[str]:
    """Split service definition text on its ``---`` delimiter lines.

    Only a line whose non-comment content starts with ``---`` counts as a
    delimiter, so dashes inside comments or constant values do not split the
    text.

    Args:
        definition: Full service definition text.

    Returns:
        The stripped text of each section, in order (always at least one).
    """
    sections: List[List[str]] = [[]]
    for raw_line in definition.split("\n"):
        if raw_line.split("#", 1)[0].strip().startswith("---"):
            sections.append([])
        else:
            sections[-1].append(raw_line)
    return ["\n".join(lines).strip() for lines in sections]


def _parse_srv_fields(text: str) -> List[Dict[str, Any]]:
    """Extract ``{"type", "name"}`` entries from one service section.

    Args:
        text: Request or response part of a service definition.

    Returns:
        One dictionary per declaration line; comment-only and blank lines
        are skipped.
    """
    fields: List[Dict[str, Any]] = []
    for raw_line in text.split("\n"):
        # Ignore trailing comments so their words are never read as names.
        code = raw_line.split("#", 1)[0].strip()
        tokens = code.split(None, 1)
        if len(tokens) < 2:
            continue
        field_type, declaration = tokens
        # For constants ("FOO=1") keep only the identifier left of "=".
        name_tokens = declaration.split("=", 1)[0].split()
        if not name_tokens:
            continue
        fields.append({
            "type": field_type,
            "name": name_tokens[0],
        })
    return fields


@mcp.tool
def ros_show_srv(srv_type: str) -> Dict[str, Any]:
    """
    Show the definition of a ROS service type.

    Args:
        srv_type: Full service type string (e.g., "std_srvs/Empty",
            "std_srvs/SetBool", "nav_msgs/GetPlan").

    Returns:
        Dictionary containing the service definition with request and response.
        On success: ``success=True``, ``srv_type``, ``definition``, and
        ``request`` / ``response`` dictionaries each holding ``definition``
        and ``fields``. On failure: ``success=False``, ``srv_type`` and
        ``error`` (exception text).
    """
    bridge = ROSBridge()
    # Called outside the try block: a connection failure propagates to the
    # caller instead of being turned into an error dictionary.
    bridge.ensure_connected()
    try:
        definition = get_service_definition(srv_type)

        # First section is the request, second (if any) the response; text
        # after a further delimiter is ignored.
        sections = _split_srv_sections(definition)
        request_def = sections[0]
        response_def = sections[1] if len(sections) > 1 else ""

        return {
            "success": True,
            "srv_type": srv_type,
            "definition": definition,
            "request": {
                "definition": request_def,
                "fields": _parse_srv_fields(request_def),
            },
            "response": {
                "definition": response_def,
                "fields": _parse_srv_fields(response_def),
            },
        }
    except Exception as e:
        logger.error(f"Failed to get service definition for {srv_type}: {e}")
        return {
            "success": False,
            "srv_type": srv_type,
            "error": str(e),
        }
@mcp.tool
def ros_list_msg_types(package: str = "") -> Dict[str, Any]:
    """
    List available ROS message types.

    Args:
        package: Optional package name to filter by (e.g., "std_msgs", "geometry_msgs").
            If empty, lists messages from all packages.

    Returns:
        Dictionary containing list of message types.
        With ``package``: ``success``, ``package``, ``messages`` (sorted) and
        ``count``. Without: ``success``, ``messages`` (sorted, qualified as
        ``pkg/Type``), ``count`` and ``packages`` (sorted). On failure:
        ``success=False`` and ``error``.
    """
    bridge = ROSBridge()
    # Called outside the try block: a connection failure propagates to the
    # caller instead of being turned into an error dictionary.
    bridge.ensure_connected()
    try:
        import rosmsg

        if package:
            # List messages for the requested package only.
            try:
                messages = rosmsg.list_msgs(package)
            except Exception as e:
                return {
                    "success": False,
                    "package": package,
                    "error": f"Package '{package}' not found or has no messages: {e}",
                }
            return {
                "success": True,
                "package": package,
                "messages": sorted(messages),
                "count": len(messages),
            }

        # No filter: walk every package that provides messages.
        packages = rosmsg.list_packages()
        all_messages: List[str] = []
        for pkg in packages:
            try:
                names = rosmsg.list_msgs(pkg)
            except Exception as e:
                # Best effort: one unreadable package must not fail the
                # whole listing, but leave a trace for debugging.
                logger.debug("Skipping package %s while listing messages: %s", pkg, e)
                continue
            # rosmsg.list_msgs is expected to return names already qualified
            # as "pkg/Type"; only add the prefix when it is missing so the
            # result never becomes "pkg/pkg/Type".
            all_messages.extend(
                name if "/" in name else f"{pkg}/{name}" for name in names
            )
        return {
            "success": True,
            "messages": sorted(all_messages),
            "count": len(all_messages),
            "packages": sorted(packages),
        }
    except Exception as e:
        logger.error(f"Failed to list message types: {e}")
        return {
            "success": False,
            "error": str(e),
        }
def _load_srv_listing_api():
    """Return ``(list_srvs, list_packages)`` callables for service types.

    Prefers an importable ``rossrv`` module when one exists. Otherwise falls
    back to ``rosmsg``, which is expected to expose ``list_srvs`` and a
    ``list_packages`` that accepts ``mode=rosmsg.MODE_SRV``.
    NOTE(review): in stock ROS 1 ``rossrv`` appears to be only a command-line
    script of the ``rosmsg`` package, so the fallback is the usual path.
    """
    try:
        import rossrv
    except ImportError:
        import rosmsg

        def list_srv_packages():
            return rosmsg.list_packages(mode=rosmsg.MODE_SRV)

        return rosmsg.list_srvs, list_srv_packages
    return rossrv.list_srvs, rossrv.list_packages


@mcp.tool
def ros_list_srv_types(package: str = "") -> Dict[str, Any]:
    """
    List available ROS service types.

    Args:
        package: Optional package name to filter by (e.g., "std_srvs").
            If empty, lists services from all packages.

    Returns:
        Dictionary containing list of service types.
        With ``package``: ``success``, ``package``, ``services`` (sorted) and
        ``count``. Without: ``success``, ``services`` (sorted, qualified as
        ``pkg/Type``), ``count`` and ``packages`` (sorted). On failure:
        ``success=False`` and ``error``.
    """
    bridge = ROSBridge()
    # Called outside the try block: a connection failure propagates to the
    # caller instead of being turned into an error dictionary.
    bridge.ensure_connected()
    try:
        list_srvs, list_packages = _load_srv_listing_api()

        if package:
            # List services for the requested package only.
            try:
                services = list_srvs(package)
            except Exception as e:
                return {
                    "success": False,
                    "package": package,
                    "error": f"Package '{package}' not found or has no services: {e}",
                }
            return {
                "success": True,
                "package": package,
                "services": sorted(services),
                "count": len(services),
            }

        # No filter: walk every package that provides services.
        packages = list_packages()
        all_services: List[str] = []
        for pkg in packages:
            try:
                names = list_srvs(pkg)
            except Exception as e:
                # Best effort: one unreadable package must not fail the
                # whole listing, but leave a trace for debugging.
                logger.debug("Skipping package %s while listing services: %s", pkg, e)
                continue
            # Names are expected to come back already qualified as
            # "pkg/Type"; only add the prefix when it is missing so the
            # result never becomes "pkg/pkg/Type".
            all_services.extend(
                name if "/" in name else f"{pkg}/{name}" for name in names
            )
        return {
            "success": True,
            "services": sorted(all_services),
            "count": len(all_services),
            "packages": sorted(packages),
        }
    except Exception as e:
        logger.error(f"Failed to list service types: {e}")
        return {
            "success": False,
            "error": str(e),
        }
@mcp.tool
def ros_msg_md5(msg_type: str) -> Dict[str, Any]:
    """
    Look up the MD5 sum attached to a ROS message type.

    ROS uses this checksum to verify that publishers and subscribers
    agree on a message's layout.

    Args:
        msg_type: Full message type string.

    Returns:
        Dictionary with ``success``, ``msg_type`` and either ``md5sum``
        (on success) or ``error`` (exception text on failure).
    """
    # Connection is checked before the try block, so a connection failure
    # propagates rather than being reported in the result dictionary.
    ROSBridge().ensure_connected()
    try:
        from ..utils.message_utils import get_message_class

        # The checksum is read from the resolved class's ``_md5sum`` attribute.
        digest = get_message_class(msg_type)._md5sum
    except Exception as e:
        logger.error(f"Failed to get MD5 for {msg_type}: {e}")
        return {
            "success": False,
            "msg_type": msg_type,
            "error": str(e),
        }
    else:
        return {
            "success": True,
            "msg_type": msg_type,
            "md5sum": digest,
        }