Skip to main content
Glama
827346462

Robot Navigation MCP Server

by 827346462
robot_nav_client.py23 kB
import requests import json import threading import time from typing import Dict, List, Optional, Callable, Any from dataclasses import dataclass try: import websocket except ImportError: print("警告: 未安装websocket-client库,WebSocket功能将不可用") print("请运行: pip install websocket-client") @dataclass class Position: """位置数据类""" x: float y: float z: float = 0.0 @dataclass class Orientation: """方向数据类""" x: float = 0.0 y: float = 0.0 z: float = 0.0 w: float = 1.0 @dataclass class Pose: """姿态数据类""" position: Position angle: float orientation: Optional[Orientation] = None class RobotNavClient: """Sevnce机器人客户端""" def __init__(self, host: str = "localhost", http_port: int = 80, ws_port: int = 8089): """ 初始化客户端 Args: host: 服务器地址 http_port: HTTP端口 ws_port: WebSocket端口 """ self.host = host self.http_port = http_port self.ws_port = ws_port self.base_url = f"http://{host}:{http_port}" self.ws_base_url = f"ws://{host}:{ws_port}" # WebSocket连接管理 self.ws_connections = {} self.ws_callbacks = {} self.ws_threads = {} def close(self): """关闭所有WebSocket连接""" for key in list(self.ws_connections.keys()): self.stop_websocket(key) def _make_request(self, method: str, endpoint: str, params: Dict = None, data: Dict = None) -> Dict: """ 发送HTTP请求 Args: method: 请求方法 (GET, POST) endpoint: API端点 params: URL参数 data: 请求体数据 Returns: 响应数据 """ url = f"{self.base_url}{endpoint}" try: if method.upper() == "GET": response = requests.get(url, params=params) elif method.upper() == "POST": response = requests.post(url, json=data, params=params) else: raise ValueError(f"不支持的请求方法: {method}") response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"请求失败: {e}") return {"successed": False, "errorCode": "REQUEST_ERROR", "msg": str(e)} # ==================== 数据查询API ==================== def get_laser_data(self) -> Dict: """ 获取激光数据 Returns: 激光数据响应 """ return self._make_request("GET", "/sevnce/real_time_data/laser_phit") def get_cmd_vel(self) -> Dict: """ 获取实时角速度和线速度数据 线速度看x,角速度看z Returns: 速度数据响应 """ return self._make_request("GET", "/sevnce/real_time_data/cmd_vel") def get_positions(self, map_name: str, position_type: Optional[int] = None) -> Dict: """ 获取地图点数据 Args: map_name: 地图名称 position_type: 点类型,不传则返回所有点 Returns: 地图点数据响应 """ params = {"map_name": map_name} if position_type is not None: params["type"] = position_type return self._make_request("GET", "/sevnce/data/positions", params=params) # ==================== 命令API ==================== def add_position(self, map_name: str, position_name: str, angle: float, grid_x: int, grid_y: int, position_type: int = 0) -> Dict: """ 添加点 Args: map_name: 地图名称 position_name: 点名称 angle: 角度 grid_x: 网格X坐标 grid_y: 网格Y坐标 position_type: 点类型,默认0为标记点 Returns: 添加结果响应 """ data = { "angle": angle, "gridX": grid_x, "gridY": grid_y, "mapName": map_name, "name": position_name, "type": position_type } return self._make_request("POST", "/sevnce/cmd/position/add_position", data=data) def delete_position(self, map_name: str, position_name: str) -> Dict: """ 删除点 Args: map_name: 地图名称 position_name: 点名称 Returns: 删除结果响应 """ params = { "map_name": map_name, "position_name": position_name } return self._make_request("GET", "/sevnce/cmd/delete_position", params=params) # ==================== 导航相关API ==================== def navigate_to_position(self, map_name: str, position_name: str) -> Dict: """ 导航到导航点 Args: map_name: 地图名称 position_name: 导航点名称 Returns: 导航结果响应 """ params = { "map_name": map_name, "position_name": position_name } return self._make_request("GET", "/sevnce/cmd/position/navigate", params=params) def navigate_to_coordinates(self, x: float, y: float, angle: float) -> Dict: """ 导航到任意指定坐标点 Args: x: X坐标 y: Y坐标 angle: 角度 Returns: 导航结果响应 """ data = { "destination": { "angle": angle, "position": { "x": x, "y": y, "z": 0.0 } } } return self._make_request("POST", "/sevnce/cmd/navigate", data=data) def pause_navigation(self) -> Dict: """ 暂停导航 Returns: 暂停结果响应 """ return self._make_request("GET", "/sevnce/cmd/pause_navigate") def resume_navigation(self) -> Dict: """ 恢复导航 Returns: 恢复结果响应 """ return self._make_request("GET", "/sevnce/cmd/resume_navigate") def cancel_navigation(self) -> Dict: """ 取消导航 Returns: 取消结果响应 """ return self._make_request("GET", "/sevnce/cmd/cancel_navigate") def get_navigation_path(self) -> Dict: """ 获取导航行驶轨迹 Returns: 导航路径数据 """ return self._make_request("GET", "/sevnce/cmd/navigation_path") def replan_navigation(self) -> Dict: """ 重新规划路径 Returns: 重新规划结果响应 """ return self._make_request("GET", "/sevnce/cmd/replan") def get_work_status(self) -> Dict: """ 获取当前任务状态 Returns: 任务状态数据 """ return self._make_request("GET", "/sevnce/real_time_data/work_status") def get_path_between_points(self, start_x: float, start_y: float, start_angle: float, end_x: float, end_y: float, end_angle: float, map_name: str, start_floor: int = 1, end_floor: int = 1) -> Dict: """ 获取任意两点间的导航路线 Args: start_x: 起点X坐标 start_y: 起点Y坐标 start_angle: 起点角度 end_x: 终点X坐标 end_y: 终点Y坐标 end_angle: 终点角度 map_name: 地图名称 start_floor: 起点楼层,默认1 end_floor: 终点楼层,默认1 Returns: 导航路径数据 """ data = { "start": { "angle": start_angle, "worldPosition": { "x": start_x, "y": start_y, "z": 0.0, "floor": start_floor } }, "end": { "angle": end_angle, "worldPosition": { "x": end_x, "y": end_y, "z": 0.0, "floor": end_floor } }, "mapName": map_name } return self._make_request("POST", "/sevnce/data/navigation_path", data=data) # ==================== 任务队列相关API ==================== def get_task_queues(self, map_name: str) -> Dict: """ 获取任务队列列表 Args: map_name: 地图名称 Returns: 任务队列列表响应 """ params = {"map_name": map_name} return self._make_request("GET", "/sevnce/data/task_queues", params=params) def start_task_queue(self, name: str, loop: bool, loop_count: int, map_name: str, tasks: List[Dict]) -> Dict: """ 开始执行任务队列 Args: name: 任务队列名称 loop: 是否循环执行 loop_count: 循环次数 map_name: 地图名称 tasks: 任务列表 Returns: 执行结果响应 """ data = { "name": name, "loop": loop, "loop_count": loop_count, "map_name": map_name, "tasks": tasks } return self._make_request("POST", "/sevnce/cmd/start_task_queue", data=data) def stop_task_queue(self) -> Dict: """ 停止所有队列任务 Returns: 停止结果响应 """ return self._make_request("GET", "/sevnce/cmd/stop_task_queue") def stop_current_task(self) -> Dict: """ 停止当前任务 Returns: 停止结果响应 """ return self._make_request("GET", "/sevnce/cmd/stop_current_task") def is_task_queue_finished(self) -> Dict: """ 检查队列任务是否完成 Returns: 任务完成状态响应 """ return self._make_request("GET", "/sevnce/cmd/is_task_queue_finished") # ==================== WebSocket连接管理 ==================== def _on_ws_message(self, ws, message, callback_key): """WebSocket消息处理""" try: data = json.loads(message) if callback_key in self.ws_callbacks: self.ws_callbacks[callback_key](data) except json.JSONDecodeError as e: print(f"JSON解析错误: {e}") except Exception as e: print(f"消息处理错误: {e}") def _on_ws_error(self, ws, error): """WebSocket错误处理""" print(f"WebSocket错误: {error}") def _on_ws_close(self, ws, close_status_code, close_msg): """WebSocket关闭处理""" print(f"WebSocket连接关闭: {close_status_code} - {close_msg}") def _on_ws_open(self, ws): """WebSocket连接打开处理""" print("WebSocket连接已建立") def _start_websocket(self, endpoint: str, callback: Callable[[Dict], None]) -> str: """ 启动WebSocket连接 Args: endpoint: WebSocket端点 callback: 消息回调函数 Returns: 连接键值 """ ws_url = f"{self.ws_base_url}{endpoint}" callback_key = endpoint self.ws_callbacks[callback_key] = callback def run_websocket(): ws = websocket.WebSocketApp( ws_url, on_message=lambda ws, msg: self._on_ws_message(ws, msg, callback_key), on_error=self._on_ws_error, on_close=self._on_ws_close, on_open=self._on_ws_open ) self.ws_connections[callback_key] = ws ws.run_forever() thread = threading.Thread(target=run_websocket, daemon=True) self.ws_threads[callback_key] = thread thread.start() return callback_key def stop_websocket(self, connection_key: str): """ 停止WebSocket连接 Args: connection_key: 连接键值 """ if connection_key in self.ws_connections: self.ws_connections[connection_key].close() del self.ws_connections[connection_key] if connection_key in self.ws_callbacks: del self.ws_callbacks[connection_key] if connection_key in self.ws_threads: del self.ws_threads[connection_key] # ==================== WebSocket数据订阅 ==================== def subscribe_device_status(self, callback: Callable[[Dict], None]) -> str: """ 订阅设备状态数据 Args: callback: 状态数据回调函数 Returns: 连接键值 """ return self._start_websocket("/sevnce/data/device_status", callback) def subscribe_navigation_status(self, callback: Callable[[Dict], None]) -> str: """ 订阅导航状态数据 Args: callback: 导航状态回调函数 Returns: 连接键值 """ return self._start_websocket("/sevnce/data/navigation_status", callback) def subscribe_task_status(self, callback: Callable[[Dict], None]) -> str: """ 订阅任务状态数据 Args: callback: 任务状态回调函数 Returns: 连接键值 """ return self._start_websocket("/sevnce/data/status", callback) def subscribe_laser_data(self, callback: Callable[[Dict], None]) -> str: """ 订阅激光数据 Args: callback: 激光数据回调函数 Returns: 连接键值 """ return self._start_websocket("/sevnce/real_time_data/laser_data", callback) def subscribe_robot_pose(self, callback: Callable[[Dict], None]) -> str: """ 订阅机器人实时位置 Args: callback: 位置数据回调函数 Returns: 连接键值 """ return self._start_websocket("/sevnce/real_time_data/pose", callback) # ==================== 便捷方法 ==================== def get_robot_velocity(self) -> Optional[Dict[str, float]]: """ 获取机器人当前速度 Returns: 包含线速度和角速度的字典,失败返回None """ response = self.get_cmd_vel() if response.get("successed"): data = response.get("data", {}) return { "linear_x": data.get("linear", {}).get("x", 0), "angular_z": data.get("angular", {}).get("z", 0) } return None def get_map_positions_by_type(self, map_name: str, position_type: int) -> List[Dict]: """ 根据类型获取地图点 Args: map_name: 地图名称 position_type: 点类型 Returns: 点数据列表 """ response = self.get_positions(map_name, position_type) if response.get("successed"): return response.get("data", []) return [] def close_all_websockets(self): """关闭所有WebSocket连接""" for key in list(self.ws_connections.keys()): self.stop_websocket(key) # ==================== 新增便捷方法 ==================== def get_battery_level(self) -> Optional[float]: """ 获取电池电量百分比 Returns: 电池电量百分比,失败返回None """ def on_message(data): self._battery_level = data.get("data", {}).get("battery", 0) # 临时订阅以获取数据 key = self.subscribe_device_status(on_message) time.sleep(0.1) # 等待数据 self.stop_websocket(key) return getattr(self, '_battery_level', None) def get_robot_pose(self) -> Optional[Dict]: """ 获取机器人当前姿态 Returns: 包含位置和角度信息的字典,失败返回None """ def on_message(data): self._robot_pose = data # print(data) # 临时订阅以获取数据 key = self.subscribe_robot_pose(on_message) time.sleep(1) # 等待数据 self.stop_websocket(key) return getattr(self, '_robot_pose', None) def get_device_status(self) -> Optional[Dict]: """ 获取设备状态信息 Returns: 设备状态信息,失败返回None """ def on_message(data): self._device_status = data.get("data", {}) # 临时订阅以获取数据 key = self.subscribe_device_status(on_message) time.sleep(0.1) # 等待数据 self.stop_websocket(key) return getattr(self, '_device_status', None) # ==================== 状态码常量 ==================== class StatusCode: """任务状态码常量""" IDLE = 0 # 空闲中 PAUSED = 100 # 暂停状态 TARGET_RECEIVED = 400 # 收到目标点 PATH_PLANNING_START = 401 # 开始路径规划 PLANNING_FAILED = 402 # 规划失败 OBSTACLE_DETECTED = 403 # 前方有障碍物停障中 PLANNING_SUCCESS = 405 # 规划成功 PATH_FOLLOWING_START = 406 # 开始跟线 TARGET_REACHED = 407 # 到达目标点 NAVIGATION_FAILED = 408 # 导航失败 INVALID_PATH = 420 # 路径无效 PATH_FOLLOWING = 421 # 跟线中 TARGET_UNREACHABLE = 430 # 目标点不可达 OFF_TRACK = 431 # 偏离路网 SPEED_ABNORMAL = 432 # 速度异常 OBSTACLE_TIMEOUT = 434 # 绕障超时 SPEED_CMD_NOT_EXECUTED = 435 # 速度指令未执行 FINE_TUNING_FAILED = 436 # 精调失败 SPEED_FEEDBACK_TIMEOUT = 505 # 速度反馈超时 PATH_PLANNING_FAILED = 506 # 路径规划失败 LOCALIZATION_TIMEOUT = 511 # 定位数据超时 # 新增任务队列相关状态码 TASK_QUEUE_IDLE = 0 # 任务队列空闲 TASK_QUEUE_RUNNING = 1 # 任务队列执行中 TASK_QUEUE_PAUSED = 2 # 任务队列暂停中 TASK_QUEUE_COMPLETED = 3 # 任务队列完成 status_descriptions = { StatusCode.IDLE: "空闲中", StatusCode.PAUSED: "暂停状态", StatusCode.TARGET_RECEIVED: "收到目标点", StatusCode.PATH_PLANNING_START: "开始路径规划", StatusCode.PLANNING_FAILED: "规划失败", StatusCode.OBSTACLE_DETECTED: "前方有障碍物停障中", StatusCode.PLANNING_SUCCESS: "规划成功", StatusCode.PATH_FOLLOWING_START: "开始跟线", StatusCode.TARGET_REACHED: "到达目标点", StatusCode.NAVIGATION_FAILED: "导航失败", StatusCode.INVALID_PATH: "路径无效", StatusCode.PATH_FOLLOWING: "跟线中", StatusCode.TARGET_UNREACHABLE: "目标点不可达", StatusCode.OFF_TRACK: "偏离路网", StatusCode.SPEED_ABNORMAL: "速度异常", StatusCode.OBSTACLE_TIMEOUT: "绕障超时", StatusCode.SPEED_CMD_NOT_EXECUTED: "速度指令未执行", StatusCode.FINE_TUNING_FAILED: "精调失败", StatusCode.SPEED_FEEDBACK_TIMEOUT: "速度反馈超时", StatusCode.PATH_PLANNING_FAILED: "路径规划失败", StatusCode.LOCALIZATION_TIMEOUT: "定位数据超时", # 新增任务队列相关状态描述 StatusCode.TASK_QUEUE_IDLE: "任务队列空闲", StatusCode.TASK_QUEUE_RUNNING: "任务队列执行中", StatusCode.TASK_QUEUE_PAUSED: "任务队列暂停中", StatusCode.TASK_QUEUE_COMPLETED: "任务队列完成", } def get_status_description(code: int, default: str = "未知状态") -> str: """根据状态码获取描述,不存在则返回默认值""" return status_descriptions.get(code, default) if __name__ == "__main__": # 使用示例 # 创建客户端实例 client = RobotNavClient(host="192.168.1.108", http_port=8086, ws_port=8089) # 替换为实际的机器人IP # # 获取激光数据 # laser_data = client.get_laser_data() # print("激光数据:", laser_data) # 获取速度数据 velocity = client.get_robot_velocity() print("当前速度:", velocity) # 获取地图点数据 positions = client.get_positions("agent2") print("地图点数据:", positions) # # 订阅设备状态 # def on_device_status(data): # battery = data.get("data", {}).get("battery", 0) # print(f"电池电量: {battery}%") # device_status_key = client.subscribe_device_status(on_device_status) # # 订阅任务状态 # def on_task_status(data): # status_code = data.get("statusCode") # status_msg = data.get("statusMsg") # print(f"任务状态: {status_code} - {status_msg}") # if status_code == StatusCode.TARGET_REACHED: # print("机器人已到达目标点!") # elif status_code == StatusCode.NAVIGATION_FAILED: # print("导航失败!") # task_status_key = client.subscribe_task_status(on_task_status) # # 订阅机器人位置 # def on_robot_pose(data): # position = data.get("position", {}) # angle = data.get("angle", 0) # print(f"机器人位置: x={position.get('x')}, y={position.get('y')}, angle={angle}") # pose_key = client.subscribe_robot_pose(on_robot_pose) # try: # # 保持程序运行以接收WebSocket数据 # print("正在监听数据,按Ctrl+C退出...") # while True: # time.sleep(1) # except KeyboardInterrupt: # print("正在关闭连接...") # client.close_all_websockets() # print("程序已退出")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/827346462/mcp-robot-navigation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server