Skip to main content
Glama
827346462

Robot Navigation MCP Server

by 827346462
server.py15.2 kB
''' MCP服务 - Sevnce机器人导航服务 提供以下功能: - 订阅机器人位置 - 导航到某个坐标点 - 导航到导航点 - 订阅设备状态 - 订阅导航状态 - 订阅任务状态 - 获取速度数据 - 获取地图点数据 ''' from fastmcp import FastMCP from .robot_nav_client import RobotNavClient, StatusCode,get_status_description import json import threading import time from typing import Literal, Optional, Annotated from pydantic import BaseModel, Field from contextlib import asynccontextmanager from typing import AsyncIterator # 初始化MCP服务 nav_mcp = FastMCP("nav_mcp") nav_client = None# 初始化机器人客户端(默认连接本地机器人的导航服务) @nav_mcp.tool( name="get_robot_position", description="获取机器人当前位置。" ) def get_robot_position() -> dict: # 使用便捷方法获取机器人当前位置 pose_data = nav_client.get_robot_pose() if pose_data: position = pose_data.get("position", {}) angle = pose_data.get("angle", 0) return { "status": "success", "message": "成功获取机器人当前位置", "data_description": f"机器人当前位于坐标 ({position.get('x', 0):.3f}, {position.get('y', 0):.3f}),朝向角度 {angle:.3f} 弧度", "data": { "x": position.get('x', 0), "y": position.get('y', 0), "angle": angle } } else: return { "status": "error", "message": "无法获取机器人位置数据", "data_description": "无法获取机器人当前的位置信息", "data": {} } @nav_mcp.tool( name="get_robot_device_status", description="获取设备状态,包括电量,线速度,角速度等。" ) def get_robot_device_status() -> dict: # 存储最新的设备状态数据 device_data_storage = {} def on_device_status(data): device_data = data.get("data", {}) battery = device_data.get("battery", 0) voltage = device_data.get("batteryVoltage", 0) mileage = device_data.get("totalMileage", 0) linear_vel = device_data.get("linear_vel", 0) angle_vel = device_data.get("angle_vel", 0) # 更新存储的数据 device_data_storage.update({ "battery": battery, "voltage": voltage, "mileage": mileage, "linear_vel": linear_vel, "angle_vel": angle_vel }) print(f"[设备状态] 电池: {battery:.1f}%, 电压: {voltage:.2f}V, 里程: {mileage:.2f}km, 线速度: {linear_vel:.2f}m/s, 角速度: {angle_vel:.2f}rad/s") connection_id = nav_client.subscribe_device_status(on_device_status) time.sleep(0.1) nav_client.stop_websocket(connection_id) return { "status": "success", "message": "成功获取设备状态信息", "data_description": f"机器人当前电池电量 {device_data_storage.get('battery', 0):.1f}%,电池电压 {device_data_storage.get('voltage', 0):.2f}V,总里程 {device_data_storage.get('mileage', 0):.2f}km,线速度 {device_data_storage.get('linear_vel', 0):.2f}m/s,角速度 {device_data_storage.get('angle_vel', 0):.2f}rad/s", "data": { "battery": device_data_storage.get("battery", 0), "voltage": device_data_storage.get("voltage", 0), "mileage": device_data_storage.get("mileage", 0), "linear_vel": device_data_storage.get("linear_vel", 0), "angle_vel": device_data_storage.get("angle_vel", 0) } } @nav_mcp.tool( name="get_task_status", description="获取任务状态。" ) def get_task_status() -> dict: # 存储最新的任务状态数据 task_status_data = {} def on_task_status(data): status_code = data.get("statusCode") status_msg = data.get("statusMsg", "") status_data = data.get("statusData", {}) description = get_status_description(status_code) # 更新存储的数据 task_status_data.update({ "status_code": status_code, "status_msg": status_msg, "status_data": status_data, "description": description }) print(f"[任务状态] {description} - {status_msg}") # 如果到达目标点,显示详细信息 if status_code == StatusCode.TARGET_REACHED and status_data: position = status_data.get("position", {}) angle = status_data.get("angle", 0) point_name = status_data.get("point_name", "") print(f"[到达详情] 位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad, 点名称: {point_name}") connection_id = nav_client.subscribe_task_status(on_task_status) # while True: time.sleep(0.1) nav_client.stop_websocket(connection_id) return { "status": "success", "message": "成功获取任务状态信息", "data_description": f"机器人当前任务状态: {task_status_data.get('description', '')} ({task_status_data.get('status_code', 0)}) - {task_status_data.get('status_msg', '')}", "data": { "status_code": task_status_data.get("status_code", 0), "status_msg": task_status_data.get("status_msg", ""), "status_data": task_status_data.get("status_data", {}), "description": task_status_data.get("description", "") } } @nav_mcp.tool( name="navigate_to_coordinates", description="导航到地图上的某个坐标点。机器人必须处于“就绪”状态方可调用。移动前自动执行路径规划、工作空间软限位校验和实时碰撞检测。若检测到障碍物、超出软限位范围,或收到急停信号,操作将立即中止并返回错误。" ) def navigate_to_coordinates( x: Annotated[float, Field(description="X坐标,单位米(m)")], y: Annotated[float, Field(description="Y坐标,单位米(m)")], angle: Annotated[float, Field(description="机器人朝向角度,单位弧度(rad)")], ) -> dict: result = nav_client.navigate_to_coordinates(x, y, angle) if result.get("successed", False): return { "status": "success", "message": f"已成功发送导航指令,机器人将导航到坐标点 ({x:.3f}, {y:.3f}),目标朝向角度 {angle:.3f} 弧度", "data_description": f"机器人已接受导航指令,正在前往坐标点 ({x:.3f}, {y:.3f}),目标朝向角度 {angle:.3f} 弧度", "data": result } else: return { "status": "error", "message": f"发送导航指令失败: {result.get('msg', '未知错误')}", "data_description": f"发送导航指令到坐标点 ({x:.3f}, {y:.3f}) 失败,错误信息: {result.get('msg', '未知错误')}", "data": result } @nav_mcp.tool( name="navigate_to_position", description="导航到导航点,只能导航到地图上已有的点位。机器人必须处于“就绪”状态方可调用。移动前自动执行路径规划、工作空间软限位校验和实时碰撞检测。若检测到障碍物、超出软限位范围,或收到急停信号,操作将立即中止并返回错误。" ) def navigate_to_position( map_name: Annotated[str, Field(description="地图名称")], position_name: Annotated[str, Field(description="导航点名称")], ) -> dict: task_status_data={} result = nav_client.navigate_to_position(map_name, position_name) time.sleep(1) if result.get("successed", False): def on_task_status(data): status_code = data.get("statusCode") status_msg = data.get("statusMsg", "") status_data = data.get("statusData", {}) description = get_status_description(status_code) # 更新存储的数据 task_status_data.update({ "status_code": status_code, "status_msg": status_msg, "status_data": status_data, "description": description }) time.sleep(0.1) print("开始监听机器人任务状态") connection_id = nav_client.subscribe_task_status(on_task_status) success=True nav_result='' while True: time.sleep(0.1) print(f"task_status_data:{task_status_data}") if task_status_data and task_status_data["status_code"]!=0: # 如果到达目标点,显示详细信息 if task_status_data["status_code"] == StatusCode.TARGET_REACHED and task_status_data["status_data"]: position = task_status_data["status_data"].get("position", {}) angle = task_status_data["status_data"].get("angle", 0) point_name = task_status_data["status_data"].get("point_name", "") print(f"[导航详情], 点位名称: {point_name} 位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad") success=True # nav_result=f"[导航详情], 点位名称: {point_name} 位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad" break elif (task_status_data["status_code"] == StatusCode.PLANNING_FAILED or task_status_data["status_code"] == StatusCode.OBSTACLE_DETECTED or task_status_data["status_code"] == StatusCode.INVALID_PATH or task_status_data["status_code"] == StatusCode.TARGET_UNREACHABLE or task_status_data["status_code"] == StatusCode.OFF_TRACK or task_status_data["status_code"] == StatusCode.SPEED_ABNORMAL or task_status_data["status_code"] == StatusCode.OBSTACLE_TIMEOUT or task_status_data["status_code"] == StatusCode.SPEED_FEEDBACK_TIMEOUT or task_status_data["status_code"] == StatusCode.SPEED_CMD_NOT_EXECUTED or task_status_data["status_code"] == StatusCode.PATH_PLANNING_FAILED ): success=False break else: break nav_client.stop_websocket(connection_id) if success: return { "status": "success", "message": f"机器人成功导航到地图 '{map_name}' 上的导航点 '{position_name}'", "data_description": f"机器人已成功到达目标点 '{position_name}',位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad", "data": task_status_data } else: return { "status": "error", "message": f"机器人导航失败: {task_status_data.get('description', '未知错误')}", "data_description": f"机器人导航到点 '{position_name}' 失败,错误状态: {task_status_data.get('description', '未知错误')}", "data": task_status_data } else: return { "status": "error", "message": f"发送导航指令失败: {result.get('msg', '未知错误')}", "data_description": f"发送导航指令到点 '{position_name}' 失败,错误信息: {result.get('msg', '未知错误')}", "data": result } @nav_mcp.tool( name="get_map_position", description="获取地图上已有的点位信息,包括点位名称,id等。" ) def get_map_position( map_name: Annotated[str, Field(description="地图名称")], ) -> dict: result = nav_client.get_graph_paths(map_name) if result.get("successed", False): return { "status": "success", "message": f"获取地图点位成功", "data_description": f"成功获取地图 '{map_name}' 上的所有点位信息", "data": result } else: return { "status": "error", "message": f"获取地图点位失败", "data_description": f"获取地图 '{map_name}' 上的点位信息失败", "data": result } @nav_mcp.tool( name="stop_current_task", description="停止机器人当前的导航任务。该操作会切断运动指令并保持当前位置锁定。注意:此操作不会断电,仅停止运动控制。" ) def stop_current_task() -> dict: result = nav_client.stop_current_task() if result.get("successed", False): return { "status": "success", "message": "机器人已急停", "data_description": "所有关节电机已停止输出,当前位置保持锁定", "data": { "stopped_at": time.time(), "emergency_stop_triggered": True } } else: return { "status": "error", "message": f"停止失败: {result.get('msg', '未知错误')}", "data_description": f"停止机器人当前任务失败,错误信息: {result.get('msg', '未知错误')}", "data": result } @nav_mcp.tool( name="detect_target", description="检测机器人深度相机画面的目标,返回目标的坐标。机器人必须处于“就绪”状态方可调用。" ) def detect_target(prompt: Annotated[str, Field(description="检测目标描述")]) -> dict: result = None if result and result.get("successed", False): return { "status": "success", "message": "目标检测成功", "data_description": f"成功检测到目标 '{prompt}',位置信息: {result.get('data', {})}", "data": result } else: return { "status": "error", "message": "目标检测失败", "data_description": f"未能检测到目标 '{prompt}'", "data": result if result else {} } def main(): """Run the MCP server""" import os import argparse parser = argparse.ArgumentParser(description="MCP Server") parser.add_argument("--port", type=int, default=int(8002), help="Port to run the MCP server on") parser.add_argument("--nav_server_ip", type=str, default="192.168.1.108", help="IP address of the navigation server") parser.add_argument("--nav_server_port", type=int, default=int(8086), help="Port to run the navigation server on") parser.add_argument("--nav_server_ws_port", type=int, default=int(8089), help="WebSocket port to run the navigation server on") args = parser.parse_args() global nav_client nav_client = RobotNavClient(host=args.nav_server_ip, http_port=args.nav_server_port, ws_port=args.nav_server_ws_port) nav_mcp.run(transport="streamable-http", host="0.0.0.0", port=args.port, path="/mcp/") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/827346462/mcp-robot-navigation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server