Skip to main content
Glama
server.py117 kB
import argparse import asyncio import io import json import os import sys import time import uuid from typing import Any, Dict, List, Union from fastmcp import Context, FastMCP from fastmcp.utilities.types import Image from PIL import Image as PILImage from utils.config_utils import get_verified_robot_spec_util, get_verified_robots_list_util from utils.network_utils import ping_ip_and_port from utils.websocket_manager import WebSocketManager, parse_input # ROS bridge connection settings ROSBRIDGE_IP = "127.0.0.1" # Default is localhost. Replace with your local IPor set using the LLM. ROSBRIDGE_PORT = ( 9090 # Rosbridge default is 9090. Replace with your rosbridge port or set using the LLM. ) # MCP transport settings MCP_TRANSPORT = os.getenv("MCP_TRANSPORT", "stdio").lower() # Default is stdio. # MCP connection settings (streamable-http) MCP_HOST = os.getenv( "MCP_HOST", "127.0.0.1" ) # Default is localhost. Replace with the address of your remote MCP server. # MCP port settings (default=9000) MCP_PORT = int( os.getenv("MCP_PORT", "9000") ) # Default is 9000. Replace with the port of your remote MCP server. # Initialize MCP server and WebSocket manager mcp = FastMCP("ros-mcp-server") ws_manager = WebSocketManager( ROSBRIDGE_IP, ROSBRIDGE_PORT, default_timeout=5.0 ) # Increased default timeout for ROS operations def convert_expects_image_hint(expects_image: str) -> bool | None: """ Convert string-based expects_image hint to boolean for internal use. Args: expects_image (str): String hint about whether to expect image data - "true": prioritize image parsing - "false": skip image detection for faster processing - "auto": auto-detect based on message fields (default) - any other value: treated as "auto" Returns: bool | None: Converted hint for parse_input function - True: prioritize image parsing - False: skip image detection - None: auto-detect """ if expects_image == "true": return True elif expects_image == "false": return False else: # "auto" or any other value return None @mcp.tool( description=( "Load specifications and usage context for a verified robot model. " "ONLY use if the robot model is in the verified list (use get_verified_robots_list first to check). " "Most robots won't have a spec - that's OK, connect directly using connect_to_robot instead." ) ) def get_verified_robot_spec(name: str) -> dict: """ Load pre-defined specifications and additional context for a verified robot model. This is OPTIONAL - only for a small set of pre-verified robot models stored in the repository. Use get_verified_robots_list() first to check if a spec exists. If no spec exists for your robot, simply use connect_to_robot() directly. Args: name (str): The exact robot model name from the verified list. Returns: dict: The robot specification with type, prompts, and additional context. """ robot_config = get_verified_robot_spec_util(name) if len(robot_config) > 1: return { "error": f"Multiple configurations found for robot '{name}'. Please specify a more precise name." } elif not robot_config: return { "error": f"No configuration found for robot '{name}'. Please check the name and try again. Or you can set the IP/port manually using the 'connect_to_robot' tool." } return {"robot_config": robot_config} @mcp.tool( description=( "List pre-verified robot models that have specification files with usage guidance available. " "Use this to check if a robot model has additional context available before calling get_verified_robot_spec. " "If your robot is not in this list, you can still connect to it directly using connect_to_robot." ) ) def get_verified_robots_list() -> dict: """ List all pre-verified robot models that have specification files available in the repository. This is a small curated list of robot models with pre-defined specifications. If your robot model is not in this list, you can still connect to any ROS robot using the connect_to_robot() tool directly. Returns: dict: List of available verified robot model names and count. """ return get_verified_robots_list_util() @mcp.tool( description=( "Connect to the robot by setting the IP/port. This tool also tests connectivity to confirm that the robot is reachable and the port is open." ) ) def connect_to_robot( ip: str = ROSBRIDGE_IP, port: Union[int, str] = ROSBRIDGE_PORT, ping_timeout: float = 2.0, port_timeout: float = 2.0, ) -> dict: """ Connect to a robot by setting the IP and port for the WebSocket connection, then testing connectivity. Args: ip (str): The IP address of the rosbridge server. Defaults to "127.0.0.1" (localhost). port (int): The port number of the rosbridge server. Defaults to 9090. ping_timeout (float): Timeout for ping in seconds. Default = 2.0. port_timeout (float): Timeout for port check in seconds. Default = 2.0. Returns: dict: Connection status with ping and port check results. """ # Set default values if None actual_ip = str(ip).strip() if ip else ROSBRIDGE_IP actual_port = int(port) if port else ROSBRIDGE_PORT # Set the IP and port ws_manager.set_ip(actual_ip, actual_port) # Test connectivity ping_result = ping_ip_and_port(actual_ip, actual_port, ping_timeout, port_timeout) # Combine the results return { "message": f"WebSocket IP set to {actual_ip}:{actual_port}", "connectivity_test": ping_result, } @mcp.tool(description="Detect the ROS version and distribution via rosbridge.") def detect_ros_version() -> dict: """ Detects the ROS version and distro via rosbridge WebSocket. Returns: dict: {'version': <version or '1'>, 'distro': <distro>} or error info. """ # Try ROS2 detection ros2_request = { "op": "call_service", "id": "ros2_version_check", "service": "/rosapi/get_ros_version", "args": {}, } with ws_manager: response = ws_manager.request(ros2_request) values = response.get("values") if response else None if isinstance(values, dict) and "version" in values: return {"version": values.get("version"), "distro": values.get("distro")} # Fallback to ROS1 detection ros1_request = { "op": "call_service", "id": "ros1_distro_check", "service": "/rosapi/get_param", "args": {"name": "/rosdistro"}, } response = ws_manager.request(ros1_request) value = response.get("values") if response else None if value: distro = value.get("value") if isinstance(value, dict) else value distro_clean = str(distro).strip('"').replace("\\n", "").replace("\n", "") return {"version": "1", "distro": distro_clean} return {"error": "Could not detect ROS version"} @mcp.tool(description=("Fetch available topics from the ROS bridge.\nExample:\nget_topics()")) def get_topics() -> dict: """ Fetch available topics from the ROS bridge. Returns: dict: Contains two lists - 'topics' and 'types', or a message string if no topics are found. """ # rosbridge service call to get topic list message = { "op": "call_service", "service": "/rosapi/topics", "type": "rosapi/Topics", "id": "get_topics_request_1", } # Request topic list from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return topic info if present if response and "values" in response: return response["values"] else: return {"warning": "No topics found"} @mcp.tool( description=("Get the message type for a specific topic.\nExample:\nget_topic_type('/cmd_vel')") ) def get_topic_type(topic: str) -> dict: """ Get the message type for a specific topic. Args: topic (str): The topic name (e.g., '/cmd_vel') Returns: dict: Contains the 'type' field with the message type, or an error message if topic doesn't exist. """ # Validate input if not topic or not topic.strip(): return {"error": "Topic name cannot be empty"} # rosbridge service call to get topic type message = { "op": "call_service", "service": "/rosapi/topic_type", "type": "rosapi/TopicType", "args": {"topic": topic}, "id": f"get_topic_type_request_{topic.replace('/', '_')}", } # Request topic type from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return topic type if present if response and "values" in response: topic_type = response["values"].get("type", "") if topic_type: return {"topic": topic, "type": topic_type} else: return {"error": f"Topic {topic} does not exist or has no type"} else: return {"error": f"Failed to get type for topic {topic}"} @mcp.tool( description=( "Get the complete structure/definition of a message type.\n" "Example:\n" "get_message_details('geometry_msgs/Twist')" ) ) def get_message_details(message_type: str) -> dict: """ Get the complete structure/definition of a message type. Args: message_type (str): The message type (e.g., 'geometry_msgs/Twist') Returns: dict: Contains the message structure with field names and types, or an error message if the message type doesn't exist. """ # Validate input if not message_type or not message_type.strip(): return {"error": "Message type cannot be empty"} # rosbridge service call to get message details message = { "op": "call_service", "service": "/rosapi/message_details", "type": "rosapi/MessageDetails", "args": {"type": message_type}, "id": f"get_message_details_request_{message_type.replace('/', '_')}", } # Request message details from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return message structure if present if response and "values" in response: typedefs = response["values"].get("typedefs", []) if typedefs: # Parse the structure into a more readable format structure = {} for typedef in typedefs: type_name = typedef.get("type", message_type) field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) fields = {} for name, ftype in zip(field_names, field_types): fields[name] = ftype structure[type_name] = {"fields": fields, "field_count": len(fields)} return {"message_type": message_type, "structure": structure} else: return {"error": f"Message type {message_type} not found or has no definition"} else: return {"error": f"Failed to get details for message type {message_type}"} @mcp.tool( description=( "Get list of nodes that are publishing to a specific topic.\n" "Example:\n" "get_topic_publishers('/cmd_vel')" ) ) def get_topic_publishers(topic: str) -> dict: """ Get list of nodes that are publishing to a specific topic. Args: topic (str): The topic name (e.g., '/cmd_vel') Returns: dict: Contains list of publisher node names, or a message if no publishers found. """ # Validate input if not topic or not topic.strip(): return {"error": "Topic name cannot be empty"} # rosbridge service call to get publishers message = { "op": "call_service", "service": "/rosapi/publishers", "type": "rosapi/Publishers", "args": {"topic": topic}, "id": f"get_topic_publishers_request_{topic.replace('/', '_')}", } # Request publishers from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return publishers if present if response and "values" in response: publishers = response["values"].get("publishers", []) return {"topic": topic, "publishers": publishers, "publisher_count": len(publishers)} else: return {"error": f"Failed to get publishers for topic {topic}"} @mcp.tool( description=( "Get list of nodes that are subscribed to a specific topic.\n" "Example:\n" "get_topic_subscribers('/cmd_vel')" ) ) def get_topic_subscribers(topic: str) -> dict: """ Get list of nodes that are subscribed to a specific topic. Args: topic (str): The topic name (e.g., '/cmd_vel') Returns: dict: Contains list of subscriber node names, or a message if no subscribers found. """ # Validate input if not topic or not topic.strip(): return {"error": "Topic name cannot be empty"} # rosbridge service call to get subscribers message = { "op": "call_service", "service": "/rosapi/subscribers", "type": "rosapi/Subscribers", "args": {"topic": topic}, "id": f"get_topic_subscribers_request_{topic.replace('/', '_')}", } # Request subscribers from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return subscribers if present if response and "values" in response: subscribers = response["values"].get("subscribers", []) return {"topic": topic, "subscribers": subscribers, "subscriber_count": len(subscribers)} else: return {"error": f"Failed to get subscribers for topic {topic}"} @mcp.tool( description=( "Get comprehensive information about all ROS topics including publishers, subscribers, and message types. Note that this may take time to execute when three are a large number of topics since it queries each one by one under the hood. \n" "Example:\n" "inspect_all_topics()" ) ) def inspect_all_topics() -> dict: """ Get comprehensive information about all ROS topics including publishers, subscribers, and message types. Returns: dict: Contains detailed information about all topics including: - Topic names and message types - Publishers for each topic - Subscribers for each topic - Connection counts and statistics """ # First get all topics topics_message = { "op": "call_service", "service": "/rosapi/topics", "type": "rosapi/Topics", "args": {}, "id": "inspect_all_topics_request_1", } with ws_manager: topics_response = ws_manager.request(topics_message) if not topics_response or "values" not in topics_response: return {"error": "Failed to get topics list"} topics = topics_response["values"].get("topics", []) types = topics_response["values"].get("types", []) topic_details = {} # Get details for each topic topic_errors = [] for i, topic in enumerate(topics): # Get topic type topic_type = types[i] if i < len(types) else "unknown" # Get publishers for this topic publishers_message = { "op": "call_service", "service": "/rosapi/publishers", "type": "rosapi/Publishers", "args": {"topic": topic}, "id": f"get_publishers_{topic.replace('/', '_')}", } publishers_response = ws_manager.request(publishers_message) publishers = [] if publishers_response and "values" in publishers_response: publishers = publishers_response["values"].get("publishers", []) elif publishers_response and "error" in publishers_response: topic_errors.append(f"Topic {topic} publishers: {publishers_response['error']}") # Get subscribers for this topic subscribers_message = { "op": "call_service", "service": "/rosapi/subscribers", "type": "rosapi/Subscribers", "args": {"topic": topic}, "id": f"get_subscribers_{topic.replace('/', '_')}", } subscribers_response = ws_manager.request(subscribers_message) subscribers = [] if subscribers_response and "values" in subscribers_response: subscribers = subscribers_response["values"].get("subscribers", []) elif subscribers_response and "error" in subscribers_response: topic_errors.append(f"Topic {topic} subscribers: {subscribers_response['error']}") topic_details[topic] = { "type": topic_type, "publishers": publishers, "subscribers": subscribers, "publisher_count": len(publishers), "subscriber_count": len(subscribers), } return { "total_topics": len(topics), "topics": topic_details, "topic_errors": topic_errors, # Include any errors encountered during inspection } @mcp.tool( description=( "Subscribe to a ROS topic and return the first message received.\n" "Example:\n" "subscribe_once(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped')\n" "subscribe_once(topic='/slow_topic', msg_type='my_package/SlowMsg', timeout=None) # Specify timeout only if topic publishes infrequently\n" "subscribe_once(topic='/high_rate_topic', msg_type='sensor_msgs/Image', timeout=None, queue_length=5, throttle_rate_ms=100) # Control message buffering and rate\n" "subscribe_once(topic='/camera/image_raw', msg_type='sensor_msgs/Image', expects_image='true') # Hint that this is an image for faster processing\n" "subscribe_once(topic='/point_cloud', msg_type='sensor_msgs/PointCloud2', expects_image='false') # Skip image detection for non-image data" ) ) def subscribe_once( topic: str = "", msg_type: str = "", expects_image: str = "auto", timeout: float = ws_manager.default_timeout, queue_length: int | None = None, throttle_rate_ms: int | None = None, ) -> dict: """ Subscribe to a given ROS topic via rosbridge and return the first message received. Args: topic (str): The ROS topic name (e.g., "/cmd_vel", "/joint_states"). msg_type (str): The ROS message type (e.g., "geometry_msgs/Twist"). timeout (float | None): Timeout in seconds. If None, uses the default timeout. queue_length (int | None): How many messages to buffer before dropping old ones. Must be ≥ 1. throttle_rate_ms (int | None): Minimum interval between messages in milliseconds. Must be ≥ 0. expects_image (str): Hint about whether to expect image data. - "true": prioritize image parsing (use for sensor_msgs/Image topics) - "false": skip image detection for faster processing (use for non-image topics) - "auto": auto-detect based on message fields (default) Returns: dict: - {"msg": <parsed ROS message>} if successful - {"error": "<error message>"} if subscription or timeout fails """ # Validate critical args before attempting subscription if not topic or not msg_type: return {"error": "Missing required arguments: topic and msg_type must be provided."} # Validate optional parameters if queue_length is not None and (not isinstance(queue_length, int) or queue_length < 1): return {"error": "queue_length must be an integer ≥ 1"} if throttle_rate_ms is not None and ( not isinstance(throttle_rate_ms, int) or throttle_rate_ms < 0 ): return {"error": "throttle_rate_ms must be an integer ≥ 0"} # Construct the rosbridge subscribe message subscribe_msg: dict = { "op": "subscribe", "topic": topic, "type": msg_type, } # Add optional parameters if provided if queue_length is not None: subscribe_msg["queue_length"] = queue_length if throttle_rate_ms is not None: subscribe_msg["throttle_rate"] = throttle_rate_ms # Subscribe and wait for the first message with ws_manager: # Send subscription request send_error = ws_manager.send(subscribe_msg) if send_error: return {"error": f"Failed to subscribe: {send_error}"} # Use default timeout if none specified actual_timeout = timeout if timeout is not None else ws_manager.default_timeout # Loop until we receive the first message or timeout end_time = time.time() + actual_timeout while time.time() < end_time: response = ws_manager.receive(timeout=0.5) # non-blocking small timeout if response is None: continue # idle timeout: no frame this tick # Convert string hint to boolean for parse_input expects_image_bool = convert_expects_image_hint(expects_image) # Parse input with expects_image hint msg_data, was_parsed_as_image = parse_input(response, expects_image_bool) if not msg_data: continue # parsing failed or empty # Check for status errors from rosbridge if msg_data.get("op") == "status" and msg_data.get("level") == "error": return {"error": f"Rosbridge error: {msg_data.get('msg', 'Unknown error')}"} # Check for the first published message if msg_data.get("op") == "publish" and msg_data.get("topic") == topic: # Unsubscribe before returning the message unsubscribe_msg = {"op": "unsubscribe", "topic": topic} ws_manager.send(unsubscribe_msg) # Return appropriate message based on whether image was actually parsed if was_parsed_as_image: # Exclude the 'data' field from image messages as it's too large msg_content = msg_data.get("msg", {}) filtered_msg = {k: v for k, v in msg_content.items() if k != "data"} return { "msg": filtered_msg, "message": "Image received successfully and saved in the MCP server. Run the 'analyze_previously_received_image' tool to analyze it", } else: return {"msg": msg_data.get("msg", {})} # Timeout - unsubscribe and return error unsubscribe_msg = {"op": "unsubscribe", "topic": topic} ws_manager.send(unsubscribe_msg) return {"error": "Timeout waiting for message from topic"} @mcp.tool( description=( "Publish a single message to a ROS topic.\n" "Example:\n" "publish_once(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped', msg={'linear': {'x': 1.0}})" ) ) def publish_once(topic: str = "", msg_type: str = "", msg: dict = {}) -> dict: """ Publish a single message to a ROS topic via rosbridge. Args: topic (str): ROS topic name (e.g., "/cmd_vel") msg_type (str): ROS message type (e.g., "geometry_msgs/Twist") msg (dict): Message payload as a dictionary Returns: dict: - {"success": True} if sent without errors - {"error": "<error message>"} if connection/send failed - If rosbridge responds (usually it doesn’t for publish), parsed JSON or error info """ # Validate critical args before attempting publish if not topic or not msg_type or msg == {}: return { "error": "Missing required arguments: topic, msg_type, and msg must all be provided." } # Use proper advertise → publish → unadvertise pattern with ws_manager: # 1. Advertise the topic advertise_msg = {"op": "advertise", "topic": topic, "type": msg_type} send_error = ws_manager.send(advertise_msg) if send_error: return {"error": f"Failed to advertise topic: {send_error}"} # Check for advertise response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": return {"error": f"Advertise failed: {msg_data.get('msg', 'Unknown error')}"} except json.JSONDecodeError: pass # Non-JSON response is usually fine for advertise # 2. Publish the message publish_msg = {"op": "publish", "topic": topic, "msg": msg} send_error = ws_manager.send(publish_msg) if send_error: # Try to unadvertise even if publish failed ws_manager.send({"op": "unadvertise", "topic": topic}) return {"error": f"Failed to publish message: {send_error}"} # Check for publish response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": # Unadvertise before returning error ws_manager.send({"op": "unadvertise", "topic": topic}) return {"error": f"Publish failed: {msg_data.get('msg', 'Unknown error')}"} except json.JSONDecodeError: pass # Non-JSON response is usually fine for publish # 3. Unadvertise the topic unadvertise_msg = {"op": "unadvertise", "topic": topic} ws_manager.send(unadvertise_msg) return { "success": True, "note": "Message published using advertise → publish → unadvertise pattern", } @mcp.tool( description=( "Subscribe to a topic for a duration and collect messages.\n" "Example:\n" "subscribe_for_duration(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped', duration=5, max_messages=10)\n" "subscribe_for_duration(topic='/high_rate_topic', msg_type='sensor_msgs/Image', duration=10, queue_length=5, throttle_rate_ms=100) # Control message buffering and rate\n" "subscribe_for_duration(topic='/camera/image_raw', msg_type='sensor_msgs/Image', duration=5, expects_image='true') # Hint that this is an image for faster processing\n" "subscribe_for_duration(topic='/point_cloud', msg_type='sensor_msgs/PointCloud2', duration=5, expects_image='false') # Skip image detection for non-image data" ) ) def subscribe_for_duration( topic: str = "", msg_type: str = "", duration: float = 5.0, max_messages: int = 100, queue_length: int | None = None, throttle_rate_ms: int | None = None, expects_image: str = "auto", ) -> dict: """ Subscribe to a ROS topic via rosbridge for a fixed duration and collect messages. Args: topic (str): ROS topic name (e.g. "/cmd_vel", "/joint_states") msg_type (str): ROS message type (e.g. "geometry_msgs/Twist") duration (float): How long (seconds) to listen for messages max_messages (int): Maximum number of messages to collect before stopping queue_length (int | None): How many messages to buffer before dropping old ones. Must be ≥ 1. throttle_rate_ms (int | None): Minimum interval between messages in milliseconds. Must be ≥ 0. expects_image (str): Hint about whether to expect image data. - "true": prioritize image parsing (use for sensor_msgs/Image topics) - "false": skip image detection for faster processing (use for non-image topics) - "auto": auto-detect based on message fields (default) Returns: dict: { "topic": topic_name, "collected_count": N, "messages": [msg1, msg2, ...] } """ # Validate critical args before subscribing if not topic or not msg_type: return {"error": "Missing required arguments: topic and msg_type must be provided."} # Validate optional parameters if queue_length is not None and (not isinstance(queue_length, int) or queue_length < 1): return {"error": "queue_length must be an integer ≥ 1"} if throttle_rate_ms is not None and ( not isinstance(throttle_rate_ms, int) or throttle_rate_ms < 0 ): return {"error": "throttle_rate_ms must be an integer ≥ 0"} # Send subscription request subscribe_msg: dict = { "op": "subscribe", "topic": topic, "type": msg_type, } # Add optional parameters if provided if queue_length is not None: subscribe_msg["queue_length"] = queue_length if throttle_rate_ms is not None: subscribe_msg["throttle_rate"] = throttle_rate_ms with ws_manager: send_error = ws_manager.send(subscribe_msg) if send_error: return {"error": f"Failed to subscribe: {send_error}"} collected_messages = [] status_errors = [] end_time = time.time() + duration # Loop until duration expires or we hit max_messages while time.time() < end_time and len(collected_messages) < max_messages: response = ws_manager.receive(timeout=0.5) # non-blocking small timeout if response is None: continue # idle timeout: no frame this tick # Convert string hint to boolean for parse_input expects_image_bool = convert_expects_image_hint(expects_image) # Parse input with expects_image hint msg_data, was_parsed_as_image = parse_input(response, expects_image_bool) if not msg_data: continue # parsing failed or empty # Check for status errors from rosbridge if msg_data.get("op") == "status" and msg_data.get("level") == "error": status_errors.append(msg_data.get("msg", "Unknown error")) continue # Check for published messages matching our topic if msg_data.get("op") == "publish" and msg_data.get("topic") == topic: # Add message based on whether it was actually parsed as image if was_parsed_as_image: # Exclude the 'data' field from image messages as it's too large msg_content = msg_data.get("msg", {}) filtered_msg = {k: v for k, v in msg_content.items() if k != "data"} collected_messages.append( { "image_message": "Image received and saved. Use 'analyze_previously_received_image' to analyze it.", "msg": filtered_msg, } ) else: collected_messages.append(msg_data.get("msg", {})) # Unsubscribe when done unsubscribe_msg = {"op": "unsubscribe", "topic": topic} ws_manager.send(unsubscribe_msg) return { "topic": topic, "collected_count": len(collected_messages), "messages": collected_messages, "status_errors": status_errors, # Include any errors encountered during collection } @mcp.tool( description=( "Publish a sequence of messages with delays.\n" "Example:\n" "publish_for_durations(topic='/cmd_vel', msg_type='geometry_msgs/msg/TwistStamped', messages=[{'linear': {'x': 1.0}}, {'linear': {'x': 0.0}}], durations=[1, 2])" ) ) def publish_for_durations( topic: str = "", msg_type: str = "", messages: List[Dict[str, Any]] = [], durations: List[float] = [], ) -> dict: """ Publish a sequence of messages to a given ROS topic with delays in between. Args: topic (str): ROS topic name (e.g., "/cmd_vel") msg_type (str): ROS message type (e.g., "geometry_msgs/Twist") messages (List[Dict[str, Any]]): A list of message dictionaries (ROS-compatible payloads) durations (List[float]): A list of durations (seconds) to wait between messages Returns: dict: { "success": True, "published_count": <number of messages>, "topic": topic, "msg_type": msg_type } OR {"error": "<error message>"} if something failed """ # Validate critical args before publishing if not topic or not msg_type or messages == [] or durations == []: return { "error": "Missing required arguments: topic, msg_type, messages, and durations must all be provided." } # Ensure same length for messages & durations if len(messages) != len(durations): return {"error": "messages and durations must have the same length"} # Use proper advertise → publish → unadvertise pattern with ws_manager: # 1. Advertise the topic advertise_msg = {"op": "advertise", "topic": topic, "type": msg_type} send_error = ws_manager.send(advertise_msg) if send_error: return {"error": f"Failed to advertise topic: {send_error}"} # Check for advertise response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": return {"error": f"Advertise failed: {msg_data.get('msg', 'Unknown error')}"} except json.JSONDecodeError: pass # Non-JSON response is usually fine for advertise published_count = 0 errors = [] # 2. Iterate and publish each message with a delay for i, (msg, delay) in enumerate(zip(messages, durations)): # Build the rosbridge publish message publish_msg = {"op": "publish", "topic": topic, "msg": msg} # Send it send_error = ws_manager.send(publish_msg) if send_error: errors.append(f"Message {i + 1}: {send_error}") continue # Continue with next message instead of failing completely # Check for publish response/errors response = ws_manager.receive(timeout=1.0) if response: try: msg_data = json.loads(response) if msg_data.get("op") == "status" and msg_data.get("level") == "error": errors.append(f"Message {i + 1}: {msg_data.get('msg', 'Unknown error')}") continue except json.JSONDecodeError: pass # Non-JSON response is usually fine for publish published_count += 1 # Wait before sending the next message time.sleep(delay) # 3. Unadvertise the topic unadvertise_msg = {"op": "unadvertise", "topic": topic} ws_manager.send(unadvertise_msg) return { "success": True, "published_count": published_count, "total_messages": len(messages), "topic": topic, "msg_type": msg_type, "errors": errors, # Include any errors encountered during publishing } ## ############################################################################################## ## ## ## ROS SERVICES ## ## ############################################################################################## ## @mcp.tool(description=("Get list of all available ROS services.\nExample:\nget_services()")) def get_services() -> dict: """ Get list of all available ROS services. Returns: dict: Contains list of all active services, or a message string if no services are found. """ # rosbridge service call to get service list message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "get_services_request_1", } # Request service list from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return service info if present if response and "values" in response: services = response["values"].get("services", []) return {"services": services, "service_count": len(services)} else: return {"warning": "No services found"} @mcp.tool( description=( "Get the service type for a specific service.\nExample:\nget_service_type('/rosapi/topics')" ) ) def get_service_type(service: str) -> dict: """ Get the service type for a specific service. Args: service (str): The service name (e.g., '/rosapi/topics') Returns: dict: Contains the service type, or an error message if service doesn't exist. """ # Validate input if not service or not service.strip(): return {"error": "Service name cannot be empty"} # rosbridge service call to get service type message = { "op": "call_service", "service": "/rosapi/service_type", "type": "rosapi/ServiceType", "args": {"service": service}, "id": f"get_service_type_request_{service.replace('/', '_')}", } # Request service type from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return service type if present if response and "values" in response: service_type = response["values"].get("type", "") if service_type: return {"service": service, "type": service_type} else: return {"error": f"Service {service} does not exist or has no type"} else: return {"error": f"Failed to get type for service {service}"} @mcp.tool( description=( "Get complete service details including request and response structures.\n" "Example:\n" "get_service_details('my_package/CustomService')" ) ) def get_service_details(service_type: str) -> dict: """ Get complete service details including request and response structures. Args: service_type (str): The service type (e.g., 'my_package/CustomService') Returns: dict: Contains complete service definition with request and response structures. """ # Validate input if not service_type or not service_type.strip(): return {"error": "Service type cannot be empty"} result = {"service_type": service_type, "request": {}, "response": {}} # Get both request and response details in a single WebSocket context with ws_manager: # Get request details request_message = { "op": "call_service", "service": "/rosapi/service_request_details", "type": "rosapi/ServiceRequestDetails", "args": {"type": service_type}, "id": f"get_service_details_request_{service_type.replace('/', '_')}", } request_response = ws_manager.request(request_message) if request_response and "values" in request_response: typedefs = request_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) fields = {} for name, ftype in zip(field_names, field_types): fields[name] = ftype result["request"] = {"fields": fields, "field_count": len(fields)} # Get response details response_message = { "op": "call_service", "service": "/rosapi/service_response_details", "type": "rosapi/ServiceResponseDetails", "args": {"type": service_type}, "id": f"get_service_details_response_{service_type.replace('/', '_')}", } response_response = ws_manager.request(response_message) if response_response and "values" in response_response: typedefs = response_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) fields = {} for name, ftype in zip(field_names, field_types): fields[name] = ftype result["response"] = {"fields": fields, "field_count": len(fields)} # Check if we got any data if not result["request"] and not result["response"]: return {"error": f"Service type {service_type} not found or has no definition"} return result @mcp.tool( description=( "Get list of nodes that provide a specific service.\n" "Example:\n" "get_service_providers('/rosapi/topics')" ) ) def get_service_providers(service: str) -> dict: """ Get list of nodes that provide a specific service. Args: service (str): The service name (e.g., '/rosapi/topics') Returns: dict: Contains list of nodes providing this service, or an error message if service doesn't exist. """ # Validate input if not service or not service.strip(): return {"error": "Service name cannot be empty"} # rosbridge service call to get service providers (using service_node like inspect_all_services) message = { "op": "call_service", "service": "/rosapi/service_node", "type": "rosapi/ServiceNode", "args": {"service": service}, "id": f"get_service_providers_request_{service.replace('/', '_')}", } # Request service providers from rosbridge with ws_manager: response = ws_manager.request(message) # Return service providers if present (using same logic as inspect_all_services) providers = [] # Handle different response formats safely if response and isinstance(response, dict): if "values" in response: node = response["values"].get("node", "") if node: providers = [node] elif "result" in response: node = response["result"].get("node", "") if node: providers = [node] elif "error" in response: return {"error": f"Service call failed: {response['error']}"} elif response is False: return {"error": f"No response received for service {service}"} elif response is True: return {"error": f"Unexpected boolean response for service {service}"} else: return {"error": f"Failed to get providers for service {service}"} return {"service": service, "providers": providers, "provider_count": len(providers)} @mcp.tool( description=( "Get comprehensive information about all services including types and providers. Note that this may take time to execute when three are a large number of services since it queries each one by one under the hood. \n" "Example:\n" "inspect_all_services()" ) ) def inspect_all_services() -> dict: """ Get comprehensive information about all services including types and providers. Returns: dict: Contains detailed information about all services, including service names, types, and provider nodes. """ # First get all services services_message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "inspect_all_services_request_1", } with ws_manager: services_response = ws_manager.request(services_message) if not services_response or "values" not in services_response: return {"error": "Failed to get services list"} services = services_response["values"].get("services", []) service_details = {} # Get details for each service service_errors = [] for service in services: # Get service type type_message = { "op": "call_service", "service": "/rosapi/service_type", "type": "rosapi/ServiceType", "args": {"service": service}, "id": f"get_type_{service.replace('/', '_')}", } type_response = ws_manager.request(type_message) service_type = "" if type_response and "values" in type_response: service_type = type_response["values"].get("type", "unknown") elif type_response and "error" in type_response: service_errors.append(f"Service {service}: {type_response['error']}") # Get service provider (using service_node instead of service_providers) provider_message = { "op": "call_service", "service": "/rosapi/service_node", "type": "rosapi/ServiceNode", "args": {"service": service}, "id": f"get_provider_{service.replace('/', '_')}", } provider_response = ws_manager.request(provider_message) providers = [] # Handle different response formats safely if provider_response and isinstance(provider_response, dict): if "values" in provider_response: node = provider_response["values"].get("node", "") if node: providers = [node] elif "result" in provider_response: node = provider_response["result"].get("node", "") if node: providers = [node] elif "error" in provider_response: service_errors.append( f"Service {service} provider: {provider_response['error']}" ) elif provider_response is False: service_errors.append(f"Service {service} provider: No response received") elif provider_response is True: service_errors.append(f"Service {service} provider: Unexpected boolean response") service_details[service] = { "type": service_type, "providers": providers, "provider_count": len(providers), } return { "total_services": len(services), "services": service_details, "service_errors": service_errors, # Include any errors encountered during inspection } @mcp.tool( description=( "Call a ROS service with specified request data.\n" "Example:\n" "call_service('/rosapi/topics', 'rosapi/Topics', {})\n" "call_service('/slow_service', 'my_package/SlowService', {}, timeout=10.0) # Specify timeout only for slow services" ) ) def call_service( service_name: str, service_type: str, request: dict, timeout: float | None = None ) -> dict: """ Call a ROS service with specified request data. Args: service_name (str): The service name (e.g., '/rosapi/topics') service_type (str): The service type (e.g., 'rosapi/Topics') request (dict): Service request data as a dictionary timeout (float | None): Timeout in seconds. If None, uses the default timeout. Returns: dict: Contains the service response or error information. """ # rosbridge service call message = { "op": "call_service", "service": service_name, "type": service_type, "args": request, "id": f"call_service_request_{service_name.replace('/', '_')}", } # Call the service through rosbridge with ws_manager: response = ws_manager.request(message, timeout=timeout) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return { "service": service_name, "service_type": service_type, "success": False, "error": f"Service call failed: {error_msg}", } # Return service response if present if response: if response.get("op") == "service_response": # Alternative response format return { "service": service_name, "service_type": service_type, "success": response.get("result", True), "result": response.get("values", {}), } elif response.get("op") == "status" and response.get("level") == "error": # Error response return { "service": service_name, "service_type": service_type, "success": False, "error": response.get("msg", "Unknown error"), } else: # Unexpected response format return { "service": service_name, "service_type": service_type, "success": False, "error": "Unexpected response format", "raw_response": response, } else: return { "service": service_name, "service_type": service_type, "success": False, "error": "No response received from service call", } @mcp.tool(description=("Get list of all currently running ROS nodes.\nExample:\nget_nodes()")) def get_nodes() -> dict: """ Get list of all currently running ROS nodes. Returns: dict: Contains list of all active nodes, or a message string if no nodes are found. """ # rosbridge service call to get node list message = { "op": "call_service", "service": "/rosapi/nodes", "type": "rosapi/Nodes", "args": {}, "id": "get_nodes_request_1", } # Request node list from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Return node info if present if response and "values" in response: nodes = response["values"].get("nodes", []) return {"nodes": nodes, "node_count": len(nodes)} else: return {"warning": "No nodes found"} @mcp.tool( description=( "Get a single ROS parameter value by name. Works only with ROS 2.\nExample:\nget_param('/turtlesim:background_b')" ) ) def get_parameter(name: str) -> dict: """ Get a single ROS parameter value by name. Works only with ROS 2. Args: name (str): The parameter name (e.g., '/turtlesim:background_b') Returns: dict: Contains parameter value and metadata, or error message if parameter not found. """ if not name or not name.strip(): return {"error": "Parameter name cannot be empty"} message = { "op": "call_service", "service": "/rosapi/get_param", "type": "rosapi/GetParam", "args": {"name": name}, "id": f"get_param_{name.replace('/', '_').replace(':', '_')}", } with ws_manager: response = ws_manager.request(message) if response and "values" in response: result_data = response["values"] return { "name": name, "value": result_data.get("value", ""), "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } elif response and "result" in response and response["result"]: result_data = response["result"] return { "name": name, "value": result_data.get("value", ""), "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } else: error_msg = ( response.get("values", {}).get("message", "Service call failed") if response else "No response" ) return {"error": f"Failed to get parameter {name}: {error_msg}"} @mcp.tool( description=( "Set a single ROS parameter value. Works only with ROS 2.\nExample:\nset_param('/turtlesim:background_b', '255')" ) ) def set_parameter(name: str, value: str) -> dict: """ Set a single ROS parameter value. Works only with ROS 2. Args: name (str): The parameter name (e.g., '/turtlesim:background_b') value (str): The parameter value to set Returns: dict: Contains success status and metadata, or error message if failed. """ if not name or not name.strip(): return {"error": "Parameter name cannot be empty"} message = { "op": "call_service", "service": "/rosapi/set_param", "type": "rosapi/SetParam", "args": {"name": name, "value": value}, "id": f"set_param_{name.replace('/', '_').replace(':', '_')}", } with ws_manager: response = ws_manager.request(message) if response and "values" in response: result_data = response["values"] return { "name": name, "value": value, "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } elif response and "result" in response and response["result"]: result_data = response["result"] return { "name": name, "value": value, "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } else: error_msg = ( response.get("values", {}).get("message", "Service call failed") if response else "No response" ) return {"error": f"Failed to set parameter {name}: {error_msg}"} @mcp.tool( description=( "Check if a ROS parameter exists. Works only with ROS 2.\nExample:\nhas_param('/turtlesim:background_b')" ) ) def has_parameter(name: str) -> dict: """ Check if a ROS parameter exists. Works only with ROS 2. Args: name (str): The parameter name (e.g., '/turtlesim:background_b') Returns: dict: Contains existence status and metadata, or error message if failed. """ if not name or not name.strip(): return {"error": "Parameter name cannot be empty"} message = { "op": "call_service", "service": "/rosapi/has_param", "type": "rosapi/HasParam", "args": {"name": name}, "id": f"has_param_{name.replace('/', '_').replace(':', '_')}", } with ws_manager: response = ws_manager.request(message) if response and "values" in response: result_data = response["values"] return { "name": name, "exists": result_data.get("exists", False), "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } elif response and "result" in response and response["result"]: result_data = response["result"] return { "name": name, "exists": result_data.get("exists", False), "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } else: error_msg = ( response.get("values", {}).get("message", "Service call failed") if response else "No response" ) return {"error": f"Failed to check parameter {name}: {error_msg}"} @mcp.tool( description=( "Delete a ROS parameter. Works only with ROS 2.\nExample:\ndelete_param('/turtlesim:background_b')" ) ) def delete_parameter(name: str) -> dict: """ Delete a ROS parameter. Works only with ROS 2. Args: name (str): The parameter name (e.g., '/turtlesim:background_b') Returns: dict: Contains success status and metadata, or error message if failed. """ if not name or not name.strip(): return {"error": "Parameter name cannot be empty"} message = { "op": "call_service", "service": "/rosapi/delete_param", "type": "rosapi/DeleteParam", "args": {"name": name}, "id": f"delete_param_{name.replace('/', '_').replace(':', '_')}", } with ws_manager: response = ws_manager.request(message) if response and "values" in response: result_data = response["values"] return { "name": name, "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } elif response and "result" in response and response["result"]: result_data = response["result"] return { "name": name, "successful": result_data.get("successful", False), "reason": result_data.get("reason", ""), } else: error_msg = ( response.get("values", {}).get("message", "Service call failed") if response else "No response" ) return {"error": f"Failed to delete parameter {name}: {error_msg}"} @mcp.tool( description=( "Get list of all ROS parameter names. Works only with ROS 2.\nExample:\nget_parameters()" ) ) def get_parameters() -> dict: """ Get list of all ROS parameter names. Works only with ROS 2. Returns: dict: Contains list of all parameter names, or error message if failed. """ message = { "op": "call_service", "service": "/rosapi/get_param_names", "type": "rosapi/GetParamNames", "args": {}, "id": "get_parameters_request_1", } with ws_manager: response = ws_manager.request(message) if response and "values" in response: names = response["values"].get("names", []) return {"parameters": names, "parameter_count": len(names)} elif response and "result" in response and response["result"]: result_data = response["result"] if isinstance(result_data, dict): names = result_data.get("names", []) else: names = [] return {"parameters": names, "parameter_count": len(names)} else: error_msg = ( response.get("values", {}).get("message", "Service call failed") if response else "No response" ) return {"error": f"Failed to get parameter names: {error_msg}"} @mcp.tool( description=( "Get comprehensive information about all ROS parameters including values and metadata.\nWorks only with ROS 2.\n" "Example:\n" "inspect_all_parameters()" ) ) def inspect_all_parameters() -> dict: """ Get comprehensive information about all ROS parameters including values and metadata. Works only with ROS 2. Returns: dict: Contains detailed information about all parameters, including parameter names, values, and metadata. """ # First get all parameters parameters_message = { "op": "call_service", "service": "/rosapi/get_param_names", "type": "rosapi/GetParamNames", "args": {}, "id": "inspect_all_parameters_request_1", } with ws_manager: parameters_response = ws_manager.request(parameters_message) if not parameters_response or "values" not in parameters_response: return {"error": "Failed to get parameters list"} parameters = parameters_response["values"].get("names", []) parameter_details = {} # Get details for each parameter parameter_errors = [] for param_name in parameters: # Get parameter value value_message = { "op": "call_service", "service": "/rosapi/get_param", "type": "rosapi/GetParam", "args": {"name": param_name}, "id": f"get_param_{param_name.replace('/', '_').replace(':', '_')}", } value_response = ws_manager.request(value_message) param_value = "" param_successful = False if value_response and "values" in value_response: value_data = value_response["values"] param_value = value_data.get("value", "") param_successful = value_data.get("successful", False) elif value_response and "result" in value_response and value_response["result"]: value_data = value_response["result"] param_value = value_data.get("value", "") param_successful = value_data.get("successful", False) elif value_response and "error" in value_response: parameter_errors.append(f"Parameter {param_name}: {value_response['error']}") # Get parameter type (using describe_parameters service) type_message = { "op": "call_service", "service": "/rosapi/describe_parameters", "type": "rcl_interfaces/DescribeParameters", "args": {"names": [param_name]}, "id": f"describe_param_{param_name.replace('/', '_').replace(':', '_')}", } type_response = ws_manager.request(type_message) param_type = "unknown" # Handle different response formats for parameter type detection if type_response and isinstance(type_response, dict): if "values" in type_response: result_data = type_response["values"] if isinstance(result_data, dict): descriptors = result_data.get("descriptors", []) if descriptors and len(descriptors) > 0: param_type = descriptors[0].get("type", "unknown") elif "result" in type_response and type_response["result"]: result_data = type_response["result"] if isinstance(result_data, dict): descriptors = result_data.get("descriptors", []) if descriptors and len(descriptors) > 0: param_type = descriptors[0].get("type", "unknown") elif "error" in type_response: parameter_errors.append( f"Parameter {param_name} type: {type_response['error']}" ) # Fallback: Try to infer type from value if param_type == "unknown" and param_value: try: # Remove quotes for type checking clean_value = param_value.strip('"') # Try to parse as different types if clean_value.lower() in ["true", "false"]: param_type = "bool" elif clean_value.isdigit() or ( clean_value.startswith("-") and clean_value[1:].isdigit() ): param_type = "int" elif ( "." in clean_value and clean_value.replace(".", "").replace("-", "").isdigit() ): param_type = "float" elif param_value.startswith('"') and param_value.endswith('"'): param_type = "string" elif clean_value == "": param_type = "string" else: param_type = "string" except Exception: param_type = "string" parameter_details[param_name] = { "value": param_value, "type": param_type, "exists": param_successful, } return { "total_parameters": len(parameters), "parameters": parameter_details, "parameter_errors": parameter_errors, # Include any errors encountered during inspection } @mcp.tool( description=( "Get comprehensive details about a specific ROS parameter including value, type, and metadata. Works only with ROS 2.\n " "Example:\n" "get_parameter_details('/turtlesim:background_r')" ) ) def get_parameter_details(name: str) -> dict: """ Get comprehensive details about a specific ROS parameter including value, type, and metadata. Works only with ROS 2. Args: name (str): The parameter name (e.g., '/turtlesim:background_r') Returns: dict: Contains detailed parameter information or error details. """ # Validate input if not name or not name.strip(): return {"error": "Parameter name cannot be empty"} # Get parameter value value_message = { "op": "call_service", "service": "/rosapi/get_param", "type": "rosapi/GetParam", "args": {"name": name}, "id": f"get_param_details_{name.replace('/', '_').replace(':', '_')}", } with ws_manager: value_response = ws_manager.request(value_message) if not value_response or "values" not in value_response: return {"error": f"Failed to get parameter {name}"} value_data = value_response["values"] param_value = value_data.get("value", "") param_successful = value_data.get("successful", False) if not param_successful: return {"error": f"Parameter {name} does not exist"} # Get parameter type type_message = { "op": "call_service", "service": "/rosapi/describe_parameters", "type": "rcl_interfaces/DescribeParameters", "args": {"names": [name]}, "id": f"describe_param_details_{name.replace('/', '_').replace(':', '_')}", } with ws_manager: type_response = ws_manager.request(type_message) param_type = "unknown" param_description = "" if type_response and isinstance(type_response, dict): if "values" in type_response: result_data = type_response["values"] if isinstance(result_data, dict): descriptors = result_data.get("descriptors", []) if descriptors and len(descriptors) > 0: descriptor = descriptors[0] param_type = descriptor.get("type", "unknown") param_description = descriptor.get("description", "") elif "result" in type_response and type_response["result"]: result_data = type_response["result"] if isinstance(result_data, dict): descriptors = result_data.get("descriptors", []) if descriptors and len(descriptors) > 0: descriptor = descriptors[0] param_type = descriptor.get("type", "unknown") param_description = descriptor.get("description", "") # Fallback: Try to infer type from value if param_type == "unknown" and param_value: try: clean_value = param_value.strip('"') if clean_value.lower() in ["true", "false"]: param_type = "bool" elif clean_value.isdigit() or ( clean_value.startswith("-") and clean_value[1:].isdigit() ): param_type = "int" elif "." in clean_value and clean_value.replace(".", "").replace("-", "").isdigit(): param_type = "float" elif param_value.startswith('"') and param_value.endswith('"'): param_type = "string" elif clean_value == "": param_type = "string" else: param_type = "string" except Exception: param_type = "string" return { "name": name, "value": param_value, "type": param_type, "exists": param_successful, "description": param_description, "node": name.split(":")[0] if ":" in name else "", "parameter": name.split(":")[1] if ":" in name else name, } @mcp.tool( description=( "Get detailed information about a specific node including its publishers, subscribers, and services.\n" "Example:\n" "get_node_details('/turtlesim')" ) ) def get_node_details(node: str) -> dict: """ Get detailed information about a specific node including its publishers, subscribers, and services. Args: node (str): The node name (e.g., '/turtlesim') Returns: dict: Contains detailed node information including publishers, subscribers, and services, or an error message if node doesn't exist. """ # Validate input if not node or not node.strip(): return {"error": "Node name cannot be empty"} result = { "node": node, "publishers": [], "subscribers": [], "services": [], "publisher_count": 0, "subscriber_count": 0, "service_count": 0, } # rosbridge service call to get node details message = { "op": "call_service", "service": "/rosapi/node_details", "type": "rosapi/NodeDetails", "args": {"node": node}, "id": f"get_node_details_{node.replace('/', '_')}", } # Request node details from rosbridge with ws_manager: response = ws_manager.request(message) # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values error_msg = response.get("values", {}).get("message", "Service call failed") return {"error": f"Service call failed: {error_msg}"} # Extract data from the response if response and "values" in response: values = response["values"] # Extract publishers, subscribers, and services from the response # Note: rosapi uses "publishing" and "subscribing" field names publishers = values.get("publishing", []) subscribers = values.get("subscribing", []) services = values.get("services", []) result["publishers"] = publishers result["subscribers"] = subscribers result["services"] = services result["publisher_count"] = len(publishers) result["subscriber_count"] = len(subscribers) result["service_count"] = len(services) # Check if we got any data if not result["publishers"] and not result["subscribers"] and not result["services"]: return {"error": f"Node {node} not found or has no details available"} return result @mcp.tool( description=( "Get comprehensive information about all ROS nodes including their publishers, subscribers, and services.\n" "Example:\n" "inspect_all_nodes()" ) ) def inspect_all_nodes() -> dict: """ Get comprehensive information about all ROS nodes including their publishers, subscribers, and services. Returns: dict: Contains detailed information about all nodes including: - Node names and details - Publishers for each node - Subscribers for each node - Services provided by each node - Connection counts and statistics """ # First get all nodes nodes_message = { "op": "call_service", "service": "/rosapi/nodes", "type": "rosapi/Nodes", "args": {}, "id": "inspect_all_nodes_request_1", } with ws_manager: nodes_response = ws_manager.request(nodes_message) if not nodes_response or "values" not in nodes_response: return {"error": "Failed to get nodes list"} nodes = nodes_response["values"].get("nodes", []) node_details = {} # Get details for each node node_errors = [] for node in nodes: # Get node details (publishers, subscribers, services) node_details_message = { "op": "call_service", "service": "/rosapi/node_details", "type": "rosapi/NodeDetails", "args": {"node": node}, "id": f"get_node_details_{node.replace('/', '_')}", } node_details_response = ws_manager.request(node_details_message) if node_details_response and "values" in node_details_response: values = node_details_response["values"] # Extract publishers, subscribers, and services from the response # Note: rosapi uses "publishing" and "subscribing" field names publishers = values.get("publishing", []) subscribers = values.get("subscribing", []) services = values.get("services", []) node_details[node] = { "publishers": publishers, "subscribers": subscribers, "services": services, "publisher_count": len(publishers), "subscriber_count": len(subscribers), "service_count": len(services), } elif ( node_details_response and "result" in node_details_response and not node_details_response["result"] ): error_msg = node_details_response.get("values", {}).get( "message", "Service call failed" ) node_errors.append(f"Node {node}: {error_msg}") else: node_errors.append(f"Node {node}: Failed to get node details") return { "total_nodes": len(nodes), "nodes": node_details, "node_errors": node_errors, # Include any errors encountered during inspection } ## ############################################################################################## ## ## ## ROS ACTIONS ## ## ############################################################################################## ## @mcp.tool( description=( "Get list of all available ROS actions. Works only with ROS 2.\nExample:\nget_actions()" ) ) def get_actions() -> dict: """ Get list of all available ROS actions. Works only with ROS 2. Returns: dict: Contains list of all active actions, or a message string if no actions are found. """ # Check if required service is available required_services = ["/rosapi/action_servers"] with ws_manager: # Get available services to check compatibility services_message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "check_services_for_get_actions", } services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { "warning": "Cannot check service availability", "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, "suggestion": "Ensure rosbridge is running and rosapi is available", }, } available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] if missing_services: return { "warning": "Action listing not supported by this rosbridge/rosapi version", "compatibility": { "issue": "Required action services are not available", "missing_services": missing_services, "required_services": required_services, "available_services": [s for s in available_services if "action" in s], "suggestion": "This rosbridge version doesn't support action listing services", }, } # rosbridge service call to get action list message = { "op": "call_service", "service": "/rosapi/action_servers", "type": "rosapi/ActionServers", "args": {}, "id": "get_actions_request_1", } # Request action list from rosbridge with ws_manager: response = ws_manager.request(message) # Handle error responses from ws_manager if response and "error" in response: return {"error": f"WebSocket error: {response['error']}"} # Check for service response errors first if response and "result" in response and not response["result"]: # Service call failed - return error with details from values if "values" in response and isinstance(response["values"], dict): error_msg = response["values"].get("message", "Service call failed") else: error_msg = "Service call failed" return {"error": f"Service call failed: {error_msg}"} # Return action info if present if response and "values" in response: actions = response["values"].get("action_servers", []) return {"actions": actions, "action_count": len(actions)} else: return {"warning": "No actions found or /rosapi/action_servers service not available"} @mcp.tool( description=( "Get the action type for a specific action. Works only with ROS 2.\nExample:\nget_action_type('/turtle1/rotate_absolute')" ) ) def get_action_type(action: str) -> dict: """ Get the action type for a specific action. Works only with ROS 2. Args: action (str): The action name (e.g., '/turtle1/rotate_absolute') Returns: dict: Contains the action type, or an error message if action doesn't exist. """ # Validate input if not action or not action.strip(): return {"error": "Action name cannot be empty"} # Check if required service is available required_services = ["/rosapi/interfaces"] with ws_manager: # Get available services to check compatibility services_message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "check_services_for_get_action_type", } services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { "warning": "Cannot check service availability", "action": action, "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, "suggestion": "Ensure rosbridge is running and rosapi is available", }, } available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] if missing_services: return { "warning": "Action type resolution not supported by this rosbridge/rosapi version", "action": action, "compatibility": { "issue": "Required services are not available", "missing_services": missing_services, "required_services": required_services, "available_services": [s for s in available_services if "interface" in s], "suggestion": "This rosbridge version doesn't support interface listing services", }, } # Since there's no direct action_type service, we'll derive it from known patterns # or use a mapping approach for common actions # Known action type mappings action_type_map = { "/turtle1/rotate_absolute": "turtlesim/action/RotateAbsolute", # Add more mappings as needed } # Check if it's a known action if action in action_type_map: return {"action": action, "type": action_type_map[action]} # For unknown actions, try to derive the type from interfaces list # First get all interfaces to see if we can find a matching action type interfaces_message = { "op": "call_service", "service": "/rosapi/interfaces", "type": "rosapi/Interfaces", "args": {}, "id": f"get_interfaces_for_action_{action.replace('/', '_')}", } with ws_manager: interfaces_response = ws_manager.request(interfaces_message) if interfaces_response and "values" in interfaces_response: interfaces = interfaces_response["values"].get("interfaces", []) # Look for action interfaces that might match action_interfaces = [iface for iface in interfaces if "/action/" in iface] # Try to match based on action name patterns action_name_part = action.split("/")[-1] # Get last part (e.g., "rotate_absolute") for iface in action_interfaces: if action_name_part.lower() in iface.lower(): return {"action": action, "type": iface} # If no exact match, return the list of available action interfaces return { "error": f"Action type for {action} not found", "available_action_types": action_interfaces, "suggestion": "This action might not be available or use a different naming pattern", } return { "error": f"Failed to get type for action {action}", "action": action, "compatibility": { "issue": "Failed to retrieve interfaces from rosapi", "required_services": ["/rosapi/interfaces"], "suggestion": "Ensure rosbridge is running and rosapi is available", "note": "Action type resolution requires /rosapi/interfaces service", }, } @mcp.tool( description=( "Get complete action details including goal, result, and feedback structures. Works only with ROS 2.\n" "Example:\n" "get_action_details('turtlesim/action/RotateAbsolute')." ) ) def get_action_details(action_type: str) -> dict: """ Get complete action details including goal, result, and feedback structures. Works only with ROS 2. Args: action_type (str): The action type (e.g., 'turtlesim/action/RotateAbsolute') Returns: dict: Contains complete action definition with goal, result, and feedback structures. """ # Validate input if not action_type or not action_type.strip(): return {"error": "Action type cannot be empty"} # Check if required action detail services are available required_services = [ "/rosapi/action_goal_details", "/rosapi/action_result_details", "/rosapi/action_feedback_details", ] with ws_manager: # Get available services to check compatibility services_message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "check_services_for_action_details", } services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { "error": "Failed to check service availability", "action_type": action_type, "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, "suggestion": "Ensure rosbridge is running and rosapi is available", }, } available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] if missing_services: return { "error": f"Action details for {action_type} not found", "action_type": action_type, "compatibility": { "issue": "Required action detail services are not available", "missing_services": missing_services, "required_services": required_services, "available_services": [s for s in available_services if "action" in s], "suggestions": [ "Use get_actions() to list available actions", "Use get_action_type() to get action type from action name", "Action details may not be exposed by this rosbridge/rosapi version", "Consider subscribing to action topics directly for live message inspection", ], "note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi", }, } result = {"action_type": action_type, "goal": {}, "result": {}, "feedback": {}} # Get goal, result, and feedback details in a single WebSocket context with ws_manager: # Get goal details using action-specific service goal_message = { "op": "call_service", "service": "/rosapi/action_goal_details", "type": "rosapi_msgs/srv/ActionGoalDetails", "args": {"type": action_type}, "id": f"get_action_goal_details_{action_type.replace('/', '_')}", } goal_response = ws_manager.request(goal_message) if ( goal_response and isinstance(goal_response, dict) and "values" in goal_response and "error" not in goal_response ): typedefs = goal_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) field_array_len = typedef.get("fieldarraylen", []) examples = typedef.get("examples", []) const_names = typedef.get("constnames", []) const_values = typedef.get("constvalues", []) fields = {} field_details = {} for i, (name, ftype) in enumerate(zip(field_names, field_types)): fields[name] = ftype field_details[name] = { "type": ftype, "array_length": field_array_len[i] if i < len(field_array_len) else -1, "example": examples[i] if i < len(examples) else None, } result["goal"] = { "fields": fields, "field_count": len(fields), "field_details": field_details, "message_type": typedef.get("type", ""), "examples": examples, "constants": dict(zip(const_names, const_values)) if const_names else {}, } # Get result details using action-specific service result_message = { "op": "call_service", "service": "/rosapi/action_result_details", "type": "rosapi_msgs/srv/ActionResultDetails", "args": {"type": action_type}, "id": f"get_action_result_details_{action_type.replace('/', '_')}", } result_response = ws_manager.request(result_message) if ( result_response and isinstance(result_response, dict) and "values" in result_response and "error" not in result_response ): typedefs = result_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) field_array_len = typedef.get("fieldarraylen", []) examples = typedef.get("examples", []) const_names = typedef.get("constnames", []) const_values = typedef.get("constvalues", []) fields = {} field_details = {} for i, (name, ftype) in enumerate(zip(field_names, field_types)): fields[name] = ftype field_details[name] = { "type": ftype, "array_length": field_array_len[i] if i < len(field_array_len) else -1, "example": examples[i] if i < len(examples) else None, } result["result"] = { "fields": fields, "field_count": len(fields), "field_details": field_details, "message_type": typedef.get("type", ""), "examples": examples, "constants": dict(zip(const_names, const_values)) if const_names else {}, } # Get feedback details using action-specific service feedback_message = { "op": "call_service", "service": "/rosapi/action_feedback_details", "type": "rosapi_msgs/srv/ActionFeedbackDetails", "args": {"type": action_type}, "id": f"get_action_feedback_details_{action_type.replace('/', '_')}", } feedback_response = ws_manager.request(feedback_message) if ( feedback_response and isinstance(feedback_response, dict) and "values" in feedback_response and "error" not in feedback_response ): typedefs = feedback_response["values"].get("typedefs", []) if typedefs: for typedef in typedefs: field_names = typedef.get("fieldnames", []) field_types = typedef.get("fieldtypes", []) field_array_len = typedef.get("fieldarraylen", []) examples = typedef.get("examples", []) const_names = typedef.get("constnames", []) const_values = typedef.get("constvalues", []) fields = {} field_details = {} for i, (name, ftype) in enumerate(zip(field_names, field_types)): fields[name] = ftype field_details[name] = { "type": ftype, "array_length": field_array_len[i] if i < len(field_array_len) else -1, "example": examples[i] if i < len(examples) else None, } result["feedback"] = { "fields": fields, "field_count": len(fields), "field_details": field_details, "message_type": typedef.get("type", ""), "examples": examples, "constants": dict(zip(const_names, const_values)) if const_names else {}, } # Check if we got any data if not result["goal"] and not result["result"] and not result["feedback"]: return {"error": f"Action type {action_type} not found or has no definition"} return result @mcp.tool( description=( "Get action status for a specific action name. Works only with ROS 2.\n" "Example:\n" "get_action_status('/fibonacci')" ) ) def get_action_status(action_name: str) -> dict: """ Get action status for a specific action name. Works only with ROS 2. Args: action_name (str): The action name (e.g., '/fibonacci') Returns: dict: Contains action status information including active goals and their status. """ # Validate input if not action_name or not action_name.strip(): return {"error": "Action name cannot be empty"} # Ensure action name starts with / if not action_name.startswith("/"): action_name = f"/{action_name}" # Try to get action status by subscribing to the status topic status_topic = f"{action_name}/_action/status" status_msg_type = "action_msgs/msg/GoalStatusArray" try: # Subscribe to action status topic with ws_manager: message = { "op": "subscribe", "topic": status_topic, "type": status_msg_type, "id": f"get_action_status_{action_name.replace('/', '_')}", } send_error = ws_manager.send(message) if send_error: return { "action_name": action_name, "success": False, "error": f"Failed to subscribe to status topic: {send_error}", } # Wait for status message response = ws_manager.receive(timeout=3.0) if not response: return { "action_name": action_name, "success": False, "error": "No response from action status topic", } response_data = json.loads(response) if response_data.get("op") == "status" and response_data.get("level") == "error": return { "error": f"Action status error: {response_data.get('msg', 'Unknown error')}" } if "msg" not in response_data or "status_list" not in response_data["msg"]: return { "action_name": action_name, "success": True, "active_goals": [], "goal_count": 0, "note": f"No active goals found for action {action_name}", } status_list = response_data["msg"]["status_list"] status_map = { 0: "STATUS_UNKNOWN", 1: "STATUS_ACCEPTED", 2: "STATUS_EXECUTING", 3: "STATUS_CANCELING", 4: "STATUS_SUCCEEDED", 5: "STATUS_CANCELED", 6: "STATUS_ABORTED", } active_goals = [] for status_item in status_list: goal_info = status_item.get("goal_info", {}) goal_id = goal_info.get("goal_id", {}).get("uuid", "unknown") status = status_item.get("status", -1) stamp = goal_info.get("stamp", {}) active_goals.append( { "goal_id": goal_id, "status": status, "status_text": status_map.get(status, "UNKNOWN"), "timestamp": f"{stamp.get('sec', 0)}.{stamp.get('nanosec', 0)}", } ) return { "action_name": action_name, "success": True, "active_goals": active_goals, "goal_count": len(active_goals), "note": f"Found {len(active_goals)} active goal(s) for action {action_name}", } except json.JSONDecodeError as e: return {"error": f"Failed to parse status response: {str(e)}"} except Exception as e: return { "action_name": action_name, "success": False, "error": f"Failed to get action status: {str(e)}", } @mcp.tool( description=( "Get comprehensive information about all actions including types and available actions. Works only with ROS 2.\n" "Example:\n" "inspect_all_actions()." ) ) def inspect_all_actions() -> dict: """ Get comprehensive information about all actions including types and available actions. Works only with ROS 2. Returns: dict: Contains detailed information about all actions, including action names, types, and server information. """ # Check if required action services are available required_services = ["/rosapi/action_servers"] with ws_manager: # Get available services to check compatibility services_message = { "op": "call_service", "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, "id": "check_services_for_inspect_actions", } services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { "error": "Failed to check service availability", "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, "suggestion": "Ensure rosbridge is running and rosapi is available", }, } available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] if missing_services: return { "error": "Action inspection not supported by this rosbridge/rosapi version", "compatibility": { "issue": "Required action services are not available", "missing_services": missing_services, "required_services": required_services, "available_services": [s for s in available_services if "action" in s], "suggestions": [ "This rosbridge version doesn't support action inspection services", "Use get_actions() to list available actions", "Consider upgrading rosbridge or using a different implementation", ], "note": "Action inspection requires /rosapi/action_servers service", }, } # First get all actions actions_message = { "op": "call_service", "service": "/rosapi/action_servers", "type": "rosapi/ActionServers", "args": {}, "id": "inspect_all_actions_request_1", } with ws_manager: actions_response = ws_manager.request(actions_message) if not actions_response or "values" not in actions_response: return {"error": "Failed to get actions list"} actions = actions_response["values"].get("action_servers", []) action_details = {} # Get details for each action action_errors = [] for action in actions: # Try to get action type (this may not always work due to rosapi limitations) action_type = "unknown" # Known action type mappings for common actions action_type_map = { "/turtle1/rotate_absolute": "turtlesim/action/RotateAbsolute", # Add more mappings as needed based on common ROS actions } if action in action_type_map: action_type = action_type_map[action] else: # Try to derive from interfaces interfaces_message = { "op": "call_service", "service": "/rosapi/interfaces", "type": "rosapi/Interfaces", "args": {}, "id": f"get_interfaces_{action.replace('/', '_')}", } interfaces_response = ws_manager.request(interfaces_message) if interfaces_response and "values" in interfaces_response: interfaces = interfaces_response["values"].get("interfaces", []) action_interfaces = [iface for iface in interfaces if "/action/" in iface] # Try to match based on action name patterns action_name_part = action.split("/")[-1] for iface in action_interfaces: if action_name_part.lower() in iface.lower(): action_type = iface break action_details[action] = { "type": action_type, "status": "available" if action_type != "unknown" else "type_unknown", } return { "total_actions": len(actions), "actions": action_details, "action_errors": action_errors, } @mcp.tool( description=( "Send a goal to a ROS action server. Works only with ROS 2.\n" "Example:\n" "send_action_goal('/turtle1/rotate_absolute', 'turtlesim/action/RotateAbsolute', {'theta': 1.57})" ) ) async def send_action_goal( action_name: str, action_type: str, goal: dict, timeout: float | None = None, ctx: Context | None = None, ) -> dict: """ Send a goal to a ROS action server. Works only with ROS 2. Args: action_name (str): The name of the action to call (e.g., '/turtle1/rotate_absolute') action_type (str): The type of the action (e.g., 'turtlesim/action/RotateAbsolute') goal (dict): The goal message to send timeout (float, optional): Timeout for action completion in seconds. Default is None (uses default timeout). Returns: dict: Contains action response including goal_id, status, and result. """ # Validate inputs if not action_name or not action_name.strip(): return {"error": "Action name cannot be empty"} if not action_type or not action_type.strip(): return {"error": "Action type cannot be empty"} if not goal: return {"error": "Goal cannot be empty"} # Generate unique goal ID goal_id = f"goal_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}" # rosbridge action goal message # Based on rosbridge source code, it expects "args" instead of "goal" message = { "op": "send_action_goal", "id": goal_id, "action": action_name, "action_type": action_type, "args": goal, # rosbridge expects "args" not "goal" "feedback": True, # Enable feedback messages } # Send the action goal through rosbridge with ws_manager: send_error = ws_manager.send(message) if send_error: return { "action": action_name, "action_type": action_type, "success": False, "error": f"Failed to send action goal: {send_error}", } # Wait for action completion - handle both action_result and action_feedback actual_timeout = timeout if timeout is not None else 10.0 # Default 10 seconds start_time = time.time() last_feedback = None # Store the last feedback message feedback_count = 0 # Count feedback messages received while time.time() - start_time < actual_timeout: elapsed_time = time.time() - start_time response = ws_manager.receive(timeout=actual_timeout - elapsed_time) if response: try: msg_data = json.loads(response) # Handle action_result messages (final completion) if msg_data.get("op") == "action_result": # Report completion if ctx: try: completion_msg = f"Action completed successfully (received {feedback_count} feedback messages)" await ctx.report_progress( progress=feedback_count, total=None, message=completion_msg ) except Exception: pass return { "action": action_name, "action_type": action_type, "success": True, "goal_id": goal_id, "status": msg_data.get("status", "unknown"), "result": msg_data.get("values", {}), } # Store action_feedback messages and report progress if msg_data.get("op") == "action_feedback": feedback_count += 1 last_feedback = msg_data # Report feedback progress if ctx: try: feedback_values = msg_data.get("values", {}) feedback_msg = f"Action feedback #{feedback_count}: {str(feedback_values)[:100]}..." await ctx.report_progress( progress=feedback_count, total=None, message=feedback_msg ) except Exception: pass except json.JSONDecodeError: continue else: # No response received, continue waiting pass await asyncio.sleep(0.1) # Timeout - return last feedback if available if ctx and feedback_count > 0: try: await ctx.report_progress( progress=feedback_count, total=None, message=f"Action timed out after {actual_timeout} seconds (received {feedback_count} feedback messages)", ) except Exception: pass result = { "action": action_name, "action_type": action_type, "success": False, "goal_id": goal_id, "error": f"Action timed out after {actual_timeout} seconds", } if last_feedback: result["success"] = True result["last_feedback"] = last_feedback.get("values", {}) result["note"] = "Action timed out, but partial progress was made" return result @mcp.tool( description=( "Cancel a specific action goal. Works only with ROS 2.\n" "Example:\n" "cancel_action_goal('/turtle1/rotate_absolute', 'goal_1758653551839_21acd486')" ) ) def cancel_action_goal(action_name: str, goal_id: str) -> dict: """ Cancel a specific action goal. Works only with ROS 2. Args: action_name (str): The name of the action (e.g., '/turtle1/rotate_absolute') goal_id (str): The goal ID to cancel Returns: dict: Contains cancellation status and result. """ # Validate inputs if not action_name or not action_name.strip(): return {"error": "Action name cannot be empty"} if not goal_id or not goal_id.strip(): return {"error": "Goal ID cannot be empty"} # Create cancel message for rosbridge (based on rosbridge source code) cancel_message = { "op": "cancel_action_goal", "id": goal_id, # Use the actual goal ID, not a new one "action": action_name, "feedback": True, # Enable feedback messages } # Send the cancel request through rosbridge with ws_manager: # Send cancel request send_error = ws_manager.send(cancel_message) if send_error: return { "action": action_name, "goal_id": goal_id, "success": False, "error": f"Failed to send cancel request: {send_error}", } return { "action": action_name, "goal_id": goal_id, "success": True, "note": "Cancel request sent successfully. Action may still be executing.", } ## ############################################################################################## ## ## ## NETWORK DIAGNOSTICS ## ## ############################################################################################## ## @mcp.tool( description=( "Ping a robot's IP address and check if a specific port is open.\n" "A successful ping to the IP but not the port can indicate that ROSbridge is not running.\n" "Example:\n" "ping_robot(ip='192.168.1.100', port=9090)" ) ) def ping_robot(ip: str, port: int, ping_timeout: float = 2.0, port_timeout: float = 2.0) -> dict: """ Ping an IP address and check if a specific port is open. Args: ip (str): The IP address to ping (e.g., '192.168.1.100') port (int): The port number to check (e.g., 9090) ping_timeout (float): Timeout for ping in seconds. Default = 2.0. port_timeout (float): Timeout for port check in seconds. Default = 2.0. Returns: dict: Contains ping and port check results with detailed status information. """ return ping_ip_and_port(ip, port, ping_timeout, port_timeout) ## ############################################################################################## ## ## ## IMAGE ANALYSIS ## ## ############################################################################################## ## def _encode_image_to_imagecontent(image): """ Encodes a PIL Image to a format compatible with ImageContent. Args: image (PIL.Image.Image): The image to encode. Returns: ImageContent: JPEG-encoded image wrapped in an ImageContent object. """ buffer = io.BytesIO() image.save(buffer, format="JPEG") img_bytes = buffer.getvalue() img_obj = Image(data=img_bytes, format="jpeg") return img_obj.to_image_content() @mcp.tool( description=( "Analyze a previously received image that was saved by any ROS operation.\n" "Images can be received from:\n" "- Any topic containing image data (not just topics with 'Image' in the name)\n" "- Service responses containing image data\n" "- subscribe_once() or subscribe_for_duration() operations\n" "Use this tool to analyze the saved image after receiving it from any source.\n" ) ) def analyze_previously_received_image(): """ Analyze the previously received image saved at ./camera/received_image.jpeg This tool loads the previously saved image from './camera/received_image.jpeg' (which can be created by any ROS operation that receives image data), and converts it into an MCP-compatible ImageContent format so that the LLM can interpret it. Images can be received from: - Any Topic containing image data - Any Service responses containing image data - subscribe_once() or subscribe_for_duration() operations """ path = "./camera/received_image.jpeg" if not os.path.exists(path): return {"error": "No image found at ./camera/received_image.jpeg"} img = PILImage.open(path) return _encode_image_to_imagecontent(img) def parse_arguments(): """Parse command line arguments for MCP server configuration.""" parser = argparse.ArgumentParser( description="ROS MCP Server - Connect to ROS robots via MCP protocol", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python server.py # Use stdio transport (default) python server.py --transport http --host 0.0.0.0 --port 9000 python server.py --transport streamable-http --host 127.0.0.1 --port 8080 """, ) parser.add_argument( "--transport", choices=["stdio", "http", "streamable-http", "sse"], default="stdio", help="MCP transport protocol to use (default: stdio)", ) parser.add_argument( "--host", default="127.0.0.1", help="Host address for HTTP-based transports (default: 127.0.0.1)", ) parser.add_argument( "--port", type=int, default=9000, help="Port number for HTTP-based transports (default: 9000)", ) return parser.parse_args() def main(): """Main entry point for the MCP server console script.""" # Parse command line arguments args = parse_arguments() # Update global variables with parsed arguments global MCP_TRANSPORT, MCP_HOST, MCP_PORT MCP_TRANSPORT = args.transport.lower() MCP_HOST = args.host MCP_PORT = args.port if MCP_TRANSPORT == "stdio": # stdio doesn't need host/port mcp.run(transport="stdio") elif MCP_TRANSPORT in {"http", "streamable-http"}: # http and streamable-http both require host/port print(f"Transport: {MCP_TRANSPORT} -> http://{MCP_HOST}:{MCP_PORT}", file=sys.stderr) mcp.run(transport=MCP_TRANSPORT, host=MCP_HOST, port=MCP_PORT) elif MCP_TRANSPORT == "sse": print(f"Transport: {MCP_TRANSPORT} -> http://{MCP_HOST}:{MCP_PORT}", file=sys.stderr) print("Currently unsupported. Use 'stdio', 'http', or 'streamable-http'.", file=sys.stderr) mcp.run(transport=MCP_TRANSPORT, host=MCP_HOST, port=MCP_PORT) else: raise ValueError( f"Unsupported MCP_TRANSPORT={MCP_TRANSPORT!r}. " "Use 'stdio', 'http', or 'streamable-http'." ) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/robotmcp/ros-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server