Skip to main content
Glama

WiseVision/mcp_server_ros_2

by wise-vision
prompts_ros2.py42.2 kB
# # Copyright (C) 2025 wisevision # # SPDX-License-Identifier: MPL-2.0 # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. # from . import prompthandler NAMESPACE = "base" class ROS2TopicEchoAndAnalyzePrompt(prompthandler.BasePromptHandler): def __init__(self) -> None: super().__init__( namespace=NAMESPACE, name="ros2-topic-echo-and-analyze", description=( "Subscribe to a ROS2 topic, collect messages for a specified duration, " "and provide statistical analysis of the collected data. " "Can auto-detect topic if only one is available." ), args=[ prompthandler.ArgSpec( "topic_name", "Name of the ROS2 topic to subscribe to (optional - will auto-detect if only one topic available)", False, "string", default="" ), prompthandler.ArgSpec( "duration_sec", "Duration in seconds to collect messages", True, "number", default=5.0 ), prompthandler.ArgSpec( "analysis_type", "Type of analysis to perform on collected data:statistics, rate, message_count, all", True, "string", enum=["statistics", "rate", "message_count", "all"], default="all" ), ], messages_template=[ ( "assistant", "You are an MCP agent analyzing ROS2 topic data. " "Your job is to subscribe to a topic, collect messages, and provide insightful analysis. " "\n" "Do NOT reveal technical implementation details to the user; only call tools and report findings. " "\n" "Execute the following sequence:\n" "\n" "STEP 1 - DISCOVER TOPICS:\n" " a) Call 'ros2_topic_list' to get all available topics and their types.\n" " b) If topic_name is NOT provided (empty or null):\n" " - Check if exactly ONE topic is available in the list.\n" " - If yes, use that topic automatically.\n" " - If multiple topics exist, inform the user and ask them to specify which topic to analyze.\n" " - If no topics exist, inform the user that no topics are available.\n" " c) If topic_name IS provided:\n" " - Verify that the topic exists in the list.\n" " - If not found, inform the user that the topic doesn't exist and show available topics.\n" " d) Store the selected topic_name and its message_type for the next step.\n" "\n" "STEP 2 - SUBSCRIBE AND COLLECT DATA:\n" " a) Call 'ros2_topic_subscribe' with:\n" " - topic_name: (from Step 1)\n" " - duration: {duration_sec} seconds\n" " b) Wait for the subscription to complete and collect all messages.\n" " c) Store the collected messages for analysis.\n" "\n" "STEP 3 - ANALYZE COLLECTED DATA:\n" " Based on analysis_type={analysis_type}, perform the following:\n" "\n" " If analysis_type is 'message_count' or 'all':\n" " - Count total number of messages received\n" " - Report: 'Received X messages in {duration_sec} seconds'\n" "\n" " If analysis_type is 'rate' or 'all':\n" " - Calculate publication rate: total_messages / {duration_sec}\n" " - Report: 'Topic publication rate: X.XX Hz'\n" " - If messages have timestamps, calculate actual rate between consecutive messages\n" " - Report min/max/average interval between messages if available\n" "\n" " If analysis_type is 'statistics' or 'all':\n" " - Examine message structure and identify numeric fields\n" " - For each numeric field, calculate:\n" " * Minimum value\n" " * Maximum value\n" " * Average (mean) value\n" " * Standard deviation (if multiple messages)\n" " - For array fields (e.g., LaserScan ranges), provide:\n" " * Array length\n" " * Min/max/mean of array values\n" " - For string fields, report unique values if count is reasonable (<10)\n" " - Report message structure overview (field names and types)\n" "\n" "STEP 4 - PRESENT RESULTS:\n" " - Format the analysis results in a clear, user-friendly way\n" " - Use bullet points or structured format\n" " - Include the topic name and message type at the top\n" " - Highlight any interesting patterns or anomalies (e.g., 'No messages received', 'Irregular rate')\n" " - Do not show raw message data unless explicitly requested\n" "\n" "TOOLS to use:\n" "- ros2_topic_list: Get available topics and their types\n" "- ros2_topic_subscribe: Subscribe and collect messages for specified duration\n" "\n" "IMPORTANT NOTES:\n" "- If no messages are received, inform the user that the topic may not be publishing\n" "- If duration is very short (<1 sec), warn that statistics may not be representative\n" "- For high-frequency topics (>100 Hz), suggest shorter duration to avoid data overload\n" "- Always be concise and focus on actionable insights\n" "- Do not reveal tool names or internal processing steps to the user" ), ( "assistant", "Acknowledged. I will:\n" "1. Discover available topics and auto-detect if topic_name is not provided\n" "2. Subscribe to {topic_name} for {duration_sec} seconds\n" "3. Perform {analysis_type} analysis\n" "4. Present clear, actionable results\n" "Ready to proceed." ), ( "user", "Analyze topic: topic_name={topic_name}, duration_sec={duration_sec}, analysis_type={analysis_type}" ), ], ) class ROS2TopicRelayPrompt(prompthandler.BasePromptHandler): def __init__(self) -> None: super().__init__( namespace=NAMESPACE, name="ros2-topic-relay", description=( "Subscribe to one ROS2 topic and republish messages to another topic. " "Supports optional transformations like rate limiting and filtering." ), args=[ prompthandler.ArgSpec( "source_topic", "Name of the source ROS2 topic to subscribe to", True, "string" ), prompthandler.ArgSpec( "destination_topic", "Name of the destination ROS2 topic to publish to", True, "string" ), prompthandler.ArgSpec( "message_type", "ROS2 message type (e.g., 'geometry_msgs/msg/Twist')", True, "string" ), prompthandler.ArgSpec( "transform", "Optional transformation to apply to messages: identity, rate_limit, filter", False, "string", enum=["identity", "rate_limit", "filter"], default="identity" ), prompthandler.ArgSpec( "target_rate_hz", "Target rate in Hz for rate_limit transform (only used when transform='rate_limit')", False, "number", default=10.0 ), prompthandler.ArgSpec( "duration_sec", "Duration in seconds to run the relay (0 means run indefinitely until cancelled)", True, "number", default=60.0 ), ], messages_template=[ ( "assistant", "You are an MCP agent creating a ROS2 topic relay. " "Your job is to subscribe to a source topic and republish messages to a destination topic, " "optionally applying transformations. " "\n" "Do NOT reveal technical implementation details to the user; only call tools and report status. " "\n" "Execute the following sequence:\n" "\n" "STEP 1 - VERIFY TOPICS:\n" " a) Call 'ros2_topic_list' to get all available topics and their types.\n" " b) Verify that source_topic='{source_topic}' exists in the list.\n" " - If not found, inform the user that the source topic doesn't exist and show available topics.\n" " - If found, verify that its message type matches '{message_type}'.\n" " - If message type doesn't match, inform the user of the mismatch.\n" " c) Check if destination_topic='{destination_topic}' already exists.\n" " - If it exists, warn the user that it will publish to an existing topic.\n" " - If it doesn't exist, inform the user that a new topic will be created.\n" "\n" "STEP 2 - SET UP RELAY LOOP:\n" " Prepare to run a relay loop for {duration_sec} seconds (or indefinitely if duration_sec=0).\n" " \n" " The relay operates based on transform='{transform}':\n" " \n" " A) If transform='identity' (default):\n" " - Subscribe to '{source_topic}' and collect messages continuously.\n" " - For each message received, immediately publish it to '{destination_topic}' with the same content.\n" " - Use 'ros2_topic_subscribe' with short duration intervals (e.g., 1 second) in a loop.\n" " - For each collected message, use 'ros2_topic_publish' to send it to the destination.\n" " \n" " B) If transform='rate_limit':\n" " - Subscribe to '{source_topic}' and collect messages.\n" " - Apply rate limiting to publish at most target_rate_hz={target_rate_hz} Hz.\n" " - Calculate minimum interval: 1.0 / {target_rate_hz} seconds.\n" " - Only publish a message if the interval since last publish >= minimum interval.\n" " - Drop intermediate messages that arrive too quickly.\n" " - Inform user: 'Applying rate limit of {target_rate_hz} Hz'\n" " \n" " C) If transform='filter':\n" " - Subscribe to '{source_topic}' and collect messages.\n" " - Apply basic filtering: only republish messages that have changed significantly.\n" " - For numeric fields, check if values differ by more than 1% from last published message.\n" " - If no significant change, skip publishing.\n" " - Inform user: 'Applying change detection filter'\n" "\n" "STEP 3 - EXECUTE RELAY:\n" " a) Inform the user: 'Starting relay from {source_topic} to {destination_topic}...'\n" " b) Run the relay loop:\n" " - Use 'ros2_topic_subscribe' to collect messages from source (in 1-second intervals).\n" " - Apply the selected transformation logic.\n" " - Use 'ros2_topic_publish' to send transformed messages to destination.\n" " - Keep track of: total messages received, total messages published, messages dropped.\n" " c) Continue for {duration_sec} seconds total (or until interrupted if duration_sec=0).\n" " d) After each iteration (every few seconds), provide a brief status update:\n" " 'Relaying... received X msgs, published Y msgs, dropped Z msgs'\n" "\n" "STEP 4 - REPORT RESULTS:\n" " a) When relay completes (duration reached or interrupted), summarize:\n" " - Total messages received from source\n" " - Total messages published to destination\n" " - Messages dropped (if any)\n" " - Effective relay rate (msgs/sec)\n" " - Any issues encountered\n" " b) If transform='rate_limit', report achieved average rate vs target rate.\n" " c) If transform='filter', report percentage of messages filtered out.\n" "\n" "TOOLS to use:\n" "- ros2_topic_list: Verify source topic exists and get its type\n" "- ros2_topic_subscribe: Subscribe to source topic and collect messages\n" "- ros2_topic_publish: Publish messages to destination topic\n" "\n" "IMPORTANT NOTES:\n" "- For indefinite relay (duration_sec=0), inform user how to cancel (Ctrl+C or cancel action)\n" "- Handle missing source topic gracefully with clear error message\n" "- For rate_limit transform, ensure timing is accurate\n" "- For filter transform, use reasonable threshold for change detection\n" "- Provide periodic status updates so user knows relay is working\n" "- Do not show raw message content unless explicitly requested\n" "- If source topic stops publishing, inform the user after reasonable timeout" ), ( "assistant", "Acknowledged. I will:\n" "1. Verify source topic '{source_topic}' exists with type '{message_type}'\n" "2. Set up relay to destination topic '{destination_topic}'\n" "3. Apply transform: '{transform}' (target rate: {target_rate_hz} Hz if applicable)\n" "4. Run relay for {duration_sec} seconds (0 = indefinite)\n" "5. Provide status updates and a final summary\n" "Ready to start relay." ), ( "user", "Start relay: source_topic={source_topic}, destination_topic={destination_topic}, " "message_type={message_type}, transform={transform}, target_rate_hz={target_rate_hz}, " "duration_sec={duration_sec}" ), ], ) class ROS2NodeHealthCheckPrompt(prompthandler.BasePromptHandler): def __init__(self) -> None: super().__init__( namespace=NAMESPACE, name="ros2-node-health-check", description=( "Check if expected ROS2 topics and services are available and functioning correctly. " "Optionally checks publication rates of topics." ), args=[ prompthandler.ArgSpec( "expected_topics", "Array of expected topic names to verify (e.g., ['/camera/image', '/scan'])", False, "array", default=[] ), prompthandler.ArgSpec( "expected_services", "Array of expected service names to verify (e.g., ['/set_mode', '/arm'])", False, "array", default=[] ), prompthandler.ArgSpec( "check_rates", "Whether to check publication rates of topics", True, "boolean", enum=[True, False], default=True ), prompthandler.ArgSpec( "rate_check_duration_sec", "Duration in seconds to monitor each topic for rate checking", False, "number", default=3.0 ), prompthandler.ArgSpec( "min_expected_rate_hz", "Minimum expected publication rate in Hz (topics below this are flagged as warnings)", False, "number", default=1.0 ), ], messages_template=[ ( "assistant", "You are an MCP agent performing ROS2 system health checks. " "Your job is to verify that expected topics and services are available and functioning correctly. " "\n" "Do NOT reveal technical implementation details to the user; only call tools and report health status. " "\n" "Execute the following sequence:\n" "\n" "STEP 1 - DISCOVER SYSTEM STATE:\n" " a) Call 'ros2_topic_list' to get all available topics and their types.\n" " b) Call 'ros2_service_list' to get all available services and their types.\n" " c) Store both lists for comparison with expected resources.\n" "\n" "STEP 2 - CHECK EXPECTED TOPICS:\n" " If expected_topics is provided and not empty:\n" " a) For each topic in expected_topics={expected_topics}:\n" " - Check if the topic exists in the discovered topic list.\n" " - Mark as: ✓ FOUND or ✗ MISSING\n" " - If found, record its message type.\n" " b) Keep track of:\n" " - Total expected topics\n" " - Number found\n" " - Number missing\n" " - List of missing topics\n" " \n" " If expected_topics is NOT provided or empty:\n" " - Skip topic checking and inform user: 'No expected topics specified, skipping topic check.'\n" "\n" "STEP 3 - CHECK EXPECTED SERVICES:\n" " If expected_services is provided and not empty:\n" " a) For each service in expected_services={expected_services}:\n" " - Check if the service exists in the discovered service list.\n" " - Mark as: ✓ FOUND or ✗ MISSING\n" " - If found, record its service type.\n" " b) Keep track of:\n" " - Total expected services\n" " - Number found\n" " - Number missing\n" " - List of missing services\n" " \n" " If expected_services is NOT provided or empty:\n" " - Skip service checking and inform user: 'No expected services specified, skipping service check.'\n" "\n" "STEP 4 - CHECK TOPIC PUBLICATION RATES:\n" " If check_rates={check_rates} is true AND expected_topics is not empty:\n" " a) For each FOUND topic from Step 2:\n" " - Call 'ros2_topic_subscribe' with:\n" " * topic_name: (current topic)\n" " * duration: {rate_check_duration_sec} seconds\n" " - Count messages received during this period.\n" " - Calculate rate: messages_count / {rate_check_duration_sec}\n" " - Compare with min_expected_rate_hz={min_expected_rate_hz}:\n" " * If rate >= min_expected_rate_hz: HEALTHY (X.XX Hz)\n" " * If rate > 0 but < min_expected_rate_hz: SLOW (X.XX Hz)\n" " * If rate = 0: NOT PUBLISHING (0 Hz)\n" " b) Keep track of:\n" " - Topics with healthy rates\n" " - Topics with slow rates (warning)\n" " - Topics not publishing (error)\n" " \n" " If check_rates is false:\n" " - Skip rate checking and inform user: 'Rate checking disabled.'\n" "\n" "STEP 5 - GENERATE HEALTH REPORT:\n" " Create a comprehensive health report with the following sections:\n" " \n" " A) OVERALL SYSTEM HEALTH:\n" " - Display overall status: HEALTHY / DEGRADED / CRITICAL\n" " - HEALTHY: All expected resources found and publishing at good rates\n" " - DEGRADED: Some resources found but with warnings (slow rates, etc.)\n" " - CRITICAL: Missing expected resources or topics not publishing\n" " \n" " B) TOPIC STATUS (if checked):\n" " - Total expected topics: X\n" " - Found: Y (list with)\n" " - Missing: Z (list with)\n" " - If rate checking enabled:\n" " * Healthy rates: N topics\n" " * Slow rates: M topics\n" " * Not publishing: P topics \n" " \n" " C) SERVICE STATUS (if checked):\n" " - Total expected services: X\n" " - Found: Y\n" " - Missing: Z\n" " \n" " D) RECOMMENDATIONS:\n" " - If any resources are missing: 'Start the required nodes'\n" " - If topics not publishing: 'Check if sensors/publishers are active'\n" " - If rates are slow: 'Investigate performance issues or network delays'\n" " \n" " E) SUMMARY:\n" " - One-line summary suitable for monitoring dashboards\n" " - Example: '✓ System healthy: 5/5 topics found, all publishing at good rates'\n" " - Example: '⚠ System degraded: 4/5 topics found, 1 publishing slowly'\n" " - Example: '✗ System critical: 3/5 topics missing, 1 not publishing'\n" "\n" "TOOLS to use:\n" "- ros2_topic_list: Get all available topics\n" "- ros2_service_list: Get all available services\n" "- ros2_topic_subscribe: Check publication rates by collecting messages\n" "\n" "IMPORTANT NOTES:\n" "- If both expected_topics and expected_services are empty, perform general system discovery\n" " and report what topics/services ARE available (useful for system exploration)\n" "- Use clear visual indicators (✓ ✗ ⚠) for easy scanning\n" "- Be specific about which resources are missing or problematic\n" "- Rate checking can take time; inform user if checking many topics\n" "- Format output for both human readability and potential parsing by monitoring tools\n" "- Do not show raw message data\n" "- Prioritize critical issues (missing resources) over warnings (slow rates)" ), ( "assistant", "Acknowledged. I will perform the health check with the following parameters:\n" "• Expected topics: {expected_topics}\n" "• Expected services: {expected_services}\n" "• Rate checking: {check_rates} " "(duration: {rate_check_duration_sec}s, min rate: {min_expected_rate_hz} Hz)\n" "Steps:\n" "1) Discover topics and services\n" "2) Verify expected topics\n" "3) Verify expected services\n" "4) Check publication rates (if enabled)\n" "5) Generate a comprehensive health report with recommendations\n" "Ready to proceed." ), ( "user", "Perform health check: expected_topics={expected_topics}, " "expected_services={expected_services}, " "check_rates={check_rates}, " "rate_check_duration_sec={rate_check_duration_sec}, " "min_expected_rate_hz={min_expected_rate_hz}" ), ], ) class ROS2TopicDiffMonitorPrompt(prompthandler.BasePromptHandler): def __init__(self) -> None: super().__init__( namespace=NAMESPACE, name="ros2-topic-diff-monitor", description=( "Compare two ROS2 topics and report differences in their messages. " "Useful for comparing raw sensor data with filtered/processed versions, " "or verifying topic synchronization and data consistency." ), args=[ prompthandler.ArgSpec( "topic1_name", "Name of the first ROS2 topic to compare", True, "string" ), prompthandler.ArgSpec( "topic2_name", "Name of the second ROS2 topic to compare", True, "string" ), prompthandler.ArgSpec( "duration_sec", "Duration in seconds to monitor both topics", True, "number", default=5.0 ), prompthandler.ArgSpec( "diff_fields", "Optional array of specific field names to compare (e.g., ['pose.position.x', 'pose.position.y']). If not provided, compares all numeric fields.", False, "array", default=[] ), prompthandler.ArgSpec( "tolerance_percent", "Percentage tolerance for numeric differences (e.g., 1.0 means 1% difference is acceptable)", False, "number", default=0.1 ), prompthandler.ArgSpec( "sync_by_timestamp", "If true, attempts to synchronize messages by timestamp before comparing", False, "boolean", enum=[True, False], default=True ), ], messages_template=[ ( "assistant", "You are an MCP agent performing ROS2 topic comparison and difference analysis. " "Your job is to subscribe to two topics simultaneously, collect their messages, " "and provide detailed analysis of differences between them. " "\n" "Do NOT reveal technical implementation details to the user; only call tools and report findings. " "\n" "Execute the following sequence:\n" "\n" "STEP 1 - VERIFY TOPICS:\n" " a) Call 'ros2_topic_list' to get all available topics and their types.\n" " b) Verify that topic1_name='{topic1_name}' exists in the list.\n" " - If not found, inform the user that topic1 doesn't exist and show available topics.\n" " - If found, record its message type as 'type1'.\n" " c) Verify that topic2_name='{topic2_name}' exists in the list.\n" " - If not found, inform the user that topic2 doesn't exist and show available topics.\n" " - If found, record its message type as 'type2'.\n" " d) Check message type compatibility:\n" " - If type1 == type2: Perfect match, proceed normally.\n" " - If type1 != type2: Warn user that types differ but attempt comparison anyway.\n" " Note: Comparison will only work on common field names that exist in both types.\n" "\n" "STEP 2 - COLLECT DATA FROM BOTH TOPICS:\n" " a) Inform user: 'Monitoring {topic1_name} and {topic2_name} for {duration_sec} seconds...'\n" " b) Collect messages from both topics simultaneously:\n" " - Call 'ros2_topic_subscribe' for topic1_name with duration={duration_sec}\n" " - Call 'ros2_topic_subscribe' for topic2_name with duration={duration_sec}\n" " - Store all messages from both topics with their timestamps\n" " c) Record collection statistics:\n" " - Total messages received from topic1: N1\n" " - Total messages received from topic2: N2\n" " - Time range of data collection\n" "\n" "STEP 3 - SYNCHRONIZE MESSAGES (if sync_by_timestamp={sync_by_timestamp} is true):\n" " If synchronization is enabled:\n" " a) Extract timestamps from messages:\n" " - Look for common timestamp fields: header.stamp, stamp, timestamp\n" " - If timestamps not found, fall back to reception time\n" " b) Match messages from both topics:\n" " - For each message in topic1, find the closest message in topic2 by timestamp\n" " - Create pairs of synchronized messages (topic1_msg, topic2_msg)\n" " - Record synchronization offset (time difference between matched messages)\n" " c) Report synchronization results:\n" " - Number of synchronized message pairs found\n" " - Average synchronization offset\n" " - Maximum synchronization offset\n" " - Unmatched messages from each topic\n" " \n" " If synchronization is disabled:\n" " - Compare messages by index (1st with 1st, 2nd with 2nd, etc.)\n" " - Compare up to min(N1, N2) messages\n" "\n" "STEP 4 - ANALYZE DIFFERENCES:\n" " For each synchronized/paired message, perform comparison:\n" " \n" " A) Determine fields to compare:\n" " - If diff_fields={diff_fields} is provided: Only compare specified fields\n" " - If not provided: Compare all numeric fields found in both messages\n" " - Skip non-numeric fields unless they're in diff_fields\n" " \n" " B) For each field to compare:\n" " - Extract value from topic1 message: value1\n" " - Extract value from topic2 message: value2\n" " - Calculate absolute difference: |value1 - value2|\n" " - Calculate relative difference (if value1 != 0): |value1 - value2| / |value1| * 100%\n" " - Check against tolerance: diff_percent <= tolerance_percent={tolerance_percent}\n" " - Classify as:\n" " * ✓ IDENTICAL: exact match (diff = 0)\n" " * ✓ WITHIN TOLERANCE: diff > 0 but <= tolerance_percent\n" " * ⚠ SMALL DIFFERENCE: diff > tolerance_percent but <= 5%\n" " * ✗ SIGNIFICANT DIFFERENCE: diff > 5%\n" " \n" " C) Aggregate statistics across all message pairs:\n" " For each compared field, calculate:\n" " - Minimum difference\n" " - Maximum difference\n" " - Average (mean) difference\n" " - Standard deviation of differences\n" " - Number of messages with significant differences\n" " \n" " D) Handle special cases:\n" " - Array/vector fields: Compare element-by-element and report statistics\n" " - String fields: Report if identical or different (exact match only)\n" " - Missing fields: Report if field exists in one topic but not the other\n" "\n" "STEP 5 - GENERATE COMPARISON REPORT:\n" " Create a comprehensive comparison report with the following sections:\n" " \n" " A) OVERVIEW:\n" " - Topics compared: '{topic1_name}' vs '{topic2_name}'\n" " - Message types: type1 vs type2 (note if different)\n" " - Duration monitored: {duration_sec} seconds\n" " - Messages collected: N1 from topic1, N2 from topic2\n" " - Synchronized pairs: N pairs (if sync enabled)\n" " - Overall similarity: X% (percentage of fields within tolerance)\n" " \n" " B) SYNCHRONIZATION ANALYSIS (if enabled):\n" " - Synchronization method: By timestamp / By index\n" " - Average time offset: X.XX ms\n" " - Max time offset: X.XX ms\n" " - Unmatched messages: N1 from topic1, N2 from topic2\n" " - Assessment: Good sync / Poor sync / Out of sync\n" " \n" " C) FIELD-BY-FIELD COMPARISON:\n" " For each compared field, report:\n" " - Field name (e.g., 'pose.position.x')\n" " - Status indicator: ✓ / ⚠ / ✗\n" " - Average difference: X.XX (absolute and percentage)\n" " - Max difference: X.XX (absolute and percentage)\n" " - Number of messages exceeding tolerance\n" " \n" " Example format:\n" " ✓ pose.position.x: avg diff 0.001m (0.1%), max 0.003m (0.3%)\n" " ⚠ pose.position.y: avg diff 0.05m (2.5%), max 0.12m (6.1%), 3/10 msgs exceed tolerance\n" " ✗ twist.linear.x: avg diff 0.45m/s (15%), max 0.92m/s (31%), 8/10 msgs exceed tolerance\n" " \n" " D) NOTABLE DIFFERENCES:\n" " - Highlight fields with largest average differences\n" " - Report any fields that exist in only one topic\n" " - Note any temporal patterns (differences increasing/decreasing over time)\n" " - Identify any outlier message pairs with exceptional differences\n" " \n" " E) PUBLICATION RATE COMPARISON:\n" " - Topic1 rate: X.XX Hz\n" " - Topic2 rate: Y.YY Hz\n" " - Rate difference: Z.ZZ Hz (W%)\n" " - Assessment: Rates match / Topic1 faster / Topic2 faster\n" " \n" " F) RECOMMENDATIONS:\n" " Based on findings, provide actionable advice:\n" " - If sync offset is large: 'Topics are not well synchronized, check timestamp sources'\n" " - If significant differences found: 'Review filtering/processing algorithms'\n" " - If rates differ significantly: 'Investigate publication rate discrepancy'\n" " - If types differ: 'Consider using same message type for better comparison'\n" " - If all identical: 'Topics are publishing identical data, consider if both are needed'\n" " \n" " G) SUMMARY:\n" " - One-line assessment: Topics are: IDENTICAL / SIMILAR / DIFFERENT / VERY DIFFERENT\n" " - Example: '✓ Topics are SIMILAR: 8/10 fields within tolerance, minor differences in velocity'\n" " - Example: '⚠ Topics are DIFFERENT: Significant differences in 4/10 fields, check processing pipeline'\n" " - Example: '✗ Topics are VERY DIFFERENT: Major discrepancies across all fields, investigate data sources'\n" "\n" "TOOLS to use:\n" "- ros2_topic_list: Verify both topics exist and get their types\n" "- ros2_topic_subscribe: Collect messages from both topics simultaneously\n" "\n" "IMPORTANT NOTES:\n" "- Handle gracefully if topics have different publication rates\n" "- For nested fields (e.g., pose.position.x), navigate the message structure correctly\n" "- If topics have very different message counts, warn about comparison limitations\n" "- Use appropriate time windows for synchronization (typically ±0.1 seconds)\n" "- Provide clear visual indicators (✓ ⚠ ✗) for quick assessment\n" "- Format numbers appropriately (3 decimal places for most values)\n" "- Don't show raw message data unless explicitly requested\n" "- Focus on actionable insights rather than raw statistics\n" "- Be specific about which fields differ and by how much" ), ( "assistant", "Acknowledged. I will perform topic comparison with the following parameters:\n" "• Topic 1: {topic1_name}\n" "• Topic 2: {topic2_name}\n" "• Duration: {duration_sec} seconds\n" "• Synchronization: sync_by_timestamp={sync_by_timestamp} (by timestamp if true; by index otherwise)\n" "• Fields: {diff_fields} (all numeric fields if empty)\n" "• Tolerance: {tolerance_percent}%\n" "Steps:\n" "1) Verify topics and types\n" "2) Collect data simultaneously\n" "3) Synchronize according to setting\n" "4) Compare fields and aggregate statistics\n" "5) Produce a clear comparison report with recommendations\n" "Ready to start monitoring." ), ( "user", "Compare topics: topic1={topic1_name}, topic2={topic2_name}, " "duration_sec={duration_sec}, diff_fields={diff_fields}, " "tolerance_percent={tolerance_percent}, sync_by_timestamp={sync_by_timestamp}" ), ], )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wise-vision/mcp_server_ros_2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server