"""Utilities for ROS message serialization and deserialization."""
import logging
from typing import Any, Dict, List, Optional, Type, Union
logger = logging.getLogger(__name__)
def get_message_class(msg_type: str) -> Type:
"""
Get the ROS message class for a given message type.
Args:
msg_type: Full message type string (e.g., "std_msgs/String", "geometry_msgs/Pose")
Returns:
The message class.
Raises:
ValueError: If the message type cannot be found.
"""
try:
import roslib.message
msg_class = roslib.message.get_message_class(msg_type)
if msg_class is None:
raise ValueError(f"Message type '{msg_type}' not found")
return msg_class
except Exception as e:
raise ValueError(f"Failed to get message class for '{msg_type}': {e}") from e
def get_service_class(srv_type: str) -> Type:
"""
Get the ROS service class for a given service type.
Args:
srv_type: Full service type string (e.g., "std_srvs/Empty", "std_srvs/SetBool")
Returns:
The service class.
Raises:
ValueError: If the service type cannot be found.
"""
try:
import roslib.message
srv_class = roslib.message.get_service_class(srv_type)
if srv_class is None:
raise ValueError(f"Service type '{srv_type}' not found")
return srv_class
except Exception as e:
raise ValueError(f"Failed to get service class for '{srv_type}': {e}") from e
def message_to_dict(msg: Any) -> Any:
"""
Convert a ROS message to a JSON-serializable dictionary.
Handles nested messages, arrays, and special types like Time and Duration.
Args:
msg: A ROS message instance or primitive value.
Returns:
A JSON-serializable representation of the message.
"""
try:
import genpy
import rospy
except ImportError:
# If ROS is not available, just return the value
return msg
# Handle None
if msg is None:
return None
# Handle ROS Time and Duration
if isinstance(msg, (rospy.Time, rospy.Duration, genpy.Time, genpy.Duration)):
return {
"secs": msg.secs,
"nsecs": msg.nsecs,
}
# Handle ROS messages
if isinstance(msg, genpy.Message):
result = {}
for field, field_type in zip(msg.__slots__, msg._slot_types):
value = getattr(msg, field)
result[field] = message_to_dict(value)
return result
# Handle lists and tuples
if isinstance(msg, (list, tuple)):
return [message_to_dict(item) for item in msg]
# Handle bytes
if isinstance(msg, bytes):
try:
return msg.decode("utf-8")
except UnicodeDecodeError:
# Return as base64 for binary data
import base64
return {"_binary": base64.b64encode(msg).decode("ascii")}
# Handle numpy arrays (common in ROS)
if hasattr(msg, "tolist"):
return msg.tolist()
# Return primitive types as-is
return msg
def dict_to_message(data: Dict[str, Any], msg_type: str) -> Any:
"""
Convert a dictionary to a ROS message.
Args:
data: Dictionary with message field values.
msg_type: Full message type string.
Returns:
A ROS message instance.
Raises:
ValueError: If the message cannot be created.
"""
msg_class = get_message_class(msg_type)
return _fill_message(msg_class(), data)
def _fill_message(msg: Any, data: Dict[str, Any]) -> Any:
"""
Fill a ROS message with data from a dictionary.
Args:
msg: A ROS message instance to fill.
data: Dictionary with field values.
Returns:
The filled message.
"""
try:
import genpy
import rospy
except ImportError:
raise RuntimeError("ROS is not available")
if not isinstance(data, dict):
return data
for field, value in data.items():
if not hasattr(msg, field):
logger.warning(f"Unknown field '{field}' for message type {type(msg).__name__}")
continue
current_value = getattr(msg, field)
# Handle Time and Duration
if isinstance(current_value, (rospy.Time, genpy.Time)):
if isinstance(value, dict):
setattr(msg, field, rospy.Time(value.get("secs", 0), value.get("nsecs", 0)))
elif isinstance(value, (int, float)):
setattr(msg, field, rospy.Time.from_sec(value))
continue
if isinstance(current_value, (rospy.Duration, genpy.Duration)):
if isinstance(value, dict):
setattr(
msg, field, rospy.Duration(value.get("secs", 0), value.get("nsecs", 0))
)
elif isinstance(value, (int, float)):
setattr(msg, field, rospy.Duration.from_sec(value))
continue
# Handle nested messages
if isinstance(current_value, genpy.Message):
if isinstance(value, dict):
_fill_message(current_value, value)
continue
# Handle arrays of messages
if isinstance(current_value, list) and isinstance(value, list):
# Try to determine the element type from the slot type
slot_idx = msg.__slots__.index(field)
slot_type = msg._slot_types[slot_idx]
# Check if it's an array of messages
if slot_type.endswith("[]"):
element_type = slot_type[:-2]
if "/" in element_type:
# It's a message array
try:
element_class = get_message_class(element_type)
filled_list = []
for item in value:
if isinstance(item, dict):
new_msg = element_class()
_fill_message(new_msg, item)
filled_list.append(new_msg)
else:
filled_list.append(item)
setattr(msg, field, filled_list)
continue
except ValueError:
pass
setattr(msg, field, value)
continue
# Handle binary data
if isinstance(value, dict) and "_binary" in value:
import base64
setattr(msg, field, base64.b64decode(value["_binary"]))
continue
# Set primitive values directly
setattr(msg, field, value)
return msg
def get_topic_type(topic_name: str) -> Optional[str]:
"""
Get the message type for a topic.
Args:
topic_name: The topic name.
Returns:
The message type string, or None if the topic doesn't exist.
"""
try:
import rostopic
topic_type, _, _ = rostopic.get_topic_type(topic_name)
return topic_type
except Exception as e:
logger.warning(f"Failed to get topic type for '{topic_name}': {e}")
return None
def get_service_type(service_name: str) -> Optional[str]:
"""
Get the service type for a service.
Args:
service_name: The service name.
Returns:
The service type string, or None if the service doesn't exist.
"""
try:
import rosservice
return rosservice.get_service_type(service_name)
except Exception as e:
logger.warning(f"Failed to get service type for '{service_name}': {e}")
return None
def get_message_definition(msg_type: str) -> str:
"""
Get the full definition of a message type.
Args:
msg_type: Full message type string.
Returns:
The message definition string.
"""
try:
import rosmsg
return rosmsg.get_msg_text(msg_type)
except Exception as e:
raise ValueError(f"Failed to get message definition for '{msg_type}': {e}") from e
def get_service_definition(srv_type: str) -> str:
"""
Get the full definition of a service type.
Args:
srv_type: Full service type string.
Returns:
The service definition string.
"""
try:
import rossrv
return rossrv.get_srv_text(srv_type)
except Exception as e:
raise ValueError(f"Failed to get service definition for '{srv_type}': {e}") from e