"""ROS service tools for MCP server."""
import logging
from typing import Any, Dict, List, Optional
from ..server import mcp
from ..utils.ros_bridge import ROSBridge
from ..utils.message_utils import (
message_to_dict,
get_service_class,
get_service_type,
)
logger = logging.getLogger(__name__)
@mcp.tool
def ros_list_services() -> List[Dict[str, str]]:
"""
List all available ROS services.
Returns a list of services with their types.
Returns:
List of dictionaries containing 'name' and 'type' for each service.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rosservice
try:
services = rosservice.get_service_list()
result = []
for service in sorted(services):
try:
srv_type = rosservice.get_service_type(service)
result.append({"name": service, "type": srv_type or "unknown"})
except Exception:
result.append({"name": service, "type": "unknown"})
return result
except Exception as e:
logger.error(f"Failed to list services: {e}")
return []
@mcp.tool
def ros_get_service_info(service: str) -> Dict[str, Any]:
"""
Get detailed information about a ROS service.
Args:
service: The service name (e.g., "/rosout/get_loggers").
Returns:
Dictionary containing service information including type and node.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rosservice
import rospy
# Normalize service name
if not service.startswith("/"):
service = "/" + service
try:
# Check if service exists
services = rosservice.get_service_list()
if service not in services:
return {
"service": service,
"exists": False,
"error": f"Service '{service}' not found",
}
# Get service type
srv_type = rosservice.get_service_type(service)
# Get service node
srv_node = rosservice.get_service_node(service)
# Get service args (request fields)
try:
srv_args = rosservice.get_service_args(service)
except Exception:
srv_args = None
return {
"service": service,
"exists": True,
"type": srv_type,
"node": srv_node,
"args": srv_args,
}
except Exception as e:
logger.error(f"Failed to get service info for {service}: {e}")
return {
"service": service,
"exists": False,
"error": str(e),
}
@mcp.tool
def ros_call_service(
service: str,
request: Optional[Dict[str, Any]] = None,
srv_type: Optional[str] = None,
timeout: float = 5.0,
) -> Dict[str, Any]:
"""
Call a ROS service.
Args:
service: The service name to call.
request: The service request data as a dictionary. Use empty dict or None
for services with no request parameters.
srv_type: Service type (e.g., "std_srvs/Empty"). Auto-detected if not provided.
timeout: Maximum time to wait for service in seconds.
Returns:
Dictionary containing the service response or error.
"""
bridge = ROSBridge()
bridge.ensure_connected()
import rospy
import rosservice
# Normalize service name
if not service.startswith("/"):
service = "/" + service
# Default empty request
if request is None:
request = {}
# Auto-detect service type if not provided
if srv_type is None:
srv_type = get_service_type(service)
if srv_type is None:
return {
"success": False,
"error": f"Could not determine service type for '{service}'. "
"Service may not exist.",
}
try:
# Get service class
srv_class = get_service_class(srv_type)
# Wait for service to be available
rospy.wait_for_service(service, timeout=timeout)
# Create service proxy
srv_proxy = rospy.ServiceProxy(service, srv_class)
# Create request message
request_class = srv_class._request_class
req_msg = request_class()
# Fill request fields
for field, value in request.items():
if hasattr(req_msg, field):
setattr(req_msg, field, value)
else:
logger.warning(f"Unknown request field '{field}' for service {service}")
# Call service
response = srv_proxy(req_msg)
return {
"success": True,
"service": service,
"srv_type": srv_type,
"response": message_to_dict(response),
}
except rospy.ServiceException as e:
logger.error(f"Service call failed: {e}")
return {
"success": False,
"service": service,
"error": f"Service call failed: {e}",
}
except rospy.ROSException as e:
return {
"success": False,
"service": service,
"error": f"Service not available: {e}",
}
except Exception as e:
logger.error(f"Failed to call service {service}: {e}")
return {
"success": False,
"service": service,
"error": str(e),
}