'''
MCP服务 - Sevnce机器人导航服务
提供以下功能:
- 订阅机器人位置
- 导航到某个坐标点
- 导航到导航点
- 订阅设备状态
- 订阅导航状态
- 订阅任务状态
- 获取速度数据
- 获取地图点数据
'''
from fastmcp import FastMCP
from .robot_nav_client import RobotNavClient, StatusCode,get_status_description
import json
import threading
import time
from typing import Literal, Optional, Annotated
from pydantic import BaseModel, Field
from contextlib import asynccontextmanager
from typing import AsyncIterator
# 初始化MCP服务
nav_mcp = FastMCP("nav_mcp")
nav_client = None# 初始化机器人客户端(默认连接本地机器人的导航服务)
@nav_mcp.tool(
name="get_robot_position",
description="获取机器人当前位置。"
)
def get_robot_position() -> dict:
# 使用便捷方法获取机器人当前位置
pose_data = nav_client.get_robot_pose()
if pose_data:
position = pose_data.get("position", {})
angle = pose_data.get("angle", 0)
return {
"status": "success",
"message": "成功获取机器人当前位置",
"data_description": f"机器人当前位于坐标 ({position.get('x', 0):.3f}, {position.get('y', 0):.3f}),朝向角度 {angle:.3f} 弧度",
"data": {
"x": position.get('x', 0),
"y": position.get('y', 0),
"angle": angle
}
}
else:
return {
"status": "error",
"message": "无法获取机器人位置数据",
"data_description": "无法获取机器人当前的位置信息",
"data": {}
}
@nav_mcp.tool(
name="get_robot_device_status",
description="获取设备状态,包括电量,线速度,角速度等。"
)
def get_robot_device_status() -> dict:
# 存储最新的设备状态数据
device_data_storage = {}
def on_device_status(data):
device_data = data.get("data", {})
battery = device_data.get("battery", 0)
voltage = device_data.get("batteryVoltage", 0)
mileage = device_data.get("totalMileage", 0)
linear_vel = device_data.get("linear_vel", 0)
angle_vel = device_data.get("angle_vel", 0)
# 更新存储的数据
device_data_storage.update({
"battery": battery,
"voltage": voltage,
"mileage": mileage,
"linear_vel": linear_vel,
"angle_vel": angle_vel
})
print(f"[设备状态] 电池: {battery:.1f}%, 电压: {voltage:.2f}V, 里程: {mileage:.2f}km, 线速度: {linear_vel:.2f}m/s, 角速度: {angle_vel:.2f}rad/s")
connection_id = nav_client.subscribe_device_status(on_device_status)
time.sleep(0.1)
nav_client.stop_websocket(connection_id)
return {
"status": "success",
"message": "成功获取设备状态信息",
"data_description": f"机器人当前电池电量 {device_data_storage.get('battery', 0):.1f}%,电池电压 {device_data_storage.get('voltage', 0):.2f}V,总里程 {device_data_storage.get('mileage', 0):.2f}km,线速度 {device_data_storage.get('linear_vel', 0):.2f}m/s,角速度 {device_data_storage.get('angle_vel', 0):.2f}rad/s",
"data": {
"battery": device_data_storage.get("battery", 0),
"voltage": device_data_storage.get("voltage", 0),
"mileage": device_data_storage.get("mileage", 0),
"linear_vel": device_data_storage.get("linear_vel", 0),
"angle_vel": device_data_storage.get("angle_vel", 0)
}
}
@nav_mcp.tool(
name="get_task_status",
description="获取任务状态。"
)
def get_task_status() -> dict:
# 存储最新的任务状态数据
task_status_data = {}
def on_task_status(data):
status_code = data.get("statusCode")
status_msg = data.get("statusMsg", "")
status_data = data.get("statusData", {})
description = get_status_description(status_code)
# 更新存储的数据
task_status_data.update({
"status_code": status_code,
"status_msg": status_msg,
"status_data": status_data,
"description": description
})
print(f"[任务状态] {description} - {status_msg}")
# 如果到达目标点,显示详细信息
if status_code == StatusCode.TARGET_REACHED and status_data:
position = status_data.get("position", {})
angle = status_data.get("angle", 0)
point_name = status_data.get("point_name", "")
print(f"[到达详情] 位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad, 点名称: {point_name}")
connection_id = nav_client.subscribe_task_status(on_task_status)
# while True:
time.sleep(0.1)
nav_client.stop_websocket(connection_id)
return {
"status": "success",
"message": "成功获取任务状态信息",
"data_description": f"机器人当前任务状态: {task_status_data.get('description', '')} ({task_status_data.get('status_code', 0)}) - {task_status_data.get('status_msg', '')}",
"data": {
"status_code": task_status_data.get("status_code", 0),
"status_msg": task_status_data.get("status_msg", ""),
"status_data": task_status_data.get("status_data", {}),
"description": task_status_data.get("description", "")
}
}
@nav_mcp.tool(
name="navigate_to_coordinates",
description="导航到地图上的某个坐标点。机器人必须处于“就绪”状态方可调用。移动前自动执行路径规划、工作空间软限位校验和实时碰撞检测。若检测到障碍物、超出软限位范围,或收到急停信号,操作将立即中止并返回错误。"
)
def navigate_to_coordinates(
x: Annotated[float, Field(description="X坐标,单位米(m)")],
y: Annotated[float, Field(description="Y坐标,单位米(m)")],
angle: Annotated[float, Field(description="机器人朝向角度,单位弧度(rad)")],
) -> dict:
result = nav_client.navigate_to_coordinates(x, y, angle)
if result.get("successed", False):
return {
"status": "success",
"message": f"已成功发送导航指令,机器人将导航到坐标点 ({x:.3f}, {y:.3f}),目标朝向角度 {angle:.3f} 弧度",
"data_description": f"机器人已接受导航指令,正在前往坐标点 ({x:.3f}, {y:.3f}),目标朝向角度 {angle:.3f} 弧度",
"data": result
}
else:
return {
"status": "error",
"message": f"发送导航指令失败: {result.get('msg', '未知错误')}",
"data_description": f"发送导航指令到坐标点 ({x:.3f}, {y:.3f}) 失败,错误信息: {result.get('msg', '未知错误')}",
"data": result
}
@nav_mcp.tool(
name="navigate_to_position",
description="导航到导航点,只能导航到地图上已有的点位。机器人必须处于“就绪”状态方可调用。移动前自动执行路径规划、工作空间软限位校验和实时碰撞检测。若检测到障碍物、超出软限位范围,或收到急停信号,操作将立即中止并返回错误。"
)
def navigate_to_position(
map_name: Annotated[str, Field(description="地图名称")],
position_name: Annotated[str, Field(description="导航点名称")],
) -> dict:
task_status_data={}
result = nav_client.navigate_to_position(map_name, position_name)
time.sleep(1)
if result.get("successed", False):
def on_task_status(data):
status_code = data.get("statusCode")
status_msg = data.get("statusMsg", "")
status_data = data.get("statusData", {})
description = get_status_description(status_code)
# 更新存储的数据
task_status_data.update({
"status_code": status_code,
"status_msg": status_msg,
"status_data": status_data,
"description": description
})
time.sleep(0.1)
print("开始监听机器人任务状态")
connection_id = nav_client.subscribe_task_status(on_task_status)
success=True
nav_result=''
while True:
time.sleep(0.1)
print(f"task_status_data:{task_status_data}")
if task_status_data and task_status_data["status_code"]!=0:
# 如果到达目标点,显示详细信息
if task_status_data["status_code"] == StatusCode.TARGET_REACHED and task_status_data["status_data"]:
position = task_status_data["status_data"].get("position", {})
angle = task_status_data["status_data"].get("angle", 0)
point_name = task_status_data["status_data"].get("point_name", "")
print(f"[导航详情], 点位名称: {point_name} 位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad")
success=True
# nav_result=f"[导航详情], 点位名称: {point_name} 位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad"
break
elif (task_status_data["status_code"] == StatusCode.PLANNING_FAILED or
task_status_data["status_code"] == StatusCode.OBSTACLE_DETECTED or
task_status_data["status_code"] == StatusCode.INVALID_PATH or
task_status_data["status_code"] == StatusCode.TARGET_UNREACHABLE or
task_status_data["status_code"] == StatusCode.OFF_TRACK or
task_status_data["status_code"] == StatusCode.SPEED_ABNORMAL or
task_status_data["status_code"] == StatusCode.OBSTACLE_TIMEOUT or
task_status_data["status_code"] == StatusCode.SPEED_FEEDBACK_TIMEOUT or
task_status_data["status_code"] == StatusCode.SPEED_CMD_NOT_EXECUTED or
task_status_data["status_code"] == StatusCode.PATH_PLANNING_FAILED
):
success=False
break
else:
break
nav_client.stop_websocket(connection_id)
if success:
return {
"status": "success",
"message": f"机器人成功导航到地图 '{map_name}' 上的导航点 '{position_name}'",
"data_description": f"机器人已成功到达目标点 '{position_name}',位置: ({position.get('x', 0):.2f}, {position.get('y', 0):.2f}), 角度: {angle:.2f}rad",
"data": task_status_data
}
else:
return {
"status": "error",
"message": f"机器人导航失败: {task_status_data.get('description', '未知错误')}",
"data_description": f"机器人导航到点 '{position_name}' 失败,错误状态: {task_status_data.get('description', '未知错误')}",
"data": task_status_data
}
else:
return {
"status": "error",
"message": f"发送导航指令失败: {result.get('msg', '未知错误')}",
"data_description": f"发送导航指令到点 '{position_name}' 失败,错误信息: {result.get('msg', '未知错误')}",
"data": result
}
@nav_mcp.tool(
name="get_map_position",
description="获取地图上已有的点位信息,包括点位名称,id等。"
)
def get_map_position(
map_name: Annotated[str, Field(description="地图名称")],
) -> dict:
result = nav_client.get_graph_paths(map_name)
if result.get("successed", False):
return {
"status": "success",
"message": f"获取地图点位成功",
"data_description": f"成功获取地图 '{map_name}' 上的所有点位信息",
"data": result
}
else:
return {
"status": "error",
"message": f"获取地图点位失败",
"data_description": f"获取地图 '{map_name}' 上的点位信息失败",
"data": result
}
@nav_mcp.tool(
name="stop_current_task",
description="停止机器人当前的导航任务。该操作会切断运动指令并保持当前位置锁定。注意:此操作不会断电,仅停止运动控制。"
)
def stop_current_task() -> dict:
result = nav_client.stop_current_task()
if result.get("successed", False):
return {
"status": "success",
"message": "机器人已急停",
"data_description": "所有关节电机已停止输出,当前位置保持锁定",
"data": {
"stopped_at": time.time(),
"emergency_stop_triggered": True
}
}
else:
return {
"status": "error",
"message": f"停止失败: {result.get('msg', '未知错误')}",
"data_description": f"停止机器人当前任务失败,错误信息: {result.get('msg', '未知错误')}",
"data": result
}
@nav_mcp.tool(
name="detect_target",
description="检测机器人深度相机画面的目标,返回目标的坐标。机器人必须处于“就绪”状态方可调用。"
)
def detect_target(prompt: Annotated[str, Field(description="检测目标描述")]) -> dict:
result = None
if result and result.get("successed", False):
return {
"status": "success",
"message": "目标检测成功",
"data_description": f"成功检测到目标 '{prompt}',位置信息: {result.get('data', {})}",
"data": result
}
else:
return {
"status": "error",
"message": "目标检测失败",
"data_description": f"未能检测到目标 '{prompt}'",
"data": result if result else {}
}
def main():
"""Run the MCP server"""
import os
import argparse
parser = argparse.ArgumentParser(description="MCP Server")
parser.add_argument("--port", type=int, default=int(8002), help="Port to run the MCP server on")
parser.add_argument("--nav_server_ip", type=str, default="192.168.1.108", help="IP address of the navigation server")
parser.add_argument("--nav_server_port", type=int, default=int(8086), help="Port to run the navigation server on")
parser.add_argument("--nav_server_ws_port", type=int, default=int(8089), help="WebSocket port to run the navigation server on")
args = parser.parse_args()
global nav_client
nav_client = RobotNavClient(host=args.nav_server_ip, http_port=args.nav_server_port, ws_port=args.nav_server_ws_port)
nav_mcp.run(transport="streamable-http", host="0.0.0.0", port=args.port, path="/mcp/")
if __name__ == "__main__":
main()