batch_get_robot_statuses_smart
Retrieve status for multiple GS cleaning robots in a single request, automatically grouping by robot series and selecting the appropriate batch API for mixed M-line and S-line models.
Instructions
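Smart batch retrieval of robot statuses.
Automatically groups robots by series and selects the correct batch API.
Supports mixed queries across M-line and S-line robots.
Args:
serial_numbers: List of robot serial numbers
Returns:
Dictionary of batch status query results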
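
The returned dictionary can be post-processed on the caller's side. The following is a minimal sketch, assuming the result carries the keys visible in the implementation reference (`results`, `serialNumber`, `error`). The helper name `summarize_batch_result` and the sample values are hypothetical and not part of the tool.

```python
from typing import Any, Dict, List


def summarize_batch_result(result: Dict[str, Any]) -> Dict[str, List[str]]:
    """Split serial numbers into those with a status and those that failed."""
    ok: List[str] = []
    failed: List[str] = []
    for entry in result.get("results", []):
        sn = entry.get("serialNumber", "<unknown>")
        # Entries produced by the router's error path carry an "error" key.
        if "error" in entry:
            failed.append(sn)
        else:
            ok.append(sn)
    return {"ok": ok, "failed": failed}


# Hand-written sample in the shape the router returns; values are illustrative only.
sample = {
    "total": 2,
    "v1_count": 1,
    "s_line_count": 1,
    "results": [
        {"serialNumber": "SN-A", "api_version": "V1 (M-line/Default)"},
        {"serialNumber": "SN-B", "error": "timeout", "robotType": "V2 API"},
    ],
}
summary = summarize_batch_result(sample)
```

Because a failed batch call is reported per serial number rather than raised, checking for the `error` key on each entry is one way to separate partial failures from successes.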
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| serial_numbers | Yes | List of robot serial numbers | |
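
Since the only input is a list of serial numbers, a caller may want to tidy that list before sending it. The sketch below is an optional client-side convenience, not something the tool requires; the helper name `build_arguments` and the sample serial numbers are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def build_arguments(serial_numbers: Iterable[str]) -> Dict[str, Any]:
    """Build the tool's argument payload from raw serial numbers."""
    cleaned: List[str] = []
    seen = set()
    for raw in serial_numbers:
        sn = raw.strip()
        # Skip blanks and repeats while preserving first-seen order.
        if sn and sn not in seen:
            seen.add(sn)
            cleaned.append(sn)
    return {"serial_numbers": cleaned}


args = build_arguments([" SN-A ", "SN-B", "", "SN-A"])
```

Note that removing duplicates changes the `total` reported by the router, which counts the serial numbers it actually receives.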
Implementation Reference
- src/gs_openapi/main.py:342-355 (handler)
  MCP tool handler registration for 'batch_get_robot_statuses_smart'. This decorated function is called by the MCP framework and delegates to the router implementation.

      @mcp.tool()
      async def batch_get_robot_statuses_smart(serial_numbers: list):
          """Smart batch retrieval of robot statuses.

          Automatically groups robots by series and selects the correct batch API.
          Supports mixed queries across M-line and S-line robots.

          Args:
              serial_numbers: List of robot serial numbers

          Returns:
              Dictionary of batch status query results
          """
          return await router.batch_get_robot_statuses_smart(serial_numbers)

- Core handler logic, implemented in the RobotAPIRouter class. Groups robots by series (M-line/S-line), calls the appropriate batch API (V1/V2), handles errors, and formats results.

      async def batch_get_robot_statuses_smart(self, serial_numbers: List[str]) -> Dict[str, Any]:
          """Smart batch retrieval of robot statuses.

          Automatically groups robots by serial-number prefix and selects the correct batch API.
          """
          if not serial_numbers:
              return {"results": []}

          # Group robots by series (based on the serial-number prefix)
          m_line_robots = []
          s_line_robots = []
          unknown_robots = []

          for serial_number in serial_numbers:
              detected_series = self._determine_robot_series_from_sn(serial_number)
              if self.is_s_line_robot(detected_series):
                  s_line_robots.append(serial_number)
              elif self.is_m_line_robot(detected_series):
                  m_line_robots.append(serial_number)
              else:
                  # Unknown prefixes default to M-line (V1 API)
                  unknown_robots.append(serial_number)

          results = []

          # Merge M-line and unknown robots; both use the V1 API
          v1_robots = m_line_robots + unknown_robots

          # Batch-query robot statuses through the V1 API
          if v1_robots:
              try:
                  v1_results = await self.mcp.batch_get_robot_statuses_v1(v1_robots)
                  if isinstance(v1_results, dict) and "results" in v1_results:
                      for result in v1_results["results"]:
                          result["api_version"] = "V1 (M-line/Default)"
                          result["detected_series"] = self._determine_robot_series_from_sn(result.get("serialNumber", ""))
                      results.extend(v1_results["results"])
                  else:
                      results.extend(v1_results if isinstance(v1_results, list) else [v1_results])
              except Exception as e:
                  # Report the failure once per serial number in the batch
                  for sn in v1_robots:
                      results.append({
                          "serialNumber": sn,
                          "error": str(e),
                          "robotType": "V1 API",
                          "detected_series": self._determine_robot_series_from_sn(sn)
                      })

          # Batch-query S-line robot statuses through the V2 API
          if s_line_robots:
              try:
                  s_results = await self.mcp.batch_get_robot_statuses_v2(s_line_robots)
                  if isinstance(s_results, dict) and "results" in s_results:
                      for result in s_results["results"]:
                          result["api_version"] = "V2 (S-line)"
                          result["detected_series"] = self._determine_robot_series_from_sn(result.get("serialNumber", ""))
                      results.extend(s_results["results"])
                  else:
                      results.extend(s_results if isinstance(s_results, list) else [s_results])
              except Exception as e:
                  # Report the failure once per serial number in the batch
                  for sn in s_line_robots:
                      results.append({
                          "serialNumber": sn,
                          "error": str(e),
                          "robotType": "V2 API",
                          "detected_series": self._determine_robot_series_from_sn(sn)
                      })

          return {
              "total": len(serial_numbers),
              "v1_count": len(v1_robots),
              "s_line_count": len(s_line_robots),
              "results": results
          }
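
The group-then-dispatch pattern in the router can be tried in isolation with stubbed APIs. This is a simplified sketch, not the project's code: `detect_series`, `fake_batch_v1`, and `fake_batch_v2` are hypothetical stand-ins, and the prefix rule ("S" for S-line, "M" for M-line) is an assumption made only for illustration, since the real mapping in `_determine_robot_series_from_sn` is not shown here.

```python
import asyncio
from typing import Any, Dict, List


def detect_series(serial_number: str) -> str:
    """Hypothetical prefix rule; the real mapping is not shown in this page."""
    if serial_number.startswith("S"):
        return "S-line"
    if serial_number.startswith("M"):
        return "M-line"
    return "unknown"


async def fake_batch_v1(sns: List[str]) -> Dict[str, Any]:
    """Stand-in for the V1 batch API: one record per serial number."""
    return {"results": [{"serialNumber": sn, "status": "ok"} for sn in sns]}


async def fake_batch_v2(sns: List[str]) -> Dict[str, Any]:
    """Stand-in for the V2 batch API: always fails, to exercise the error path."""
    raise RuntimeError("V2 unavailable")


async def batch_statuses(serial_numbers: List[str]) -> Dict[str, Any]:
    if not serial_numbers:
        return {"results": []}
    s_line = [sn for sn in serial_numbers if detect_series(sn) == "S-line"]
    # M-line and unknown serial numbers both fall back to the V1 stub.
    v1 = [sn for sn in serial_numbers if detect_series(sn) != "S-line"]
    results: List[Dict[str, Any]] = []
    for group, call, label in ((v1, fake_batch_v1, "V1"), (s_line, fake_batch_v2, "V2")):
        if not group:
            continue
        try:
            response = await call(group)
            for record in response["results"]:
                record["api_version"] = label
            results.extend(response["results"])
        except Exception as exc:
            # A failed batch yields one error entry per serial number.
            results.extend({"serialNumber": sn, "error": str(exc)} for sn in group)
    return {
        "total": len(serial_numbers),
        "v1_count": len(v1),
        "s_line_count": len(s_line),
        "results": results,
    }


outcome = asyncio.run(batch_statuses(["M001", "S001", "X001"]))
```

Keeping each batch call in its own `try` block means a failure in one API does not discard results already collected from the other.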