execute_m_line_task_workflow
Executes automated robot task workflows by querying status, selecting tasks, and running commands for GS cleaning robots.
Instructions
Executes complete M-line robot task workflow.
Automated process: Status query → Task selection → Command execution
Args:
serial_number: The serial number of the target robot.
task_selection_criteria: Optional task selection criteria.
Returns:
A dictionary containing the workflow execution result.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| serial_number | Yes | ||
| task_selection_criteria | No |
Implementation Reference
- src/gs_openapi/main.py:238-257 (registration)MCP tool registration with @mcp.tool() decorator. Thin wrapper that delegates to GausiumMCP instance method.@mcp.tool() async def execute_m_line_task_workflow( serial_number: str, task_selection_criteria: Optional[dict] = None ): """Executes complete M-line robot task workflow. Automated process: Status query → Task selection → Command execution Args: serial_number: The serial number of the target robot. task_selection_criteria: Optional task selection criteria. Returns: A dictionary containing the workflow execution result. """ return await mcp.execute_m_line_task_workflow( serial_number=serial_number, task_selection_criteria=task_selection_criteria )
- Core handler logic: fetches robot status, extracts and selects available tasks, issues START_TASK command via API client.async def execute_m_line_task( self, serial_number: str, task_selection_criteria: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ 执行M线机器人任务。 工作流: 1. 获取机器人状态 2. 从状态中提取可执行任务列表 3. 根据条件选择任务 4. 通过Create Robot Command下发任务 Args: serial_number: 机器人序列号 task_selection_criteria: 任务选择条件 Returns: 任务执行结果 """ logger.info(f"Starting M-line task execution for robot: {serial_number}") async with GausiumAPIClient() as client: try: # 1. 获取机器人状态 status = await client.call_endpoint( 'get_robot_status_v1', path_params={'serial_number': serial_number} ) # 2. 解析可执行任务列表 available_tasks = self._extract_m_line_tasks(status) if not available_tasks: raise ValueError("No executable tasks found in robot status") # 3. 选择任务 selected_task = self._select_m_line_task( available_tasks, task_selection_criteria or {} ) # 4. 构建并下发任务指令 command_result = await client.call_endpoint( 'create_command', path_params={'serial_number': serial_number}, json_data={ "serialNumber": serial_number, "remoteTaskCommandType": "START_TASK", "commandParameter": { "startTaskParameter": selected_task } } ) logger.info(f"M-line task executed successfully: {command_result}") return command_result except Exception as e: logger.error(f"M-line task execution failed: {str(e)}") raise
- GausiumMCP class method that delegates workflow execution to the task_engine instance.async def execute_m_line_task_workflow( self, serial_number: str, task_selection_criteria: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ 执行M线机器人完整任务工作流。 自动化流程:状态查询 → 任务选择 → 指令下发 Args: serial_number: 机器人序列号 task_selection_criteria: 任务选择条件 Returns: 工作流执行结果 """ return await self.task_engine.execute_m_line_task( serial_number=serial_number, task_selection_criteria=task_selection_criteria )