Skip to main content
Glama

GS Robot MCP Server

by cfrs2005
robot_router.py10.3 kB
""" 机器人API智能路由工具。 根据机器人系列自动选择正确的API版本和端点。 """ from typing import Dict, Any, Optional, List from ..mcp.gausium_mcp import GausiumMCP class RobotAPIRouter: """机器人API智能路由器。 根据机器人序列号前缀自动路由到正确的API版本: - M-line: GS100/GS500 (75系列), GS301/GS401 (50系列), GS442 (40系列) - S-line: GS438/GS408 (S系列), GS43C (SW系列) - 默认: 未知前缀使用V1 API """ # 机器人系列前缀映射 ROBOT_SERIES_MAPPING = { # M-line 机器人 "GS100": "75", # 75系列 "GS400": "75", # 75系列 (新发现) "GS500": "75", # 75系列 "GS301": "50", # 50系列 "GS401": "50", # 50系列 "GS501": "50", # 50系列 (新发现) "GS442": "40", # 40系列 # S-line 机器人 "GS438": "S", # S系列 "GS408": "S", # S系列 "GS43C": "SW", # SW系列 } def __init__(self, mcp: GausiumMCP): self.mcp = mcp self._robot_cache: Dict[str, Dict[str, Any]] = {} async def get_robot_info(self, serial_number: str) -> Optional[Dict[str, Any]]: """获取机器人基本信息(带缓存)。""" if serial_number in self._robot_cache: return self._robot_cache[serial_number] try: # 从机器人列表中查找 robots_response = await self.mcp.list_robots(page=1, page_size=100, relation="bound") for robot in robots_response.get("robots", []): if robot["serialNumber"] == serial_number: self._robot_cache[serial_number] = robot return robot # 如果第一页没找到,搜索更多页面 total = int(robots_response.get("total", "0")) pages_needed = (total + 99) // 100 # 向上取整 for page in range(2, min(pages_needed + 1, 20)): # 最多搜索20页 robots_response = await self.mcp.list_robots(page=page, page_size=100, relation="bound") for robot in robots_response.get("robots", []): if robot["serialNumber"] == serial_number: self._robot_cache[serial_number] = robot return robot except Exception as e: print(f"Failed to get robot info for {serial_number}: {e}") return None def _determine_robot_series_from_sn(self, serial_number: str) -> str: """根据序列号前缀判断机器人系列。 Args: serial_number: 机器人序列号 Returns: 机器人系列代码 (40, 50, 75, S, SW) 或 "unknown" """ if len(serial_number) < 5: return "unknown" prefix = serial_number[:5] return self.ROBOT_SERIES_MAPPING.get(prefix, "unknown") def is_m_line_robot(self, model_family_code: str) -> bool: """判断是否为M-line机器人。""" return model_family_code in ["40", "50", "75", "OMNIE"] def is_s_line_robot(self, model_family_code: str) -> bool: """判断是否为S-line机器人。""" return model_family_code in ["S", "SW"] async def get_robot_status_smart(self, serial_number: str) -> Dict[str, Any]: """智能获取机器人状态。 自动根据机器人序列号前缀选择V1或V2 API。 """ # 基于序列号前缀判断机器人系列 detected_series = self._determine_robot_series_from_sn(serial_number) if self.is_s_line_robot(detected_series): # S-line 机器人使用 V2 API result = await self.mcp.get_robot_status_v2(serial_number) result["api_version"] = "V2 (S-line)" result["detected_series"] = detected_series return result else: # M-line 机器人或未知类型默认使用 V1 API result = await self.mcp.get_robot_status_v1(serial_number) result["api_version"] = "V1 (M-line/Default)" result["detected_series"] = detected_series return result async def get_task_reports_smart(self, serial_number: str, **kwargs) -> Dict[str, Any]: """智能获取任务报告。 自动根据机器人序列号前缀选择M-line或S-line任务报告API。 """ # 基于序列号前缀判断机器人系列 detected_series = self._determine_robot_series_from_sn(serial_number) if self.is_s_line_robot(detected_series): # S-line 机器人使用专用任务报告API result = await self.mcp.list_robot_task_reports_s(serial_number, **kwargs) result["api_version"] = "S-line API" result["detected_series"] = detected_series return result else: # M-line 机器人或未知类型使用默认任务报告API result = await self.mcp.list_robot_task_reports(serial_number, **kwargs) result["api_version"] = "M-line/Default API" result["detected_series"] = detected_series return result async def batch_get_robot_statuses_smart(self, serial_numbers: List[str]) -> Dict[str, Any]: """智能批量获取机器人状态。 自动根据机器人序列号前缀分组并选择正确的批量API。 """ if not serial_numbers: return {"results": []} # 按机器人系列分组(基于序列号前缀) m_line_robots = [] s_line_robots = [] unknown_robots = [] for serial_number in serial_numbers: detected_series = self._determine_robot_series_from_sn(serial_number) if self.is_s_line_robot(detected_series): s_line_robots.append(serial_number) elif self.is_m_line_robot(detected_series): m_line_robots.append(serial_number) else: # 未知前缀默认归类为M-line (使用V1 API) unknown_robots.append(serial_number) results = [] # 合并M-line机器人和未知机器人,都用V1 API v1_robots = m_line_robots + unknown_robots # 批量查询V1 API机器人状态 if v1_robots: try: v1_results = await self.mcp.batch_get_robot_statuses_v1(v1_robots) if isinstance(v1_results, dict) and "results" in v1_results: for result in v1_results["results"]: result["api_version"] = "V1 (M-line/Default)" result["detected_series"] = self._determine_robot_series_from_sn(result.get("serialNumber", "")) results.extend(v1_results["results"]) else: results.extend(v1_results if isinstance(v1_results, list) else [v1_results]) except Exception as e: for sn in v1_robots: results.append({ "serialNumber": sn, "error": str(e), "robotType": "V1 API", "detected_series": self._determine_robot_series_from_sn(sn) }) # 批量查询S-line机器人状态 if s_line_robots: try: s_results = await self.mcp.batch_get_robot_statuses_v2(s_line_robots) if isinstance(s_results, dict) and "results" in s_results: for result in s_results["results"]: result["api_version"] = "V2 (S-line)" result["detected_series"] = self._determine_robot_series_from_sn(result.get("serialNumber", "")) results.extend(s_results["results"]) else: results.extend(s_results if isinstance(s_results, list) else [s_results]) except Exception as e: for sn in s_line_robots: results.append({ "serialNumber": sn, "error": str(e), "robotType": "V2 API", "detected_series": self._determine_robot_series_from_sn(sn) }) return { "total": len(serial_numbers), "v1_count": len(v1_robots), "s_line_count": len(s_line_robots), "results": results } async def get_capabilities(self, serial_number: str) -> Dict[str, Any]: """获取机器人支持的API能力。""" # 基于序列号前缀判断机器人系列 detected_series = self._determine_robot_series_from_sn(serial_number) capabilities = { "basic_status": True, "command_control": True, "task_reports": True, "maps": True, "site_info": False, "advanced_tasks": False } if self.is_s_line_robot(detected_series): capabilities.update({ "site_info": True, "advanced_tasks": True, "v2_status": True, "v1_status": False }) robot_series = "S-line" elif self.is_m_line_robot(detected_series): capabilities.update({ "v1_status": True, "v2_status": False, "command_control": True }) robot_series = "M-line" else: # 未知前缀默认使用V1 API capabilities.update({ "v1_status": True, "v2_status": False, "command_control": True }) robot_series = "Unknown (Default V1)" return { "serial_number": serial_number, "robot_series": robot_series, "detected_series": detected_series, "prefix": serial_number[:5] if len(serial_number) >= 5 else serial_number, "capabilities": capabilities }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cfrs2005/mcp-gs-robot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server