import requests
import json
import threading
import time
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass
try:
import websocket
except ImportError:
print("警告: 未安装websocket-client库,WebSocket功能将不可用")
print("请运行: pip install websocket-client")
@dataclass
class Position:
"""位置数据类"""
x: float
y: float
z: float = 0.0
@dataclass
class Orientation:
"""方向数据类"""
x: float = 0.0
y: float = 0.0
z: float = 0.0
w: float = 1.0
@dataclass
class Pose:
"""姿态数据类"""
position: Position
angle: float
orientation: Optional[Orientation] = None
class RobotNavClient:
"""Sevnce机器人客户端"""
def __init__(self, host: str = "localhost", http_port: int = 80, ws_port: int = 8089):
"""
初始化客户端
Args:
host: 服务器地址
http_port: HTTP端口
ws_port: WebSocket端口
"""
self.host = host
self.http_port = http_port
self.ws_port = ws_port
self.base_url = f"http://{host}:{http_port}"
self.ws_base_url = f"ws://{host}:{ws_port}"
# WebSocket连接管理
self.ws_connections = {}
self.ws_callbacks = {}
self.ws_threads = {}
def close(self):
"""关闭所有WebSocket连接"""
for key in list(self.ws_connections.keys()):
self.stop_websocket(key)
def _make_request(self, method: str, endpoint: str, params: Dict = None, data: Dict = None) -> Dict:
"""
发送HTTP请求
Args:
method: 请求方法 (GET, POST)
endpoint: API端点
params: URL参数
data: 请求体数据
Returns:
响应数据
"""
url = f"{self.base_url}{endpoint}"
try:
if method.upper() == "GET":
response = requests.get(url, params=params)
elif method.upper() == "POST":
response = requests.post(url, json=data, params=params)
else:
raise ValueError(f"不支持的请求方法: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return {"successed": False, "errorCode": "REQUEST_ERROR", "msg": str(e)}
# ==================== 数据查询API ====================
def get_laser_data(self) -> Dict:
"""
获取激光数据
Returns:
激光数据响应
"""
return self._make_request("GET", "/sevnce/real_time_data/laser_phit")
def get_cmd_vel(self) -> Dict:
"""
获取实时角速度和线速度数据
线速度看x,角速度看z
Returns:
速度数据响应
"""
return self._make_request("GET", "/sevnce/real_time_data/cmd_vel")
def get_positions(self, map_name: str, position_type: Optional[int] = None) -> Dict:
"""
获取地图点数据
Args:
map_name: 地图名称
position_type: 点类型,不传则返回所有点
Returns:
地图点数据响应
"""
params = {"map_name": map_name}
if position_type is not None:
params["type"] = position_type
return self._make_request("GET", "/sevnce/data/positions", params=params)
# ==================== 命令API ====================
def add_position(self, map_name: str, position_name: str, angle: float,
grid_x: int, grid_y: int, position_type: int = 0) -> Dict:
"""
添加点
Args:
map_name: 地图名称
position_name: 点名称
angle: 角度
grid_x: 网格X坐标
grid_y: 网格Y坐标
position_type: 点类型,默认0为标记点
Returns:
添加结果响应
"""
data = {
"angle": angle,
"gridX": grid_x,
"gridY": grid_y,
"mapName": map_name,
"name": position_name,
"type": position_type
}
return self._make_request("POST", "/sevnce/cmd/position/add_position", data=data)
def delete_position(self, map_name: str, position_name: str) -> Dict:
"""
删除点
Args:
map_name: 地图名称
position_name: 点名称
Returns:
删除结果响应
"""
params = {
"map_name": map_name,
"position_name": position_name
}
return self._make_request("GET", "/sevnce/cmd/delete_position", params=params)
# ==================== 导航相关API ====================
def navigate_to_position(self, map_name: str, position_name: str) -> Dict:
"""
导航到导航点
Args:
map_name: 地图名称
position_name: 导航点名称
Returns:
导航结果响应
"""
params = {
"map_name": map_name,
"position_name": position_name
}
return self._make_request("GET", "/sevnce/cmd/position/navigate", params=params)
def navigate_to_coordinates(self, x: float, y: float, angle: float) -> Dict:
"""
导航到任意指定坐标点
Args:
x: X坐标
y: Y坐标
angle: 角度
Returns:
导航结果响应
"""
data = {
"destination": {
"angle": angle,
"position": {
"x": x,
"y": y,
"z": 0.0
}
}
}
return self._make_request("POST", "/sevnce/cmd/navigate", data=data)
def pause_navigation(self) -> Dict:
"""
暂停导航
Returns:
暂停结果响应
"""
return self._make_request("GET", "/sevnce/cmd/pause_navigate")
def resume_navigation(self) -> Dict:
"""
恢复导航
Returns:
恢复结果响应
"""
return self._make_request("GET", "/sevnce/cmd/resume_navigate")
def cancel_navigation(self) -> Dict:
"""
取消导航
Returns:
取消结果响应
"""
return self._make_request("GET", "/sevnce/cmd/cancel_navigate")
def get_navigation_path(self) -> Dict:
"""
获取导航行驶轨迹
Returns:
导航路径数据
"""
return self._make_request("GET", "/sevnce/cmd/navigation_path")
def replan_navigation(self) -> Dict:
"""
重新规划路径
Returns:
重新规划结果响应
"""
return self._make_request("GET", "/sevnce/cmd/replan")
def get_work_status(self) -> Dict:
"""
获取当前任务状态
Returns:
任务状态数据
"""
return self._make_request("GET", "/sevnce/real_time_data/work_status")
def get_path_between_points(self, start_x: float, start_y: float, start_angle: float,
end_x: float, end_y: float, end_angle: float,
map_name: str, start_floor: int = 1, end_floor: int = 1) -> Dict:
"""
获取任意两点间的导航路线
Args:
start_x: 起点X坐标
start_y: 起点Y坐标
start_angle: 起点角度
end_x: 终点X坐标
end_y: 终点Y坐标
end_angle: 终点角度
map_name: 地图名称
start_floor: 起点楼层,默认1
end_floor: 终点楼层,默认1
Returns:
导航路径数据
"""
data = {
"start": {
"angle": start_angle,
"worldPosition": {
"x": start_x,
"y": start_y,
"z": 0.0,
"floor": start_floor
}
},
"end": {
"angle": end_angle,
"worldPosition": {
"x": end_x,
"y": end_y,
"z": 0.0,
"floor": end_floor
}
},
"mapName": map_name
}
return self._make_request("POST", "/sevnce/data/navigation_path", data=data)
# ==================== 任务队列相关API ====================
def get_task_queues(self, map_name: str) -> Dict:
"""
获取任务队列列表
Args:
map_name: 地图名称
Returns:
任务队列列表响应
"""
params = {"map_name": map_name}
return self._make_request("GET", "/sevnce/data/task_queues", params=params)
def start_task_queue(self, name: str, loop: bool, loop_count: int, map_name: str,
tasks: List[Dict]) -> Dict:
"""
开始执行任务队列
Args:
name: 任务队列名称
loop: 是否循环执行
loop_count: 循环次数
map_name: 地图名称
tasks: 任务列表
Returns:
执行结果响应
"""
data = {
"name": name,
"loop": loop,
"loop_count": loop_count,
"map_name": map_name,
"tasks": tasks
}
return self._make_request("POST", "/sevnce/cmd/start_task_queue", data=data)
def stop_task_queue(self) -> Dict:
"""
停止所有队列任务
Returns:
停止结果响应
"""
return self._make_request("GET", "/sevnce/cmd/stop_task_queue")
def stop_current_task(self) -> Dict:
"""
停止当前任务
Returns:
停止结果响应
"""
return self._make_request("GET", "/sevnce/cmd/stop_current_task")
def is_task_queue_finished(self) -> Dict:
"""
检查队列任务是否完成
Returns:
任务完成状态响应
"""
return self._make_request("GET", "/sevnce/cmd/is_task_queue_finished")
# ==================== WebSocket连接管理 ====================
def _on_ws_message(self, ws, message, callback_key):
"""WebSocket消息处理"""
try:
data = json.loads(message)
if callback_key in self.ws_callbacks:
self.ws_callbacks[callback_key](data)
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}")
except Exception as e:
print(f"消息处理错误: {e}")
def _on_ws_error(self, ws, error):
"""WebSocket错误处理"""
print(f"WebSocket错误: {error}")
def _on_ws_close(self, ws, close_status_code, close_msg):
"""WebSocket关闭处理"""
print(f"WebSocket连接关闭: {close_status_code} - {close_msg}")
def _on_ws_open(self, ws):
"""WebSocket连接打开处理"""
print("WebSocket连接已建立")
def _start_websocket(self, endpoint: str, callback: Callable[[Dict], None]) -> str:
"""
启动WebSocket连接
Args:
endpoint: WebSocket端点
callback: 消息回调函数
Returns:
连接键值
"""
ws_url = f"{self.ws_base_url}{endpoint}"
callback_key = endpoint
self.ws_callbacks[callback_key] = callback
def run_websocket():
ws = websocket.WebSocketApp(
ws_url,
on_message=lambda ws, msg: self._on_ws_message(ws, msg, callback_key),
on_error=self._on_ws_error,
on_close=self._on_ws_close,
on_open=self._on_ws_open
)
self.ws_connections[callback_key] = ws
ws.run_forever()
thread = threading.Thread(target=run_websocket, daemon=True)
self.ws_threads[callback_key] = thread
thread.start()
return callback_key
def stop_websocket(self, connection_key: str):
"""
停止WebSocket连接
Args:
connection_key: 连接键值
"""
if connection_key in self.ws_connections:
self.ws_connections[connection_key].close()
del self.ws_connections[connection_key]
if connection_key in self.ws_callbacks:
del self.ws_callbacks[connection_key]
if connection_key in self.ws_threads:
del self.ws_threads[connection_key]
# ==================== WebSocket数据订阅 ====================
def subscribe_device_status(self, callback: Callable[[Dict], None]) -> str:
"""
订阅设备状态数据
Args:
callback: 状态数据回调函数
Returns:
连接键值
"""
return self._start_websocket("/sevnce/data/device_status", callback)
def subscribe_navigation_status(self, callback: Callable[[Dict], None]) -> str:
"""
订阅导航状态数据
Args:
callback: 导航状态回调函数
Returns:
连接键值
"""
return self._start_websocket("/sevnce/data/navigation_status", callback)
def subscribe_task_status(self, callback: Callable[[Dict], None]) -> str:
"""
订阅任务状态数据
Args:
callback: 任务状态回调函数
Returns:
连接键值
"""
return self._start_websocket("/sevnce/data/status", callback)
def subscribe_laser_data(self, callback: Callable[[Dict], None]) -> str:
"""
订阅激光数据
Args:
callback: 激光数据回调函数
Returns:
连接键值
"""
return self._start_websocket("/sevnce/real_time_data/laser_data", callback)
def subscribe_robot_pose(self, callback: Callable[[Dict], None]) -> str:
"""
订阅机器人实时位置
Args:
callback: 位置数据回调函数
Returns:
连接键值
"""
return self._start_websocket("/sevnce/real_time_data/pose", callback)
# ==================== 便捷方法 ====================
def get_robot_velocity(self) -> Optional[Dict[str, float]]:
"""
获取机器人当前速度
Returns:
包含线速度和角速度的字典,失败返回None
"""
response = self.get_cmd_vel()
if response.get("successed"):
data = response.get("data", {})
return {
"linear_x": data.get("linear", {}).get("x", 0),
"angular_z": data.get("angular", {}).get("z", 0)
}
return None
def get_map_positions_by_type(self, map_name: str, position_type: int) -> List[Dict]:
"""
根据类型获取地图点
Args:
map_name: 地图名称
position_type: 点类型
Returns:
点数据列表
"""
response = self.get_positions(map_name, position_type)
if response.get("successed"):
return response.get("data", [])
return []
def close_all_websockets(self):
"""关闭所有WebSocket连接"""
for key in list(self.ws_connections.keys()):
self.stop_websocket(key)
# ==================== 新增便捷方法 ====================
def get_battery_level(self) -> Optional[float]:
"""
获取电池电量百分比
Returns:
电池电量百分比,失败返回None
"""
def on_message(data):
self._battery_level = data.get("data", {}).get("battery", 0)
# 临时订阅以获取数据
key = self.subscribe_device_status(on_message)
time.sleep(0.1) # 等待数据
self.stop_websocket(key)
return getattr(self, '_battery_level', None)
def get_robot_pose(self) -> Optional[Dict]:
"""
获取机器人当前姿态
Returns:
包含位置和角度信息的字典,失败返回None
"""
def on_message(data):
self._robot_pose = data
# print(data)
# 临时订阅以获取数据
key = self.subscribe_robot_pose(on_message)
time.sleep(1) # 等待数据
self.stop_websocket(key)
return getattr(self, '_robot_pose', None)
def get_device_status(self) -> Optional[Dict]:
"""
获取设备状态信息
Returns:
设备状态信息,失败返回None
"""
def on_message(data):
self._device_status = data.get("data", {})
# 临时订阅以获取数据
key = self.subscribe_device_status(on_message)
time.sleep(0.1) # 等待数据
self.stop_websocket(key)
return getattr(self, '_device_status', None)
# ==================== 状态码常量 ====================
class StatusCode:
"""任务状态码常量"""
IDLE = 0 # 空闲中
PAUSED = 100 # 暂停状态
TARGET_RECEIVED = 400 # 收到目标点
PATH_PLANNING_START = 401 # 开始路径规划
PLANNING_FAILED = 402 # 规划失败
OBSTACLE_DETECTED = 403 # 前方有障碍物停障中
PLANNING_SUCCESS = 405 # 规划成功
PATH_FOLLOWING_START = 406 # 开始跟线
TARGET_REACHED = 407 # 到达目标点
NAVIGATION_FAILED = 408 # 导航失败
INVALID_PATH = 420 # 路径无效
PATH_FOLLOWING = 421 # 跟线中
TARGET_UNREACHABLE = 430 # 目标点不可达
OFF_TRACK = 431 # 偏离路网
SPEED_ABNORMAL = 432 # 速度异常
OBSTACLE_TIMEOUT = 434 # 绕障超时
SPEED_CMD_NOT_EXECUTED = 435 # 速度指令未执行
FINE_TUNING_FAILED = 436 # 精调失败
SPEED_FEEDBACK_TIMEOUT = 505 # 速度反馈超时
PATH_PLANNING_FAILED = 506 # 路径规划失败
LOCALIZATION_TIMEOUT = 511 # 定位数据超时
# 新增任务队列相关状态码
TASK_QUEUE_IDLE = 0 # 任务队列空闲
TASK_QUEUE_RUNNING = 1 # 任务队列执行中
TASK_QUEUE_PAUSED = 2 # 任务队列暂停中
TASK_QUEUE_COMPLETED = 3 # 任务队列完成
status_descriptions = {
StatusCode.IDLE: "空闲中",
StatusCode.PAUSED: "暂停状态",
StatusCode.TARGET_RECEIVED: "收到目标点",
StatusCode.PATH_PLANNING_START: "开始路径规划",
StatusCode.PLANNING_FAILED: "规划失败",
StatusCode.OBSTACLE_DETECTED: "前方有障碍物停障中",
StatusCode.PLANNING_SUCCESS: "规划成功",
StatusCode.PATH_FOLLOWING_START: "开始跟线",
StatusCode.TARGET_REACHED: "到达目标点",
StatusCode.NAVIGATION_FAILED: "导航失败",
StatusCode.INVALID_PATH: "路径无效",
StatusCode.PATH_FOLLOWING: "跟线中",
StatusCode.TARGET_UNREACHABLE: "目标点不可达",
StatusCode.OFF_TRACK: "偏离路网",
StatusCode.SPEED_ABNORMAL: "速度异常",
StatusCode.OBSTACLE_TIMEOUT: "绕障超时",
StatusCode.SPEED_CMD_NOT_EXECUTED: "速度指令未执行",
StatusCode.FINE_TUNING_FAILED: "精调失败",
StatusCode.SPEED_FEEDBACK_TIMEOUT: "速度反馈超时",
StatusCode.PATH_PLANNING_FAILED: "路径规划失败",
StatusCode.LOCALIZATION_TIMEOUT: "定位数据超时",
# 新增任务队列相关状态描述
StatusCode.TASK_QUEUE_IDLE: "任务队列空闲",
StatusCode.TASK_QUEUE_RUNNING: "任务队列执行中",
StatusCode.TASK_QUEUE_PAUSED: "任务队列暂停中",
StatusCode.TASK_QUEUE_COMPLETED: "任务队列完成",
}
def get_status_description(code: int, default: str = "未知状态") -> str:
"""根据状态码获取描述,不存在则返回默认值"""
return status_descriptions.get(code, default)
if __name__ == "__main__":
# 使用示例
# 创建客户端实例
client = RobotNavClient(host="192.168.1.108", http_port=8086, ws_port=8089) # 替换为实际的机器人IP
# # 获取激光数据
# laser_data = client.get_laser_data()
# print("激光数据:", laser_data)
# 获取速度数据
velocity = client.get_robot_velocity()
print("当前速度:", velocity)
# 获取地图点数据
positions = client.get_positions("agent2")
print("地图点数据:", positions)
# # 订阅设备状态
# def on_device_status(data):
# battery = data.get("data", {}).get("battery", 0)
# print(f"电池电量: {battery}%")
# device_status_key = client.subscribe_device_status(on_device_status)
# # 订阅任务状态
# def on_task_status(data):
# status_code = data.get("statusCode")
# status_msg = data.get("statusMsg")
# print(f"任务状态: {status_code} - {status_msg}")
# if status_code == StatusCode.TARGET_REACHED:
# print("机器人已到达目标点!")
# elif status_code == StatusCode.NAVIGATION_FAILED:
# print("导航失败!")
# task_status_key = client.subscribe_task_status(on_task_status)
# # 订阅机器人位置
# def on_robot_pose(data):
# position = data.get("position", {})
# angle = data.get("angle", 0)
# print(f"机器人位置: x={position.get('x')}, y={position.get('y')}, angle={angle}")
# pose_key = client.subscribe_robot_pose(on_robot_pose)
# try:
# # 保持程序运行以接收WebSocket数据
# print("正在监听数据,按Ctrl+C退出...")
# while True:
# time.sleep(1)
# except KeyboardInterrupt:
# print("正在关闭连接...")
# client.close_all_websockets()
# print("程序已退出")