"""ROS connection management singleton."""
import os
import logging
from threading import Lock, Thread
from typing import Optional
logger = logging.getLogger(__name__)
class ROSBridge:
"""
Singleton class for managing ROS node connection.
This class ensures that only one ROS node is initialized for the MCP server,
and provides thread-safe access to ROS functionality.
"""
_instance: Optional["ROSBridge"] = None
_lock = Lock()
def __new__(cls) -> "ROSBridge":
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
cls._instance._node_name = "mcp_ros_server"
cls._instance._spin_thread: Optional[Thread] = None
return cls._instance
@property
def node_name(self) -> str:
"""Get the ROS node name."""
return self._node_name
@node_name.setter
def node_name(self, value: str) -> None:
"""Set the ROS node name (must be set before initialization)."""
if self._initialized:
logger.warning("Cannot change node name after initialization")
return
self._node_name = value
def initialize(self, node_name: Optional[str] = None) -> None:
"""
Initialize the ROS node.
Args:
node_name: Optional name for the ROS node.
"""
if self._initialized:
logger.debug("ROS bridge already initialized")
return
with self._lock:
if self._initialized:
return
# Import rospy here to avoid issues when ROS is not installed
try:
import rospy
except ImportError as e:
raise RuntimeError(
"rospy not found. Make sure ROS Noetic is installed and sourced."
) from e
if node_name:
self._node_name = node_name
# Check ROS_MASTER_URI
master_uri = os.environ.get("ROS_MASTER_URI", "http://localhost:11311")
logger.info(f"Connecting to ROS master at {master_uri}")
try:
# Initialize the ROS node
rospy.init_node(
self._node_name,
anonymous=True,
disable_signals=True, # Let FastMCP handle signals
)
self._initialized = True
logger.info(f"ROS node '{rospy.get_name()}' initialized successfully")
except rospy.ROSInitException as e:
raise RuntimeError(f"Failed to initialize ROS node: {e}") from e
@property
def is_connected(self) -> bool:
"""Check if connected to ROS master."""
if not self._initialized:
return False
try:
import rospy
return rospy.core.is_initialized() and not rospy.is_shutdown()
except Exception:
return False
def ensure_connected(self) -> None:
"""Ensure ROS connection is active, raise if not."""
if not self.is_connected:
raise RuntimeError(
"ROS is not connected. Check ROS_MASTER_URI and ensure roscore is running."
)
def shutdown(self) -> None:
"""Shutdown the ROS node."""
if not self._initialized:
return
with self._lock:
if not self._initialized:
return
try:
import rospy
if not rospy.is_shutdown():
logger.info("Shutting down ROS node")
rospy.signal_shutdown("MCP server shutting down")
except Exception as e:
logger.warning(f"Error during ROS shutdown: {e}")
finally:
self._initialized = False
def get_master_uri(self) -> str:
"""Get the ROS master URI."""
return os.environ.get("ROS_MASTER_URI", "http://localhost:11311")
def check_master(self) -> bool:
"""Check if ROS master is reachable."""
try:
import rospy
import rosgraph
return rosgraph.is_master_online()
except Exception:
return False