execute_m_line_task_workflow

Executes automated robot task workflows by querying status, selecting tasks, and running commands for GS cleaning robots.

Instructions

Executes complete M-line robot task workflow.

Automated process: Status query → Task selection → Command execution

Args:
    serial_number: The serial number of the target robot.
    task_selection_criteria: Optional task selection criteria.

Returns:
    A dictionary containing the workflow execution result.
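As an illustration of how a client might invoke this tool, the sketch below builds a JSON-RPC 2.0 `tools/call` request, which is the message shape MCP uses for tool invocation. The helper name `build_tool_call` and the placeholder serial number are assumptions made for this example; they do not come from the server's code.

```python
import json
from typing import Any, Dict, Optional


def build_tool_call(
    serial_number: str,
    task_selection_criteria: Optional[Dict[str, Any]] = None,
    request_id: int = 1,
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for this tool (hypothetical helper)."""
    arguments: Dict[str, Any] = {"serial_number": serial_number}
    # Only send the optional argument when the caller supplied it.
    if task_selection_criteria is not None:
        arguments["task_selection_criteria"] = task_selection_criteria
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "execute_m_line_task_workflow",
            "arguments": arguments,
        },
    }


if __name__ == "__main__":
    # "GS000-0000-000-0000" is a placeholder, not a real robot serial number.
    print(json.dumps(build_tool_call("GS000-0000-000-0000"), indent=2))
```

In practice an MCP client library would normally build and send this message for you; the sketch only makes the argument layout explicit.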

Input Schema

Name                     Required  Description  Default
serial_number            Yes
task_selection_criteria  No
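The schema above can be checked on the caller's side before a request is sent. The following is a minimal sketch, assuming `serial_number` must be a non-empty string and `task_selection_criteria`, when present, must be a dictionary (as the type hints in the implementation suggest). The function name `validate_arguments` and the decision to reject unknown keys are assumptions for illustration, not behavior confirmed by the server.

```python
from typing import Any, Dict, Optional, Tuple

ALLOWED_KEYS = {"serial_number", "task_selection_criteria"}


def validate_arguments(
    arguments: Dict[str, Any],
) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Validate tool arguments against the input schema (hypothetical helper)."""
    # ASSUMPTION: unknown keys are treated as an error in this sketch.
    unknown = set(arguments) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"Unexpected arguments: {sorted(unknown)}")

    # serial_number is the only required field.
    serial_number = arguments.get("serial_number")
    if not isinstance(serial_number, str) or not serial_number.strip():
        raise ValueError("serial_number is required and must be a non-empty string")

    # task_selection_criteria is optional; None means "no criteria".
    criteria = arguments.get("task_selection_criteria")
    if criteria is not None and not isinstance(criteria, dict):
        raise ValueError("task_selection_criteria must be a dict when provided")

    return serial_number, criteria
```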

Implementation Reference

  • MCP tool registration with @mcp.tool() decorator. Thin wrapper that delegates to GausiumMCP instance method.
    @mcp.tool()
    async def execute_m_line_task_workflow(
        serial_number: str,
        task_selection_criteria: Optional[dict] = None
    ):
        """Executes complete M-line robot task workflow.

        Automated process: Status query → Task selection → Command execution

        Args:
            serial_number: The serial number of the target robot.
            task_selection_criteria: Optional task selection criteria.

        Returns:
            A dictionary containing the workflow execution result.
        """
        return await mcp.execute_m_line_task_workflow(
            serial_number=serial_number,
            task_selection_criteria=task_selection_criteria
        )
  • Core handler logic: fetches robot status, extracts and selects available tasks, issues START_TASK command via API client.
    async def execute_m_line_task(
        self,
        serial_number: str,
        task_selection_criteria: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Execute an M-line robot task.

        Workflow:
        1. Fetch the robot status
        2. Extract the list of executable tasks from the status
        3. Select a task according to the criteria
        4. Dispatch the task via Create Robot Command

        Args:
            serial_number: Robot serial number
            task_selection_criteria: Task selection criteria

        Returns:
            Task execution result
        """
        logger.info(f"Starting M-line task execution for robot: {serial_number}")

        async with GausiumAPIClient() as client:
            try:
                # 1. Fetch the robot status
                status = await client.call_endpoint(
                    'get_robot_status_v1',
                    path_params={'serial_number': serial_number}
                )

                # 2. Parse the list of executable tasks
                available_tasks = self._extract_m_line_tasks(status)
                if not available_tasks:
                    raise ValueError("No executable tasks found in robot status")

                # 3. Select a task
                selected_task = self._select_m_line_task(
                    available_tasks,
                    task_selection_criteria or {}
                )

                # 4. Build and dispatch the task command
                command_result = await client.call_endpoint(
                    'create_command',
                    path_params={'serial_number': serial_number},
                    json_data={
                        "serialNumber": serial_number,
                        "remoteTaskCommandType": "START_TASK",
                        "commandParameter": {
                            "startTaskParameter": selected_task
                        }
                    }
                )

                logger.info(f"M-line task executed successfully: {command_result}")
                return command_result

            except Exception as e:
                logger.error(f"M-line task execution failed: {str(e)}")
                raise
  • GausiumMCP class method that delegates workflow execution to the task_engine instance.
    async def execute_m_line_task_workflow(
        self,
        serial_number: str,
        task_selection_criteria: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Execute the complete M-line robot task workflow.

        Automated process: Status query → Task selection → Command dispatch

        Args:
            serial_number: Robot serial number
            task_selection_criteria: Task selection criteria

        Returns:
            Workflow execution result
        """
        return await self.task_engine.execute_m_line_task(
            serial_number=serial_number,
            task_selection_criteria=task_selection_criteria
        )
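The three-step flow in the handler (status query, task selection, command dispatch) can be exercised without a robot. The sketch below is illustrative only: `FakeClient` is an in-memory stand-in for `GausiumAPIClient`, and `extract_tasks` and `select_task` are hypothetical substitutes for `_extract_m_line_tasks` and `_select_m_line_task`, whose real implementations are not shown on this page. In particular, the `executableTasks` status field and the exact-match selection rule are assumptions; only the endpoint names and the `START_TASK` payload shape are taken from the handler above.

```python
import asyncio
from typing import Any, Dict, List, Optional


def extract_tasks(status: Dict[str, Any]) -> List[Dict[str, Any]]:
    # ASSUMPTION: tasks live under a top-level "executableTasks" key.
    tasks = status.get("executableTasks") or []
    return [t for t in tasks if isinstance(t, dict)]


def select_task(tasks: List[Dict[str, Any]], criteria: Dict[str, Any]) -> Dict[str, Any]:
    # ASSUMPTION: a task matches when every criteria key equals the task's
    # value for that key. Empty criteria therefore select the first task.
    for task in tasks:
        if all(task.get(key) == value for key, value in criteria.items()):
            return task
    raise ValueError("No task matches the selection criteria")


def build_start_task_command(serial_number: str, task: Dict[str, Any]) -> Dict[str, Any]:
    # Payload shape copied from the handler shown above.
    return {
        "serialNumber": serial_number,
        "remoteTaskCommandType": "START_TASK",
        "commandParameter": {"startTaskParameter": task},
    }


class FakeClient:
    """In-memory stand-in for GausiumAPIClient: records calls, no network."""

    def __init__(self, status: Dict[str, Any]) -> None:
        self.status = status
        self.calls: List[str] = []

    async def call_endpoint(self, name, path_params=None, json_data=None):
        self.calls.append(name)
        if name == "get_robot_status_v1":
            return self.status
        # Echo the command back so callers can inspect what was sent.
        return {"accepted": True, "sent": json_data}


async def run_workflow(
    client: FakeClient,
    serial_number: str,
    criteria: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    params = {"serial_number": serial_number}
    status = await client.call_endpoint("get_robot_status_v1", path_params=params)
    tasks = extract_tasks(status)
    if not tasks:
        raise ValueError("No executable tasks found in robot status")
    task = select_task(tasks, criteria or {})
    return await client.call_endpoint(
        "create_command",
        path_params=params,
        json_data=build_start_task_command(serial_number, task),
    )


if __name__ == "__main__":
    fake = FakeClient({"executableTasks": [{"name": "lobby"}, {"name": "hall"}]})
    print(asyncio.run(run_workflow(fake, "SN-PLACEHOLDER", {"name": "hall"})))
```

Keeping extraction, selection, and payload construction as separate pure functions makes each step testable on its own, which is one reasonable way to structure such a workflow.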

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cfrs2005/mcp-gs-robot'
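The same request can be issued from Python using only the standard library. This is a minimal sketch: the function names are made up for this example, and it assumes the endpoint returns a JSON body, which this page does not state explicitly.

```python
import json
import urllib.request
from typing import Any

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def build_server_request(owner: str, repo: str) -> urllib.request.Request:
    """Build a GET request for one server entry in the MCP directory."""
    url = f"{API_BASE}/{owner}/{repo}"
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(owner: str, repo: str, timeout: float = 10.0) -> Any:
    """Send the request and decode the body.

    ASSUMPTION: the response body is JSON.
    """
    request = build_server_request(owner, repo)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


if __name__ == "__main__":
    # Performs a real network call when run as a script.
    print(fetch_server_info("cfrs2005", "mcp-gs-robot"))
```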

If you have feedback or need assistance with the MCP directory API, please join our Discord server.