Skip to main content
Glama

ROS MCP Server

Apache 2.0
744
  • Apple
  • Linux
server.py59.7 kB
import argparse import io import json import os import time from typing import Any, Dict, List, Optional, Union from fastmcp import FastMCP from fastmcp.utilities.types import Image from PIL import Image as PILImage from utils.config_utils import get_robot_specifications, parse_robot_config from utils.network_utils import ping_ip_and_port from utils.websocket_manager import WebSocketManager, parse_image, parse_json # ROS bridge connection settings ROSBRIDGE_IP = "127.0.0.1" # Default is localhost. Replace with your local IPor set using the LLM. ROSBRIDGE_PORT = ( 9090 # Rosbridge default is 9090. Replace with your rosbridge port or set using the LLM. ) # MCP transport settings MCP_TRANSPORT = os.getenv("MCP_TRANSPORT", "stdio").lower() # Default is stdio. # MCP connection settings (streamable-http) MCP_HOST = os.getenv( "MCP_HOST", "127.0.0.1" ) # Default is localhost. Replace with the address of your remote MCP server. # MCP port settings (default=9000) MCP_PORT = int( os.getenv("MCP_PORT", "9000") ) # Default is 9000. Replace with the port of your remote MCP server. # Initialize MCP server and WebSocket manager mcp = FastMCP("ros-mcp-server") ws_manager = WebSocketManager( ROSBRIDGE_IP, ROSBRIDGE_PORT, default_timeout=5.0 ) # Increased default timeout for ROS operations @mcp.tool(description=("Get robot configuration from YAML file.")) def get_robot_config(name: str) -> dict: """ Get the robot configuration from the YAML file for connecting to the robot and knowing its capabilities. Returns: dict: The robot configuration. """ robot_config = parse_robot_config(name) if len(robot_config) > 1: return { "error": f"Multiple configurations found for robot '{name}'. Please specify a more precise name." } elif not robot_config: return { "error": f"No configuration found for robot '{name}'. Please check the name and try again. Or you can set the IP/port manually using the 'connect_to_robot' tool." } return {"robot_config": robot_config} @mcp.tool( description=("List all available robot specifications that can be used with get_robot_config.") ) def list_verified_robot_specifications() -> dict: """ Get a list of all available robot specification files. Returns: dict: List of available robot names that can be used with get_robot_config. """ return get_robot_specifications() @mcp.tool( description=( "After getting the robot config, connect to the robot by setting the IP/port and testing connectivity." ) ) def connect_to_robot( ip: Optional[str] = None, port: Optional[Union[int, str]] = None, ping_timeout: float = 2.0, port_timeout: float = 2.0, ) -> dict: """ Connect to a robot by setting the IP and port for the WebSocket connection, then testing connectivity. Args: ip (Optional[str]): The IP address of the rosbridge server. Defaults to "127.0.0.1" (localhost). port (Optional[int]): The port number of the rosbridge server. Defaults to 9090. ping_timeout (float): Timeout for ping in seconds. Default = 2.0. port_timeout (float): Timeout for port check in seconds. Default = 2.0. Returns: dict: Connection status with ping and port check results. """ # Set default values if None actual_ip = ip if ip is not None else "127.0.0.1" actual_port = int(port) if port is not None else 9090 # Set the IP and port ws_manager.set_ip(actual_ip, actual_port) # Test connectivity ping_result = ping_ip_and_port(actual_ip, actual_port, ping_timeout, port_timeout) # Combine the results return { "message": f"WebSocket IP set to {actual_ip}:{actual_port}", "connectivity_test": ping_result, } @mcp.tool(description="Detect the ROS version and distribution via rosbridge.") def detect_ros_version() -> dict: """ Detects the ROS version and distro via rosbridge WebSocket. Returns: dict: {'version': <version or '1'>, 'distro': <distro>} or error info. """ # Try ROS2 detection ros2_request = { "op": "call_service", "id": "ros2_version_check", "service": "/rosapi/get_ros_version", "args": {}, } with ws_manager: response = ws_manager.request(ros2_request) values = response.get("values") if response else None if isinstance(values, dict) and "version" in values: return {"version": values.get("version"), "distro": values.get("distro")} # Fallback to ROS1 detection ros1_request = { "op": "call_service", "id": "ros1_distro_check", "service": "/rosapi/get_param", "args": {"name": "/rosdistro"}, } response = ws_manager.request(ros1_request) value = response.get("values") if response else None if value: distro = value.get("value") if isinstance(value, dict) else value distro_clean = str(distro).strip('"').replace("\\n", "").replace("\n", "") return {"version": "1", "distro": distro_clean} return {"error": "Could not detect ROS version"} @mcp.tool(description=("Fetch available topics from the ROS bridge.\nExample:\nget_topics()")) def get_topics() -> dict: """ Fetch available topics from the ROS bridge. Returns: dict: Contains two lists - 'topics' and 'types', or a message string if no topics are found. """ # rosbridge service call to get topic list message = { "op": "call_service", "service": "/rosapi/topics", "type": "rosapi/Topics", "id": "get_topics_request_1", } # Request topic list from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return topic info if present if response and "values" in response: return response["values"] else: return {"warning": "No topics found"} @mcp.tool( description=("Get the message type for a specific topic.\nExample:\nget_topic_type('/cmd_vel')") ) def get_topic_type(topic: str) -> dict: """ Get the message type for a specific topic. Args: topic (str): The topic name (e.g., '/cmd_vel') Returns: dict: Contains the 'type' field with the message type, or an error message if topic doesn't exist. """ # Validate input if not topic or not topic.strip(): return {"error": "Topic name cannot be empty"} # rosbridge service call to get topic type message = { "op": "call_service", "service": "/rosapi/topic_type", "type": "rosapi/TopicType", "args": {"topic": topic}, "id": f"get_topic_type_request_{topic.replace('/', '_')}", } # Request topic type from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return topic type if present if response and "values" in response: topic_type = response["values"].get("type", "") if topic_type: return {"topic": topic, "type": topic_type} else: return {"error": f"Topic {topic} does not exist or has no type"} else: return {"error": f"Failed to get type for topic {topic}"} @mcp.tool( description=( "Get the complete structure/definition of a message type.\n" "Example:\n" "get_message_details('geometry_msgs/Twist')" ) ) def get_message_details(message_type: str) -> dict: """ Get the complete structure/definition of a message type. Args: message_type (str): The message type (e.g., 'geometry_msgs/Twist') Returns: dict: Contains the message structure with field names and types, or an error message if the message type doesn't exist. """ # Validate input if not message_type or not message_type.strip(): return {"error": "Message type cannot be empty"} # rosbridge service call to get message details message = { "op": "call_service", "service": "/rosapi/message_details", "type": "rosapi/MessageDetails", "args": {"type": message_type}, "id": f"get_message_details_request_{message_type.replace('/', '_')}", } # Request message details from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return message structure if present if response and "values" in response: typedefs = response["values"].get("typedefs", []) if typedefs: # Parse the structure into a more readable format structure = {} for typedef in typedefs: type_name = typedef.get("type", message_type) field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) fields = {} for name, ftype in zip(field_names, field_types): fields[name] = ftype structure[type_name] = {"fields": fields, "field_count": len(fields)} return {"message_type": message_type, "structure": structure} else: return {"error": f"Message type {message_type} not found or has no definition"} else: return {"error": f"Failed to get details for message type {message_type}"} @mcp.tool( description=( "Get list of nodes that are publishing to a specific topic.\n" "Example:\n" "get_publishers_for_topic('/cmd_vel')" ) ) def get_publishers_for_topic(topic: str) -> dict: """ Get list of nodes that are publishing to a specific topic. Args: topic (str): The topic name (e.g., '/cmd_vel') Returns: dict: Contains list of publisher node names, or a message if no publishers found. """ # Validate input if not topic or not topic.strip(): return {"error": "Topic name cannot be empty"} # rosbridge service call to get publishers message = { "op": "call_service", "service": "/rosapi/publishers", "type": "rosapi/Publishers", "args": {"topic": topic}, "id": f"get_publishers_for_topic_request_{topic.replace('/', '_')}", } # Request publishers from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return publishers if present if response and "values" in response: publishers = response["values"].get("publishers", []) return {"topic": topic, "publishers": publishers, "publisher_count": len(publishers)} else: return {"error": f"Failed to get publishers for topic {topic}"} @mcp.tool( description=( "Get list of nodes that are subscribed to a specific topic.\n" "Example:\n" "get_subscribers_for_topic('/cmd_vel')" ) ) def get_subscribers_for_topic(topic: str) -> dict: """ Get list of nodes that are subscribed to a specific topic. Args: topic (str): The topic name (e.g., '/cmd_vel') Returns: dict: Contains list of subscriber node names, or a message if no subscribers found. """ # Validate input if not topic or not topic.strip(): return {"error": "Topic name cannot be empty"} # rosbridge service call to get subscribers message = { "op": "call_service", "service": "/rosapi/subscribers", "type": "rosapi/Subscribers", "args": {"topic": topic}, "id": f"get_subscribers_for_topic_request_{topic.replace('/', '_')}", } # Request subscribers from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return subscribers if present if response and "values" in response: subscribers = response["values"].get("subscribers", []) return {"topic": topic, "subscribers": subscribers, "subscriber_count": len(subscribers)} else: return {"error": f"Failed to get subscribers for topic {topic}"} @mcp.tool( description=( "Get comprehensive information about all ROS topics including publishers, subscribers, and message types. Note that this may take time to execute when three are a large number of topics since it queries each one by one under the hood. \n" "Example:\n" "inspect_all_topics()" ) ) def inspect_all_topics() -> dict: """ Get comprehensive information about all ROS topics including publishers, subscribers, and message types. Returns: dict: Contains detailed information about all topics including: - Topic names and message types - Publishers for each topic - Subscribers for each topic - Connection counts and statistics """ # First get all topics topics_message = { "op": "call_service", "service": "/rosapi/topics", "type": "rosapi/Topics", "args": {}, "id": "inspect_all_topics_request_1", } with ws_manager: topics_response = ws_manager.request(topics_message) if not topics_response or "values" not in topics_response: return {"error": "Failed to get topics list"} topics = topics_response["values"].get("topics", []) types = topics_response["values"].get("types", []) topic_details = {} # Get details for each topic topic_errors = [] for i, topic in enumerate(topics): # Get topic type topic_type = types[i] if i < len(types) else "unknown" # Get publishers for this topic publishers_message = { "op": "call_service", "service": "/rosapi/publishers", "type": "rosapi/Publishers", "args": {"topic": topic}, "id": f"get_publishers_{topic.replace('/', '_')}", } publishers_response = ws_manager.request(publishers_message) publishers = [] if publishers_response and "values" in publishers_response: publishers = publishers_response["values"].get("publishers", []) elif publishers_response and "error" in publishers_response: topic_errors.append(f"Topic {topic} publishers: {publishers_response['error']}") # Get subscribers for this topic subscribers_message = { "op": "call_service", "service": "/rosapi/subscribers", "type": "rosapi/Subscribers", "args": {"topic": topic}, "id": f"get_subscribers_{topic.replace('/', '_')}", } subscribers_response = ws_manager.request(subscribers_message) subscribers = [] if subscribers_response and "values" in subscribers_response: subscribers = subscribers_response["values"].get("subscribers", []) elif subscribers_response and "error" in subscribers_response: topic_errors.append(f"Topic {topic} subscribers: {subscribers_response['error']}") topic_details[topic] = { "type": topic_type, "publishers": publishers, "subscribers": subscribers, "publisher_count": len(publishers), "subscriber_count": len(subscribers), } return { "total_topics": len(topics), "topics": topic_details, "topic_errors": topic_errors, # Include any errors encountered during inspection } @mcp.tool( description=( "Subscribe to a ROS topic and return the first message received.\n" "Example:\n" "subscribe_once(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped')\n" "subscribe_once(topic='/slow_topic', msg_type='my_package/SlowMsg', timeout=None) # Specify timeout only if topic publishes infrequently\n" "subscribe_once(topic='/high_rate_topic', msg_type='sensor_msgs/Image', timeout=None, queue_length=5, throttle_rate_ms=100) # Control message buffering and rate" ) ) def subscribe_once( topic: str = "", msg_type: str = "", timeout: Optional[float] = None, queue_length: Optional[int] = None, throttle_rate_ms: Optional[int] = None, ) -> dict: """ Subscribe to a given ROS topic via rosbridge and return the first message received. Args: topic (str): The ROS topic name (e.g., "/cmd_vel", "/joint_states"). msg_type (str): The ROS message type (e.g., "geometry_msgs/Twist"). timeout (Optional[float]): Timeout in seconds. If None, uses the default timeout. queue_length (Optional[int]): How many messages to buffer before dropping old ones. Must be ≥ 1. throttle_rate_ms (Optional[int]): Minimum interval between messages in milliseconds. Must be ≥ 0. Returns: dict: - {"msg": <parsed ROS message>} if successful - {"error": "<error message>"} if subscription or timeout fails """ # Validate critical args before attempting subscription if not topic or not msg_type: return {"error": "Missing required arguments: topic and msg_type must be provided."} # Validate optional parameters if queue_length is not None and (not isinstance(queue_length, int) or queue_length < 1): return {"error": "queue_length must be an integer ≥ 1"} if throttle_rate_ms is not None and ( not isinstance(throttle_rate_ms, int) or throttle_rate_ms < 0 ): return {"error": "throttle_rate_ms must be an integer ≥ 0"} # Construct the rosbridge subscribe message subscribe_msg: dict = { "op": "subscribe", "topic": topic, "type": msg_type, } # Add optional parameters if provided if queue_length is not None: subscribe_msg["queue_length"] = queue_length if throttle_rate_ms is not None: subscribe_msg["throttle_rate"] = throttle_rate_ms # Subscribe and wait for the first message with ws_manager: # Send subscription request send_error = ws_manager.send(subscribe_msg) if send_error: return {"error": f"Failed to subscribe: {send_error}"} # Use default timeout if none specified actual_timeout = timeout if timeout is not None else ws_manager.default_timeout # Loop until we receive the first message or timeout end_time = time.time() + actual_timeout while time.time() < end_time: response = ws_manager.receive(timeout=0.5) # non-blocking small timeout if response is None: continue # idle timeout: no frame this tick if "Image" in msg_type: msg_data = parse_image(response) else: msg_data = parse_json(response) if not msg_data: continue # non-JSON or empty # Check for status errors from rosbridge if msg_data.get("op") == "status" and msg_data.get("level") == "error": return {"error": f"Rosbridge error: {msg_data.get('msg', 'Unknown error')}"} # Check for the first published message if msg_data.get("op") == "publish" and msg_data.get("topic") == topic: # Unsubscribe before returning the message unsubscribe_msg = {"op": "unsubscribe", "topic": topic} ws_manager.send(unsubscribe_msg) if "Image" in msg_type: return { "message": "Image received successfully and saved in the MCP server. Run the 'analyze_previously_received_image' tool to analyze it" } else: return {"msg": msg_data.get("msg", {})} # Timeout - unsubscribe and return error unsubscribe_msg = {"op": "unsubscribe", "topic": topic} ws_manager.send(unsubscribe_msg) return {"error": "Timeout waiting for message from topic"} @mcp.tool( description=( "Publish a single message to a ROS topic.\n" "Example:\n" "publish_once(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped', msg={'linear': {'x': 1.0}})" ) ) def publish_once(topic: str = "", msg_type: str = "", msg: dict = {}) -> dict: """ Publish a single message to a ROS topic via rosbridge. Args: topic (str): ROS topic name (e.g., "/cmd_vel") msg_type (str): ROS message type (e.g., "geometry_msgs/Twist") msg (dict): Message payload as a dictionary Returns: dict: - {"success": True} if sent without errors - {"error": "<error message>"} if connection/send failed - If rosbridge responds (usually it doesn’t for publish), parsed JSON or error info """ # Validate critical args before attempting publish if not topic or not msg_type or msg == {}: return { "error": "Missing required arguments: topic, msg_type, and msg must all be provided." } # Use proper advertise → publish → unadvertise pattern with ws_manager: # 1. Advertise the topic advertise_msg = {"op": "advertise", "topic": topic, "type": msg_type} send_error = ws_manager.send(advertise_msg) if send_error: return {"error": f"Failed to advertise topic: {send_error}"} # Check for advertise response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": return {"error": f"Advertise failed: {msg_data.get('msg', 'Unknown error')}"} except json.JSONDecodeError: pass # Non-JSON response is usually fine for advertise # 2. Publish the message publish_msg = {"op": "publish", "topic": topic, "msg": msg} send_error = ws_manager.send(publish_msg) if send_error: # Try to unadvertise even if publish failed ws_manager.send({"op": "unadvertise", "topic": topic}) return {"error": f"Failed to publish message: {send_error}"} # Check for publish response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": # Unadvertise before returning error ws_manager.send({"op": "unadvertise", "topic": topic}) return {"error": f"Publish failed: {msg_data.get('msg', 'Unknown error')}"} except json.JSONDecodeError: pass # Non-JSON response is usually fine for publish # 3. Unadvertise the topic unadvertise_msg = {"op": "unadvertise", "topic": topic} ws_manager.send(unadvertise_msg) return { "success": True, "note": "Message published using advertise → publish → unadvertise pattern", } @mcp.tool( description=( "Subscribe to a topic for a duration and collect messages.\n" "Example:\n" "subscribe_for_duration(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped', duration=5, max_messages=10)\n" "subscribe_for_duration(topic='/high_rate_topic', msg_type='sensor_msgs/Image', duration=10, queue_length=5, throttle_rate_ms=100) # Control message buffering and rate" ) ) def subscribe_for_duration( topic: str = "", msg_type: str = "", duration: float = 5.0, max_messages: int = 100, queue_length: Optional[int] = None, throttle_rate_ms: Optional[int] = None, ) -> dict: """ Subscribe to a ROS topic via rosbridge for a fixed duration and collect messages. Args: topic (str): ROS topic name (e.g. "/cmd_vel", "/joint_states") msg_type (str): ROS message type (e.g. "geometry_msgs/Twist") duration (float): How long (seconds) to listen for messages max_messages (int): Maximum number of messages to collect before stopping queue_length (Optional[int]): How many messages to buffer before dropping old ones. Must be ≥ 1. throttle_rate_ms (Optional[int]): Minimum interval between messages in milliseconds. Must be ≥ 0. Returns: dict: { "topic": topic_name, "collected_count": N, "messages": [msg1, msg2, ...] } """ # Validate critical args before subscribing if not topic or not msg_type: return {"error": "Missing required arguments: topic and msg_type must be provided."} # Validate optional parameters if queue_length is not None and (not isinstance(queue_length, int) or queue_length < 1): return {"error": "queue_length must be an integer ≥ 1"} if throttle_rate_ms is not None and ( not isinstance(throttle_rate_ms, int) or throttle_rate_ms < 0 ): return {"error": "throttle_rate_ms must be an integer ≥ 0"} # Send subscription request subscribe_msg: dict = { "op": "subscribe", "topic": topic, "type": msg_type, } # Add optional parameters if provided if queue_length is not None: subscribe_msg["queue_length"] = queue_length if throttle_rate_ms is not None: subscribe_msg["throttle_rate"] = throttle_rate_ms with ws_manager: send_error = ws_manager.send(subscribe_msg) if send_error: return {"error": f"Failed to subscribe: {send_error}"} collected_messages = [] status_errors = [] end_time = time.time() + duration # Loop until duration expires or we hit max_messages while time.time() < end_time and len(collected_messages) < max_messages: response = ws_manager.receive(timeout=0.5) # non-blocking small timeout if response is None: continue # idle timeout: no frame this tick msg_data = parse_json(response) if not msg_data: continue # non-JSON or empty # Check for status errors from rosbridge if msg_data.get("op") == "status" and msg_data.get("level") == "error": status_errors.append(msg_data.get("msg", "Unknown error")) continue # Check for published messages matching our topic if msg_data.get("op") == "publish" and msg_data.get("topic") == topic: collected_messages.append(msg_data.get("msg", {})) # Unsubscribe when done unsubscribe_msg = {"op": "unsubscribe", "topic": topic} ws_manager.send(unsubscribe_msg) return { "topic": topic, "collected_count": len(collected_messages), "messages": collected_messages, "status_errors": status_errors, # Include any errors encountered during collection } @mcp.tool( description=( "Publish a sequence of messages with delays.\n" "Example:\n" "publish_for_durations(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped', messages=[{'linear': {'x': 1.0}}, {'linear': {'x': 0.0}}], durations=[1, 2])" ) ) def publish_for_durations( topic: str = "", msg_type: str = "", messages: List[Dict[str, Any]] = [], durations: List[float] = [], ) -> dict: """ Publish a sequence of messages to a given ROS topic with delays in between. Args: topic (str): ROS topic name (e.g., "/cmd_vel") msg_type (str): ROS message type (e.g., "geometry_msgs/Twist") messages (List[Dict[str, Any]]): A list of message dictionaries (ROS-compatible payloads) durations (List[float]): A list of durations (seconds) to wait between messages Returns: dict: { "success": True, "published_count": <number of messages>, "topic": topic, "msg_type": msg_type } OR {"error": "<error message>"} if something failed """ # Validate critical args before publishing if not topic or not msg_type or messages == [] or durations == []: return { "error": "Missing required arguments: topic, msg_type, messages, and durations must all be provided." } # Ensure same length for messages & durations if len(messages) != len(durations): return {"error": "messages and durations must have the same length"} # Use proper advertise → publish → unadvertise pattern with ws_manager: # 1. Advertise the topic advertise_msg = {"op": "advertise", "topic": topic, "type": msg_type} send_error = ws_manager.send(advertise_msg) if send_error: return {"error": f"Failed to advertise topic: {send_error}"} # Check for advertise response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": return {"error": f"Advertise failed: {msg_data.get('msg', 'Unknown error')}"} except json.JSONDecodeError: pass # Non-JSON response is usually fine for advertise published_count = 0 errors = [] # 2. Iterate and publish each message with a delay for i, (msg, delay) in enumerate(zip(messages, durations)): # Build the rosbridge publish message publish_msg = {"op": "publish", "topic": topic, "msg": msg} # Send it send_error = ws_manager.send(publish_msg) if send_error: errors.append(f"Message {i + 1}: {send_error}") continue # Continue with next message instead of failing completely # Check for publish response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": errors.append(f"Message {i + 1}: {msg_data.get('msg', 'Unknown error')}") continue except json.JSONDecodeError: pass # Non-JSON response is usually fine for publish published_count += 1 # Wait before sending the next message time.sleep(delay) # 3. Unadvertise the topic unadvertise_msg = {"op": "unadvertise", "topic": topic} ws_manager.send(unadvertise_msg) return { "success": True, "published_count": published_count, "total_messages": len(messages), "topic": topic, "msg_type": msg_type, "errors": errors, # Include any errors encountered during publishing } ## ############################################################################################## ## ## ## ROS SERVICES ## ## ############################################################################################## ## @mcp.tool(description=("Get list of all available ROS services.\nExample:\nget_services()")) def get_services() -> dict: """ Get list of all available ROS services. Returns: dict: Contains list of all active services, or a message string if no services are found. """ # rosbridge service call to get service list message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "get_services_request_1", } # Request service list from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return service info if present if response and "values" in response: services = response["values"].get("services", []) return {"services": services, "service_count": len(services)} else: return {"warning": "No services found"} @mcp.tool( description=( "Get the service type for a specific service.\nExample:\nget_service_type('/rosapi/topics')" ) ) def get_service_type(service: str) -> dict: """ Get the service type for a specific service. Args: service (str): The service name (e.g., '/rosapi/topics') Returns: dict: Contains the service type, or an error message if service doesn't exist. """ # Validate input if not service or not service.strip(): return {"error": "Service name cannot be empty"} # rosbridge service call to get service type message = { "op": "call_service", "service": "/rosapi/service_type", "type": "rosapi/ServiceType", "args": {"service": service}, "id": f"get_service_type_request_{service.replace('/', '_')}", } # Request service type from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return service type if present if response and "values" in response: service_type = response["values"].get("type", "") if service_type: return {"service": service, "type": service_type} else: return {"error": f"Service {service} does not exist or has no type"} else: return {"error": f"Failed to get type for service {service}"} @mcp.tool( description=( "Get complete service details including request and response structures.\n" "Example:\n" "get_service_details('my_package/CustomService')" ) ) def get_service_details(service_type: str) -> dict: """ Get complete service details including request and response structures. Args: service_type (str): The service type (e.g., 'my_package/CustomService') Returns: dict: Contains complete service definition with request and response structures. """ # Validate input if not service_type or not service_type.strip(): return {"error": "Service type cannot be empty"} result = {"service_type": service_type, "request": {}, "response": {}} # Get both request and response details in a single WebSocket context with ws_manager: # Get request details request_message = { "op": "call_service", "service": "/rosapi/service_request_details", "type": "rosapi/ServiceRequestDetails", "args": {"type": service_type}, "id": f"get_service_details_request_{service_type.replace('/', '_')}", } request_response = ws_manager.request(request_message) if request_response and "values" in request_response: typedefs = request_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) fields = {} for name, ftype in zip(field_names, field_types): fields[name] = ftype result["request"] = {"fields": fields, "field_count": len(fields)} # Get response details response_message = { "op": "call_service", "service": "/rosapi/service_response_details", "type": "rosapi/ServiceResponseDetails", "args": {"type": service_type}, "id": f"get_service_details_response_{service_type.replace('/', '_')}", } response_response = ws_manager.request(response_message) if response_response and "values" in response_response: typedefs = response_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) fields = {} for name, ftype in zip(field_names, field_types): fields[name] = ftype result["response"] = {"fields": fields, "field_count": len(fields)} # Check if we got any data if not result["request"] and not result["response"]: return {"error": f"Service type {service_type} not found or has no definition"} return result @mcp.tool( description=( "Get list of nodes that provide a specific service.\n" "Example:\n" "get_service_providers('/rosapi/topics')" ) ) def get_service_providers(service: str) -> dict: """ Get list of nodes that provide a specific service. Args: service (str): The service name (e.g., '/rosapi/topics') Returns: dict: Contains list of nodes providing this service, or an error message if service doesn't exist. """ # Validate input if not service or not service.strip(): return {"error": "Service name cannot be empty"} # rosbridge service call to get service providers (using service_node like inspect_all_services) message = { "op": "call_service", "service": "/rosapi/service_node", "type": "rosapi/ServiceNode", "args": {"service": service}, "id": f"get_service_providers_request_{service.replace('/', '_')}", } # Request service providers from rosbridge with ws_manager: response = ws_manager.request(message) # Return service providers if present (using same logic as inspect_all_services) providers = [] # Handle different response formats safely if response and isinstance(response, dict): if "values" in response: node = response["values"].get("node", "") if node: providers = [node] elif "result" in response: node = response["result"].get("node", "") if node: providers = [node] elif "error" in response: return {"error": f"Service call failed: {response['error']}"} elif response is False: return {"error": f"No response received for service {service}"} elif response is True: return {"error": f"Unexpected boolean response for service {service}"} else: return {"error": f"Failed to get providers for service {service}"} return {"service": service, "providers": providers, "provider_count": len(providers)} @mcp.tool( description=( "Get comprehensive information about all services including types and providers. Note that this may take time to execute when three are a large number of services since it queries each one by one under the hood. \n" "Example:\n" "inspect_all_services()" ) ) def inspect_all_services() -> dict: """ Get comprehensive information about all services including types and providers. Returns: dict: Contains detailed information about all services, including service names, types, and provider nodes. """ # First get all services services_message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "inspect_all_services_request_1", } with ws_manager: services_response = ws_manager.request(services_message) if not services_response or "values" not in services_response: return {"error": "Failed to get services list"} services = services_response["values"].get("services", []) service_details = {} # Get details for each service service_errors = [] for service in services: # Get service type type_message = { "op": "call_service", "service": "/rosapi/service_type", "type": "rosapi/ServiceType", "args": {"service": service}, "id": f"get_type_{service.replace('/', '_')}", } type_response = ws_manager.request(type_message) service_type = "" if type_response and "values" in type_response: service_type = type_response["values"].get("type", "unknown") elif type_response and "error" in type_response: service_errors.append(f"Service {service}: {type_response['error']}") # Get service provider (using service_node instead of service_providers) provider_message = { "op": "call_service", "service": "/rosapi/service_node", "type": "rosapi/ServiceNode", "args": {"service": service}, "id": f"get_provider_{service.replace('/', '_')}", } provider_response = ws_manager.request(provider_message) providers = [] # Handle different response formats safely if provider_response and isinstance(provider_response, dict): if "values" in provider_response: node = provider_response["values"].get("node", "") if node: providers = [node] elif "result" in provider_response: node = provider_response["result"].get("node", "") if node: providers = [node] elif "error" in provider_response: service_errors.append( f"Service {service} provider: {provider_response['error']}" ) elif provider_response is False: service_errors.append(f"Service {service} provider: No response received") elif provider_response is True: service_errors.append(f"Service {service} provider: Unexpected boolean response") service_details[service] = { "type": service_type, "providers": providers, "provider_count": len(providers), } return { "total_services": len(services), "services": service_details, "service_errors": service_errors, # Include any errors encountered during inspection } @mcp.tool( description=( "Call a ROS service with specified request data.\n" "Example:\n" "call_service('/rosapi/topics', 'rosapi/Topics', {})\n" "call_service('/slow_service', 'my_package/SlowService', {}, timeout=10.0) # Specify timeout only for slow services" ) ) def call_service( service_name: str, service_type: str, request: dict, timeout: Optional[float] = None ) -> dict: """ Call a ROS service with specified request data. Args: service_name (str): The service name (e.g., '/rosapi/topics') service_type (str): The service type (e.g., 'rosapi/Topics') request (dict): Service request data as a dictionary timeout (Optional[float]): Timeout in seconds. If None, uses the default timeout. Returns: dict: Contains the service response or error information. """ # rosbridge service call message = { "op": "call_service", "service": service_name, "type": service_type, "args": request, "id": f"call_service_request_{service_name.replace('/', '_')}", } # Call the service through rosbridge with ws_manager: response = ws_manager.request(message, timeout=timeout) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return { "service": service_name, "service_type": service_type, "success": False, "error": f"Service call failed: {error_msg}", } # Return service response if present if response: if response.get("op") == "service_response": # Alternative response format return { "service": service_name, "service_type": service_type, "success": response.get("result", True), "result": response.get("values", {}), } elif response.get("op") == "status" and response.get("level") == "error": # Error response return { "service": service_name, "service_type": service_type, "success": False, "error": response.get("msg", "Unknown error"), } else: # Unexpected response format return { "service": service_name, "service_type": service_type, "success": False, "error": "Unexpected response format", "raw_response": response, } else: return { "service": service_name, "service_type": service_type, "success": False, "error": "No response received from service call", } @mcp.tool(description=("Get list of all currently running ROS nodes.\nExample:\nget_nodes()")) def get_nodes() -> dict: """ Get list of all currently running ROS nodes. Returns: dict: Contains list of all active nodes, or a message string if no nodes are found. """ # rosbridge service call to get node list message = { "op": "call_service", "service": "/rosapi/nodes", "type": "rosapi/Nodes", "args": {}, "id": "get_nodes_request_1", } # Request node list from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return node info if present if response and "values" in response: nodes = response["values"].get("nodes", []) return {"nodes": nodes, "node_count": len(nodes)} else: return {"warning": "No nodes found"} @mcp.tool( description=( "Get detailed information about a specific node including its publishers, subscribers, and services.\n" "Example:\n" "get_node_details('/turtlesim')" ) ) def get_node_details(node: str) -> dict: """ Get detailed information about a specific node including its publishers, subscribers, and services. Args: node (str): The node name (e.g., '/turtlesim') Returns: dict: Contains detailed node information including publishers, subscribers, and services, or an error message if node doesn't exist. """ # Validate input if not node or not node.strip(): return {"error": "Node name cannot be empty"} result = { "node": node, "publishers": [], "subscribers": [], "services": [], "publisher_count": 0, "subscriber_count": 0, "service_count": 0, } # rosbridge service call to get node details message = { "op": "call_service", "service": "/rosapi/node_details", "type": "rosapi/NodeDetails", "args": {"node": node}, "id": f"get_node_details_{node.replace('/', '_')}", } # Request node details from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Extract data from the response if response and "values" in response: values = response["values"] # Extract publishers, subscribers, and services from the response # Note: rosapi uses "publishing" and "subscribing" field names publishers = values.get("publishing", []) subscribers = values.get("subscribing", []) services = values.get("services", []) result["publishers"] = publishers result["subscribers"] = subscribers result["services"] = services result["publisher_count"] = len(publishers) result["subscriber_count"] = len(subscribers) result["service_count"] = len(services) # Check if we got any data if not result["publishers"] and not result["subscribers"] and not result["services"]: return {"error": f"Node {node} not found or has no details available"} return result @mcp.tool( description=( "Get comprehensive information about all ROS nodes including their publishers, subscribers, and services.\n" "Example:\n" "inspect_all_nodes()" ) ) def inspect_all_nodes() -> dict: """ Get comprehensive information about all ROS nodes including their publishers, subscribers, and services. Returns: dict: Contains detailed information about all nodes including: - Node names and details - Publishers for each node - Subscribers for each node - Services provided by each node - Connection counts and statistics """ # First get all nodes nodes_message = { "op": "call_service", "service": "/rosapi/nodes", "type": "rosapi/Nodes", "args": {}, "id": "inspect_all_nodes_request_1", } with ws_manager: nodes_response = ws_manager.request(nodes_message) if not nodes_response or "values" not in nodes_response: return {"error": "Failed to get nodes list"} nodes = nodes_response["values"].get("nodes", []) node_details = {} # Get details for each node node_errors = [] for node in nodes: # Get node details (publishers, subscribers, services) node_details_message = { "op": "call_service", "service": "/rosapi/node_details", "type": "rosapi/NodeDetails", "args": {"node": node}, "id": f"get_node_details_{node.replace('/', '_')}", } node_details_response = ws_manager.request(node_details_message) if node_details_response and "values" in node_details_response: values = node_details_response["values"] # Extract publishers, subscribers, and services from the response # Note: rosapi uses "publishing" and "subscribing" field names publishers = values.get("publishing", []) subscribers = values.get("subscribing", []) services = values.get("services", []) node_details[node] = { "publishers": publishers, "subscribers": subscribers, "services": services, "publisher_count": len(publishers), "subscriber_count": len(subscribers), "service_count": len(services), } elif ( node_details_response and "result" in node_details_response and not node_details_response["result"] ): error_msg = node_details_response.get("values", {}).get( "message", "Service call failed" ) node_errors.append(f"Node {node}: {error_msg}") else: node_errors.append(f"Node {node}: Failed to get node details") return { "total_nodes": len(nodes), "nodes": node_details, "node_errors": node_errors, # Include any errors encountered during inspection } ## ############################################################################################## ## ## ## NETWORK DIAGNOSTICS ## ## ############################################################################################## ## @mcp.tool( description=( "Ping a robot's IP address and check if a specific port is open.\n" "A successful ping to the IP but not the port can indicate that ROSbridge is not running.\n" "Example:\n" "ping_robot(ip='192.168.1.100', port=9090)" ) ) def ping_robot(ip: str, port: int, ping_timeout: float = 2.0, port_timeout: float = 2.0) -> dict: """ Ping an IP address and check if a specific port is open. Args: ip (str): The IP address to ping (e.g., '192.168.1.100') port (int): The port number to check (e.g., 9090) ping_timeout (float): Timeout for ping in seconds. Default = 2.0. port_timeout (float): Timeout for port check in seconds. Default = 2.0. Returns: dict: Contains ping and port check results with detailed status information. """ return ping_ip_and_port(ip, port, ping_timeout, port_timeout) ## ############################################################################################## ## ## ## IMAGE ANALYSIS ## ## ############################################################################################## ## def _encode_image_to_imagecontent(image): """ Encodes a PIL Image to a format compatible with ImageContent. Args: image (PIL.Image.Image): The image to encode. Returns: ImageContent: JPEG-encoded image wrapped in an ImageContent object. """ buffer = io.BytesIO() image.save(buffer, format="JPEG") img_bytes = buffer.getvalue() img_obj = Image(data=img_bytes, format="jpeg") return img_obj.to_image_content() @mcp.tool( description=( "First, subscribe to an Image topic using 'subscribe_once' to save an image.\n" "Then, use this tool to analyze the saved image\n" ) ) def analyze_previously_received_image(): """ Analyze the previously received image saved at ./camera/received_image.jpeg This tool loads the previously saved image from './camera/received_image.jpeg' (which must have been created by 'parse_image' or 'subscribe_once'), and converts it into an MCP-compatible ImageContent format so that the LLM can interpret it. """ path = "./camera/received_image.jpeg" if not os.path.exists(path): return {"error": "No image found at ./camera/received_image.jpeg"} img = PILImage.open(path) return _encode_image_to_imagecontent(img) def parse_arguments(): """Parse command line arguments for MCP server configuration.""" parser = argparse.ArgumentParser( description="ROS MCP Server - Connect to ROS robots via MCP protocol", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python server.py # Use stdio transport (default) python server.py --transport http --host 0.0.0.0 --port 9000 python server.py --transport streamable-http --host 127.0.0.1 --port 8080 """, ) parser.add_argument( "--transport", choices=["stdio", "http", "streamable-http", "sse"], default="stdio", help="MCP transport protocol to use (default: stdio)", ) parser.add_argument( "--host", default="127.0.0.1", help="Host address for HTTP-based transports (default: 127.0.0.1)", ) parser.add_argument( "--port", type=int, default=9000, help="Port number for HTTP-based transports (default: 9000)", ) return parser.parse_args() def main(): """Main entry point for the MCP server console script.""" # Parse command line arguments args = parse_arguments() # Update global variables with parsed arguments global MCP_TRANSPORT, MCP_HOST, MCP_PORT MCP_TRANSPORT = args.transport.lower() MCP_HOST = args.host MCP_PORT = args.port if MCP_TRANSPORT == "stdio": # stdio doesn't need host/port mcp.run(transport="stdio") elif MCP_TRANSPORT in {"http", "streamable-http"}: # http and streamable-http both require host/port print(f"Transport: {MCP_TRANSPORT} -> http://{MCP_HOST}:{MCP_PORT}") mcp.run(transport=MCP_TRANSPORT, host=MCP_HOST, port=MCP_PORT) elif MCP_TRANSPORT == "sse": print(f"Transport: {MCP_TRANSPORT} -> http://{MCP_HOST}:{MCP_PORT}") print("Currently unsupported. Use 'stdio', 'http', or 'streamable-http'.") mcp.run(transport=MCP_TRANSPORT, host=MCP_HOST, port=MCP_PORT) else: raise ValueError( f"Unsupported MCP_TRANSPORT={MCP_TRANSPORT!r}. " "Use 'stdio', 'http', or 'streamable-http'." ) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/robotmcp/ros-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server