execute_m_line_task_workflow
Automates the process of querying robot status, selecting tasks, and executing commands for M-line robot operations.
Instructions
Executes complete M-line robot task workflow.
Automated process: Status query → Task selection → Command execution
Args:
serial_number: The serial number of the target robot.
task_selection_criteria: Optional task selection criteria.
Returns:
A dictionary containing the workflow execution result.Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| serial_number | Yes | ||
| task_selection_criteria | No |
Implementation Reference
- src/gs_openapi/main.py:238-257 (registration)MCP tool registration for execute_m_line_task_workflow using @mcp.tool() decorator. Delegates to mcp.execute_m_line_task_workflow method.
@mcp.tool() async def execute_m_line_task_workflow( serial_number: str, task_selection_criteria: Optional[dict] = None ): """Executes complete M-line robot task workflow. Automated process: Status query → Task selection → Command execution Args: serial_number: The serial number of the target robot. task_selection_criteria: Optional task selection criteria. Returns: A dictionary containing the workflow execution result. """ return await mcp.execute_m_line_task_workflow( serial_number=serial_number, task_selection_criteria=task_selection_criteria ) - Handler method in GausiumMCP class that delegates to TaskExecutionEngine.execute_m_line_task.
async def execute_m_line_task_workflow( self, serial_number: str, task_selection_criteria: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ 执行M线机器人完整任务工作流。 自动化流程:状态查询 → 任务选择 → 指令下发 Args: serial_number: 机器人序列号 task_selection_criteria: 任务选择条件 Returns: 工作流执行结果 """ return await self.task_engine.execute_m_line_task( serial_number=serial_number, task_selection_criteria=task_selection_criteria ) - Core implementation: executes M-line robot task workflow by querying robot status, extracting available tasks, selecting a task based on criteria, and submitting a START_TASK command via the Gausium API.
async def execute_m_line_task( self, serial_number: str, task_selection_criteria: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ 执行M线机器人任务。 工作流: 1. 获取机器人状态 2. 从状态中提取可执行任务列表 3. 根据条件选择任务 4. 通过Create Robot Command下发任务 Args: serial_number: 机器人序列号 task_selection_criteria: 任务选择条件 Returns: 任务执行结果 """ logger.info(f"Starting M-line task execution for robot: {serial_number}") async with GausiumAPIClient() as client: try: # 1. 获取机器人状态 status = await client.call_endpoint( 'get_robot_status_v1', path_params={'serial_number': serial_number} ) # 2. 解析可执行任务列表 available_tasks = self._extract_m_line_tasks(status) if not available_tasks: raise ValueError("No executable tasks found in robot status") # 3. 选择任务 selected_task = self._select_m_line_task( available_tasks, task_selection_criteria or {} ) # 4. 构建并下发任务指令 command_result = await client.call_endpoint( 'create_command', path_params={'serial_number': serial_number}, json_data={ "serialNumber": serial_number, "remoteTaskCommandType": "START_TASK", "commandParameter": { "startTaskParameter": selected_task } } ) logger.info(f"M-line task executed successfully: {command_result}") return command_result except Exception as e: logger.error(f"M-line task execution failed: {str(e)}") raise - Helper to extract available tasks from robot status response.
def _extract_m_line_tasks(self, robot_status: Dict[str, Any]) -> List[Dict[str, Any]]: """ 从M线机器人状态中提取可执行任务列表。 Args: robot_status: 机器人状态信息 Returns: 可执行任务列表 """ # TODO: 根据实际API响应格式实现 # 这里需要根据实际的机器人状态响应格式来解析任务列表 tasks = robot_status.get('available_tasks', []) if not tasks: # 如果没有直接的任务列表,尝试从其他字段解析 # 需要根据实际响应格式调整 logger.warning("No 'available_tasks' field found in robot status") return tasks - Helper to select a task from the available list based on criteria (returns first if no criteria or no match).
def _select_m_line_task( self, available_tasks: List[Dict[str, Any]], criteria: Dict[str, Any] ) -> Dict[str, Any]: """ 根据条件选择M线任务。 Args: available_tasks: 可用任务列表 criteria: 选择条件 Returns: 选中的任务 """ if not criteria: # 如果没有指定条件,返回第一个任务 return available_tasks[0] # TODO: 实现更复杂的任务选择逻辑 # 可以根据任务名称、地图、清洁模式等条件筛选 for task in available_tasks: if self._matches_criteria(task, criteria): return task # 如果没有匹配的任务,返回第一个 return available_tasks[0]