Skip to main content
Glama

ROS2 MCP Server

ros2-mcp-server.py2.71 kB
import rclpy from rclpy.node import Node from geometry_msgs.msg import Twist from fastmcp import FastMCP import asyncio import threading class ROS2MCPNode(Node): def __init__(self): super().__init__('ros2_mcp_server_node') # pub / sub self.pub_cmd_vel = self.create_publisher(Twist, '/cmd_vel', 10) self.get_logger().info('ROS2 node initialized') async def send_twist_with_duration(self, linear: list[float], angular: list[float], duration: float) -> str: """指定時間だけTwistメッセージを発行し、その後停止""" try: # 速度メッセージの作成 msg = Twist() msg.linear.x = float(linear[0]) msg.linear.y = float(linear[1]) if len(linear) > 1 else 0.0 msg.linear.z = float(linear[2]) if len(linear) > 2 else 0.0 msg.angular.x = float(angular[0]) if len(angular) > 0 else 0.0 msg.angular.y = float(angular[1]) if len(angular) > 1 else 0.0 msg.angular.z = float(angular[2]) if len(angular) > 2 else 0.0 # 速度メッセージの発行(10Hzで安定発行) start_time = self.get_clock().now() duration_sec = duration while (self.get_clock().now() - start_time).nanoseconds / 1e9 < duration_sec: self.pub_cmd_vel.publish(msg) self.get_logger().info(f'Publishing Twist: linear={msg.linear.x}, angular={msg.angular.z}') await asyncio.sleep(0.1) # 10Hz # 停止メッセージの発行 stop_msg = Twist() self.pub_cmd_vel.publish(stop_msg) self.get_logger().info('Published stop command') return f"Successfully moved for {duration} seconds and stopped" except Exception as e: self.get_logger().error(f"Error publishing Twist: {str(e)}") return f"Error: {str(e)}" def run_ros2_spin(node): rclpy.spin(node) async def main(): rclpy.init() ros_node = ROS2MCPNode() ros_thread = threading.Thread(target=run_ros2_spin, args=(ros_node,), daemon=True) ros_thread.start() mcp = FastMCP("ros2-mcp-server") @mcp.tool() async def move_robot(linear: list[float], angular: list[float], duration: float) -> str: """Send movement commands to the robot for a specified duration""" return await ros_node.send_twist_with_duration(linear, angular, duration) # MCP サーバの実行(stdio トランスポート) # anyioの代わりに直接asyncioを使用 await mcp.run_async() # clean up ros_node.destroy_node() rclpy.shutdown() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kakimochi/ros2-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server