execute_m_line_task_workflow
Execute complete robot task workflows by querying robot status, selecting appropriate tasks, and running commands automatically for GS cleaning robots.
Instructions
Executes complete M-line robot task workflow.
Automated process: Status query → Task selection → Command execution
Args:
serial_number: The serial number of the target robot.
task_selection_criteria: Optional task selection criteria.
Returns:
A dictionary containing the workflow execution result.
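The three stages can be sketched end to end with an in-memory stand-in for the robot API. Everything here is illustrative: `FakeClient`, the `executableTasks` status field, and the task names are assumptions made for this sketch, not the Gausium API. Only the `START_TASK` payload shape mirrors the core handler listed under Implementation Reference.

```python
import asyncio
from typing import Any, Dict, List, Optional


class FakeClient:
    """Hypothetical stand-in for the real API client; it records sent commands."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []

    async def get_status(self, serial_number: str) -> Dict[str, Any]:
        # Assumed status shape: runnable tasks listed under "executableTasks".
        return {
            "serialNumber": serial_number,
            "executableTasks": [{"name": "lobby"}, {"name": "hallway"}],
        }

    async def send_command(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        self.sent.append(payload)
        task = payload["commandParameter"]["startTaskParameter"]
        return {"accepted": True, "task": task}


async def run_workflow(
    client: FakeClient,
    serial_number: str,
    criteria: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # 1. Status query
    status = await client.get_status(serial_number)
    tasks = status.get("executableTasks", [])
    if not tasks:
        raise ValueError("No executable tasks found in robot status")

    # 2. Task selection: first task matching every criterion, else the first task
    wanted = criteria or {}
    selected = next(
        (t for t in tasks if all(t.get(k) == v for k, v in wanted.items())),
        tasks[0],
    )

    # 3. Command execution
    return await client.send_command(
        {
            "serialNumber": serial_number,
            "remoteTaskCommandType": "START_TASK",
            "commandParameter": {"startTaskParameter": selected},
        }
    )


if __name__ == "__main__":
    print(asyncio.run(run_workflow(FakeClient(), "SN-001", {"name": "hallway"})))
```

Omitting the criteria selects the first task the robot reports, which matches the default behavior of the selection helper in the reference implementation.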
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| serial_number | Yes | The serial number of the target robot. | |
| task_selection_criteria | No | Optional task selection criteria. | None |
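For reference, the table corresponds roughly to the following JSON Schema, written here as a Python dict together with a small hand-rolled check. This is a sketch inferred from the registered function signature (`serial_number: str`, `task_selection_criteria: Optional[dict] = None`); the schema the server actually publishes may differ in detail, and `check_arguments` is a hypothetical helper, not part of the project.

```python
from typing import Any, Dict, List

# Assumed schema, reconstructed from the tool's Python signature.
INPUT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "serial_number": {"type": "string"},
        "task_selection_criteria": {"type": ["object", "null"], "default": None},
    },
    "required": ["serial_number"],
}


def check_arguments(arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems: List[str] = []
    for name in INPUT_SCHEMA["required"]:
        if name not in arguments:
            problems.append(f"missing required argument: {name}")
    if "serial_number" in arguments and not isinstance(arguments["serial_number"], str):
        problems.append("serial_number must be a string")
    criteria = arguments.get("task_selection_criteria")
    if criteria is not None and not isinstance(criteria, dict):
        problems.append("task_selection_criteria must be an object or null")
    for name in arguments:
        if name not in INPUT_SCHEMA["properties"]:
            problems.append(f"unknown argument: {name}")
    return problems


if __name__ == "__main__":
    print(check_arguments({"serial_number": "SN-001"}))
```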
Implementation Reference
- Core handler implementing the M-line task workflow logic: queries the robot status, extracts and selects an available task, then creates and sends a START_TASK command.

      async def execute_m_line_task(
          self,
          serial_number: str,
          task_selection_criteria: Optional[Dict[str, Any]] = None
      ) -> Dict[str, Any]:
          """
          Execute an M-line robot task.

          Workflow:
          1. Get the robot status
          2. Extract the list of executable tasks from the status
          3. Select a task according to the criteria
          4. Dispatch the task via Create Robot Command

          Args:
              serial_number: Robot serial number
              task_selection_criteria: Task selection criteria

          Returns:
              Task execution result
          """
          logger.info(f"Starting M-line task execution for robot: {serial_number}")

          async with GausiumAPIClient() as client:
              try:
                  # 1. Get the robot status
                  status = await client.call_endpoint(
                      'get_robot_status_v1',
                      path_params={'serial_number': serial_number}
                  )

                  # 2. Parse the list of executable tasks
                  available_tasks = self._extract_m_line_tasks(status)
                  if not available_tasks:
                      raise ValueError("No executable tasks found in robot status")

                  # 3. Select a task
                  selected_task = self._select_m_line_task(
                      available_tasks,
                      task_selection_criteria or {}
                  )

                  # 4. Build and dispatch the task command
                  command_result = await client.call_endpoint(
                      'create_command',
                      path_params={'serial_number': serial_number},
                      json_data={
                          "serialNumber": serial_number,
                          "remoteTaskCommandType": "START_TASK",
                          "commandParameter": {
                              "startTaskParameter": selected_task
                          }
                      }
                  )

                  logger.info(f"M-line task executed successfully: {command_result}")
                  return command_result

              except Exception as e:
                  logger.error(f"M-line task execution failed: {str(e)}")
                  raise
- src/gs_openapi/main.py:238-257 (registration): MCP tool registration and entry-point handler for 'execute_m_line_task_workflow'; delegates to the GausiumMCP instance.

      @mcp.tool()
      async def execute_m_line_task_workflow(
          serial_number: str,
          task_selection_criteria: Optional[dict] = None
      ):
          """Executes complete M-line robot task workflow.

          Automated process: Status query → Task selection → Command execution

          Args:
              serial_number: The serial number of the target robot.
              task_selection_criteria: Optional task selection criteria.

          Returns:
              A dictionary containing the workflow execution result.
          """
          return await mcp.execute_m_line_task_workflow(
              serial_number=serial_number,
              task_selection_criteria=task_selection_criteria
          )
- Intermediate handler in the GausiumMCP class that forwards the call to TaskExecutionEngine.

      async def execute_m_line_task_workflow(
          self,
          serial_number: str,
          task_selection_criteria: Optional[Dict[str, Any]] = None
      ) -> Dict[str, Any]:
          """
          Execute the complete M-line robot task workflow.

          Automated process: status query → task selection → command dispatch

          Args:
              serial_number: Robot serial number
              task_selection_criteria: Task selection criteria

          Returns:
              Workflow execution result
          """
          return await self.task_engine.execute_m_line_task(
              serial_number=serial_number,
              task_selection_criteria=task_selection_criteria
          )
- Helper method to extract available tasks from M-line robot status. The excerpt captured here contains the two preceding S-line methods and ends at the helper's signature; the helper's body is not shown.

      async def execute_s_line_site_task(
          self,
          robot_id: str,
          task_parameters: Dict[str, Any]
      ) -> Dict[str, Any]:
          """
          Execute an S-line task with a site.

          Workflow:
          1. Get site information
          2. Parse available maps
          3. Get the subareas of the target map
          4. Build and dispatch a temporary site task

          Args:
              robot_id: Robot ID
              task_parameters: Task parameters

          Returns:
              Task execution result
          """
          logger.info(f"Starting S-line site task execution for robot: {robot_id}")

          async with GausiumAPIClient() as client:
              try:
                  # 1. Get site information
                  site_info = await client.call_endpoint(
                      'get_site_info',
                      path_params={'robot_id': robot_id}
                  )

                  # 2. Parse available maps
                  available_maps = self._extract_maps_from_site(site_info)
                  if not available_maps:
                      raise ValueError("No maps found in site information")

                  # 3. Select the target map
                  target_map_id = self._select_map(
                      available_maps,
                      task_parameters.get('map_criteria', {})
                  )

                  # 4. Get the map subareas
                  subareas = await client.call_endpoint(
                      'get_map_subareas',
                      path_params={'map_id': target_map_id}
                  )

                  # 5. Build the task data
                  task_data = self._build_site_task_data(
                      target_map_id,
                      subareas,
                      task_parameters
                  )

                  # 6. Dispatch the temporary site task
                  task_result = await client.call_endpoint(
                      'submit_temp_site_task',
                      json_data=task_data
                  )

                  logger.info(f"S-line site task executed successfully: {task_result}")
                  return task_result

              except Exception as e:
                  logger.error(f"S-line site task execution failed: {str(e)}")
                  raise

      async def execute_s_line_no_site_task(
          self,
          robot_sn: str,
          task_parameters: Dict[str, Any]
      ) -> Dict[str, Any]:
          """
          Execute an S-line task without a site.

          Workflow:
          1. Get the robot's map list
          2. Get the subareas of the target map
          3. Build and dispatch a temporary no-site task

          Args:
              robot_sn: Robot serial number
              task_parameters: Task parameters

          Returns:
              Task execution result
          """
          logger.info(f"Starting S-line no-site task execution for robot: {robot_sn}")

          async with GausiumAPIClient() as client:
              try:
                  # 1. Get the robot's map list
                  maps_response = await client.call_endpoint(
                      'list_maps',
                      json_data={'robotSn': robot_sn}
                  )

                  # Handle Gausium's special response format
                  if maps_response.get('code') != 0:
                      raise RuntimeError(f"Failed to get maps: {maps_response.get('msg')}")

                  available_maps = maps_response.get('data', [])
                  if not available_maps:
                      raise ValueError("No maps found for robot")

                  # 2. Select the target map
                  target_map_id = self._select_map_from_list(
                      available_maps,
                      task_parameters.get('map_criteria', {})
                  )

                  # 3. Get the map subareas
                  subareas = await client.call_endpoint(
                      'get_map_subareas',
                      path_params={'map_id': target_map_id}
                  )

                  # 4. Build the task data
                  task_data = self._build_no_site_task_data(
                      target_map_id,
                      subareas,
                      task_parameters
                  )

                  # 5. Dispatch the temporary no-site task
                  task_result = await client.call_endpoint(
                      'submit_temp_no_site_task',
                      json_data=task_data
                  )

                  logger.info(f"S-line no-site task executed successfully: {task_result}")
                  return task_result

              except Exception as e:
                  logger.error(f"S-line no-site task execution failed: {str(e)}")
                  raise

      def _extract_m_line_tasks(self, robot_status: Dict[str, Any]) -> List[Dict[str, Any]]:
          ...  # body not included in the source excerpt
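- Helper method to select a specific task from the available tasks based on criteria. If no criteria are given, or no task matches them, it returns the first available task.

      def _select_m_line_task(
          self,
          available_tasks: List[Dict[str, Any]],
          criteria: Dict[str, Any]
      ) -> Dict[str, Any]:
          """
          Select an M-line task according to the criteria.

          Args:
              available_tasks: List of available tasks
              criteria: Selection criteria

          Returns:
              The selected task
          """
          if not criteria:
              # If no criteria are given, return the first task
              return available_tasks[0]

          # TODO: implement more sophisticated task selection logic
          # Could filter by task name, map, cleaning mode, etc.
          for task in available_tasks:
              if self._matches_criteria(task, criteria):
                  return task

          # If no task matches, return the first one
          return available_tasks[0]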
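
The selection helper relies on `self._matches_criteria`, which is not included in the excerpts above. A minimal sketch of one plausible behavior, assuming criteria are matched by exact equality on task fields. The field names `name` and `map` are illustrative, and this is not the project's actual implementation.

```python
from typing import Any, Dict, List


def matches_criteria(task: Dict[str, Any], criteria: Dict[str, Any]) -> bool:
    """Assumed rule: every criterion must equal the task's value for that key."""
    return all(task.get(key) == expected for key, expected in criteria.items())


def select_task(
    available_tasks: List[Dict[str, Any]],
    criteria: Dict[str, Any],
) -> Dict[str, Any]:
    """Mirror the documented selection order: first match, else first task."""
    if not available_tasks:
        raise ValueError("No executable tasks to select from")
    if not criteria:
        return available_tasks[0]
    for task in available_tasks:
        if matches_criteria(task, criteria):
            return task
    # Fallback kept from the original helper: no match still yields a task.
    return available_tasks[0]


if __name__ == "__main__":
    tasks = [{"name": "lobby", "map": "F1"}, {"name": "hallway", "map": "F2"}]
    print(select_task(tasks, {"map": "F2"}))
```

Note the fallback: criteria that match nothing still start the first task, so a caller that needs strict matching would have to compare the returned task against the criteria itself.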