Skip to main content
Glama
jupiterbak

AYX-MCP-Wrapper

by jupiterbak

execute_workflow_with_monitoring

Execute workflows by ID with real-time monitoring to track job status and retrieve detailed results upon completion or failure.

Instructions

Execute a workflow by its ID and monitor its execution status. This call will return a jobID, he Job status and the job details once the execution is completed or failed. The input data parameter is a list of name-value pairs, each containing a name and value.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
workflow_idYes
input_dataNo

Implementation Reference

  • Core handler function in AYXMCPTools class that enqueues the workflow job on Alteryx Server and polls for completion, handling inputs, validation, timeout, and returning formatted results.
    def execute_workflow_with_monitoring(
            self,
            workflow_id: str, 
            input_data: Optional[List[InputData]] = None, 
            wait_for_completion: bool = True,
            timeout_seconds: int = 3600,
            poll_interval_seconds: int = 5
    ):
        """ Execute a workflow and monitor its execution status. 
        This call will return a jobID as well as the complete job details. 
        The input data parameter is a list of name-value pairs, each containing a name and value. """
        try:
            workflow = self.workflows_api.workflows_get_workflow(workflow_id)
            if not workflow:
                return "Error: Workflow not found"
            
            questions = self.workflows_api.workflows_get_workflow_questions(workflow_id)
            if (not questions or len(questions) == 0) and (input_data):
                return "Error: Workflow has no questions, input data not allowed"
            if questions and len(questions) > 0:
                for question in questions:
                    if question.name not in [item.name for item in input_data]:
                        return f"Error: Input data must contain the question '{question.name}'"
    
            # Convert InputData objects to AppValue objects
            app_values = None
            if input_data:
                app_values = [server_client.AppValue(name=item.name, value=item.value) for item in input_data]
    
            # Start the workflow execution
            contract = server_client.EnqueueJobContract(worker_tag=workflow.worker_tag, questions=app_values)
            job_response = self.workflows_api.workflows_enqueue(workflow_id, contract)
    
            # Parse the job response to get the job ID
            if job_response.status != "Queued":
                # Out put error
                return pprint.pformat({
                    "success": False,
                    "job_id": job_id,
                    "status": job_response.status,
                    "message": f"Error: Workfow was not successfully started. Current JobID:'{job_response.id}"
                })
    
            # Extract the job id
            job_id = job_response.id
    
            if not wait_for_completion:
                return pprint.pformat({
                    "success": True,
                    "job_id": job_id,
                    "status": "Started",
                    "message": "Job started successfully, not waiting for completion"
                })
    
            start_time = time.time()
    
            while True:
                # Check if timeout exceeded
                if time.time() - start_time > timeout_seconds:
                    return pprint.pformat({
                        "success": False,
                        "job_id": job_id,
                        "status": "Timeout",
                        "error": f"Job execution timed out after {timeout_seconds} seconds"
                    })
                
                # Get job status
                try:
                    job_details = self.jobs_api.jobs_get_job_v3(job_id)
                    
                    if job_details.status in ["Completed", "Cancelled"]:
                        # Get final job details including outputs and messages
                        final_details = job_details
                        job_messages = self.jobs_api.jobs_get_job_messages(job_id=job_id)
                        return  pprint.pformat({
                                "success": job_details.status == "Completed",
                                "job_id": job_id,
                                "status": job_details.status,
                                "job_details": final_details,
                                "execution_time_seconds": time.time() - start_time
                            })
                    # else:
                    #     return pprint.pformat({
                    #         "success": False,
                    #         "job_id": job_id,
                    #         "status": "Failed",
                    #         "error": "Could not extract status from job details, continuing to monitor..."
                    #     }) 
                    
                except Exception as e:
                    return pprint.pformat({
                        "success": False,
                        "job_id": job_id,
                        "status": "Failed",
                        "error": f"Error checking job status: {str(e)}"
                    })           
                # Wait before next check
                time.sleep(poll_interval_seconds)
    
        except ApiException as e:
            return pprint.pformat({
                "success": False,
                "error": f"Unexpected error: {str(e)}",
                "job_id": None,
                "status": "Failed"
            })
  • MCP tool registration decorator (@self.app.tool()) that defines the tool interface and delegates execution to the underlying AYXMCPTools instance with default monitoring parameters.
    def execute_workflow_with_monitoring(
            workflow_id: str, 
            input_data: list[InputData] = None
    ):
        """Execute a workflow by its ID and monitor its execution status. This call will return a jobID, he Job status and the job details once the execution is completed or failed.
        The input data parameter is a list of name-value pairs, each containing a name and value. """
        return self.tools.execute_workflow_with_monitoring(
            workflow_id, 
            input_data, 
            wait_for_completion=True,
            timeout_seconds=300,  # 5 minutes timeout
            poll_interval_seconds=10
        )
  • Pydantic model defining the structure for input data items used in workflow execution (list of InputData for questions/app values).
    class InputData(BaseModel):
        name: str
        value: str

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jupiterbak/AYX-MCP-Wrapper'

If you have feedback or need assistance with the MCP directory API, please join our Discord server