get_topics
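Lists the ROS2 topics currently available on the Unitree Go2 robot through the MCP server, so that natural language commands can be mapped to concrete topics. The tool takes no arguments and returns the topic names, plus their message types when rosbridge is used.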
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Input Schema (JSON Schema)
{
"properties": {},
"title": "get_topicsArguments",
"type": "object"
}
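Because the schema declares no properties, a client calls the tool with an empty arguments object. The following is a minimal sketch of the JSON-RPC request an MCP client might send, assuming the standard MCP `tools/call` request shape. The helper name `build_get_topics_request` and the `id` value are hypothetical and only serve the illustration.

```python
import json


def build_get_topics_request(request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 'tools/call' request for the get_topics tool.

    Hypothetical helper for illustration; the request id is arbitrary.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "get_topics",
            # The input schema has no properties, so arguments stay empty.
            "arguments": {},
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_get_topics_request(), indent=2))
```

In practice an MCP client library usually builds this envelope itself, so the caller only supplies the tool name and an empty argument mapping.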
Implementation Reference
- server.py:20-39 (handler) The primary MCP tool handler for 'get_topics', decorated with @mcp.tool(). It retrieves ROS2 topics using either WebSocketManager (rosbridge) or direct Ros2Manager based on configuration.

      @mcp.tool()
      def get_topics():
          if use_rosbridge:
              topic_info = ws_manager.get_topics()
              ws_manager.close()
              if topic_info:
                  topics, types = zip(*topic_info)
                  return {
                      "topics": list(topics),
                      "types": list(types)
                  }
              else:
                  return "No topics found"
          else:
              topics = ros2_manager.get_topics()
              return { "topics": topics }

- utils/ros2_manager.py:9-12 (helper) Helper method in Ros2Manager class that executes shell command to list ROS2 topics directly.

      def get_topics(self) -> list[str]:
          command = f"source {self.setup_sh_path} && ros2 topic list"
          output = subprocess.check_output(command, shell=True, executable='/bin/bash').decode()
          return output.split('\n')

- utils/websocket_manager.py:48-72 (helper) Helper method in WebSocketManager class that queries ROS topics via rosbridge WebSocket service.

      def get_topics(self) -> list[tuple[str, str]]:
          self.connect()
          if self.ws:
              try:
                  self.send({
                      "op": "call_service",
                      "service": "/rosapi/topics",
                      "id": "get_topics_request_1"
                  })
                  response = self.receive_binary()
                  print(f"[WebSocket] Received response: {response}")
                  if response:
                      data = json.loads(response)
                      if "values" in data:
                          topics = data["values"].get("topics", [])
                          types = data["values"].get("types", [])
                          if topics and types and len(topics) == len(types):
                              return list(zip(topics, types))
                          else:
                              print("[WebSocket] Mismatch in topics and types length")
              except json.JSONDecodeError as e:
                  print(f"[WebSocket] JSON decode error: {e}")
              except Exception as e:
                  print(f"[WebSocket] Error: {e}")
          return []
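
One detail of the Ros2Manager helper is worth noting: the output of `ros2 topic list` normally ends with a newline, so `output.split('\n')` leaves a trailing empty string in the returned list. The sketch below shows one way a caller could normalize that output. The function name `parse_topic_list` and the sample topic names are hypothetical and not part of the referenced repository.

```python
def parse_topic_list(output: str) -> list[str]:
    """Split raw `ros2 topic list` output into topic names.

    Hypothetical helper: strips whitespace and drops blank lines,
    including the empty entry produced by a trailing newline.
    """
    return [line.strip() for line in output.splitlines() if line.strip()]


if __name__ == "__main__":
    # Sample output with made-up topic names, for illustration only.
    sample = "/cmd_vel\n/odom\n"
    print(sample.split("\n"))        # ['/cmd_vel', '/odom', '']
    print(parse_topic_list(sample))  # ['/cmd_vel', '/odom']
```

Filtering blank lines this way keeps the "topics" list returned by the handler free of empty names without changing how the shell command is run.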