Glama

execute_workflow_with_monitoring

Run and monitor a workflow by its ID, receiving job status and details upon completion or failure. Input data is provided as name-value pairs for workflow execution.

Instructions

Execute a workflow by its ID and monitor its execution status. This call returns the job ID, the job status, and the job details once execution has completed or failed. The input data parameter is a list of name-value pairs, each containing a name and a value.

Input Schema

Name         Required  Description  Default
input_data   No
workflow_id  Yes
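
As a rough illustration of the schema above, the arguments for a call could be assembled as follows. This is only a sketch: the helper `build_arguments`, the workflow ID, and the question names `Region` and `Year` are hypothetical, and string values are an assumption since the schema does not state the value type.

```python
import json


def build_arguments(workflow_id, inputs=None):
    """Build tool arguments: workflow_id is required, input_data is optional."""
    if not workflow_id:
        raise ValueError("workflow_id is required")
    arguments = {"workflow_id": workflow_id}
    if inputs:
        # Each entry is a name-value pair, as described in the instructions.
        arguments["input_data"] = [
            {"name": name, "value": value} for name, value in inputs.items()
        ]
    return arguments


# Hypothetical workflow ID and question names, for illustration only.
example = build_arguments("hypothetical-workflow-id", {"Region": "EMEA", "Year": "2024"})
print(json.dumps(example, indent=2))
```

Omitting `inputs` yields a payload with only `workflow_id`, which matches the case of a workflow that has no questions.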

Implementation Reference

  • Core handler function that enqueues a workflow job, monitors its status with polling until completion or timeout, and returns detailed results including success status, job details, and execution time.
    import pprint
    import time
    from typing import List, Optional

    # server_client, ApiException, and InputData come from the surrounding project.

    def execute_workflow_with_monitoring(
        self,
        workflow_id: str,
        input_data: Optional[List[InputData]] = None,
        wait_for_completion: bool = True,
        timeout_seconds: int = 3600,
        poll_interval_seconds: int = 5,
    ):
        """
        Execute a workflow and monitor its execution status. This call will
        return a jobID as well as the complete job details. The input data
        parameter is a list of name-value pairs, each containing a name and value.
        """
        try:
            workflow = self.workflows_api.workflows_get_workflow(workflow_id)
            if not workflow:
                return "Error: Workflow not found"

            questions = self.workflows_api.workflows_get_workflow_questions(workflow_id)
            if not questions and input_data:
                return "Error: Workflow has no questions, input data not allowed"
            if questions:
                # input_data may be None, so fall back to an empty list
                provided_names = [item.name for item in (input_data or [])]
                for question in questions:
                    if question.name not in provided_names:
                        return f"Error: Input data must contain the question '{question.name}'"

            # Convert InputData objects to AppValue objects
            app_values = None
            if input_data:
                app_values = [
                    server_client.AppValue(name=item.name, value=item.value)
                    for item in input_data
                ]

            # Start the workflow execution
            contract = server_client.EnqueueJobContract(
                worker_tag=workflow.worker_tag, questions=app_values
            )
            job_response = self.workflows_api.workflows_enqueue(workflow_id, contract)

            # Extract the job id before it is used in any result
            job_id = job_response.id

            if job_response.status != "Queued":
                # Report the failure to start
                return pprint.pformat({
                    "success": False,
                    "job_id": job_id,
                    "status": job_response.status,
                    "message": f"Error: Workflow was not successfully started. Current JobID: '{job_id}'",
                })

            if not wait_for_completion:
                return pprint.pformat({
                    "success": True,
                    "job_id": job_id,
                    "status": "Started",
                    "message": "Job started successfully, not waiting for completion",
                })

            start_time = time.time()
            while True:
                # Check if timeout exceeded
                if time.time() - start_time > timeout_seconds:
                    return pprint.pformat({
                        "success": False,
                        "job_id": job_id,
                        "status": "Timeout",
                        "error": f"Job execution timed out after {timeout_seconds} seconds",
                    })

                # Get job status
                try:
                    job_details = self.jobs_api.jobs_get_job_v3(job_id)
                    if job_details.status in ["Completed", "Cancelled"]:
                        # Get final job details including outputs and messages
                        final_details = job_details
                        # Messages are fetched here but not included in the returned result
                        job_messages = self.jobs_api.jobs_get_job_messages(job_id=job_id)
                        return pprint.pformat({
                            "success": job_details.status == "Completed",
                            "job_id": job_id,
                            "status": job_details.status,
                            "job_details": final_details,
                            "execution_time_seconds": time.time() - start_time,
                        })
                    # else:
                    #     return pprint.pformat({
                    #         "success": False,
                    #         "job_id": job_id,
                    #         "status": "Failed",
                    #         "error": "Could not extract status from job details, continuing to monitor..."
                    #     })
                except Exception as e:
                    return pprint.pformat({
                        "success": False,
                        "job_id": job_id,
                        "status": "Failed",
                        "error": f"Error checking job status: {str(e)}",
                    })

                # Wait before next check
                time.sleep(poll_interval_seconds)

        except ApiException as e:
            return pprint.pformat({
                "success": False,
                "error": f"Unexpected error: {str(e)}",
                "job_id": None,
                "status": "Failed",
            })
  • MCP tool registration decorator (@self.app.tool()) that defines the tool interface and delegates to the underlying tools.execute_workflow_with_monitoring with default monitoring parameters.
    @self.app.tool()
    def execute_workflow_with_monitoring(
        workflow_id: str,
        input_data: list[InputData] = None,
    ):
        """Execute a workflow by its ID and monitor its execution status.

        This call returns the job ID, the job status, and the job details once
        execution has completed or failed. The input data parameter is a list
        of name-value pairs, each containing a name and a value.
        """
        return self.tools.execute_workflow_with_monitoring(
            workflow_id,
            input_data,
            wait_for_completion=True,
            timeout_seconds=300,  # 5 minutes timeout
            poll_interval_seconds=10,
        )
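
The monitoring behaviour described in the two entries above comes down to a poll-until-terminal loop with a timeout. The following is a minimal standalone sketch of that pattern, not the server's actual code: `poll_until_done` and `get_status` are hypothetical names, and the `sleep` and `clock` parameters are added here only so the loop can be exercised without real waiting. The terminal statuses `Completed` and `Cancelled` and the defaults of 300 and 10 seconds are taken from the listing above.

```python
import time

# Statuses the handler above treats as final.
TERMINAL_STATUSES = {"Completed", "Cancelled"}


def poll_until_done(get_status, timeout_seconds=300, poll_interval_seconds=10,
                    sleep=time.sleep, clock=time.monotonic):
    """Call get_status() repeatedly until a terminal status or a timeout."""
    start = clock()
    while True:
        # Give up once the elapsed time exceeds the timeout.
        if clock() - start > timeout_seconds:
            return {"success": False, "status": "Timeout"}
        status = get_status()
        if status in TERMINAL_STATUSES:
            return {"success": status == "Completed", "status": status}
        # Wait before the next check.
        sleep(poll_interval_seconds)


# Simulated job that finishes on the third check; sleeping is disabled.
statuses = iter(["Queued", "Running", "Completed"])
result = poll_until_done(lambda: next(statuses), sleep=lambda seconds: None)
print(result)
```

With the tool's defaults, a job that runs longer than five minutes is reported as a timeout even though it may still be running on the server.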

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jupiterbak/AYX-MCP-Wrapper'
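
The same request can be sketched in Python using only the standard library. The helper names `server_url` and `fetch_server` are hypothetical, and parsing the response as JSON is an assumption, since the response format is not described here.

```python
import json
import urllib.request
from urllib.parse import quote

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory API URL for one server."""
    return f"{API_BASE}/{quote(owner, safe='')}/{quote(name, safe='')}"


def fetch_server(owner: str, name: str, timeout: float = 10.0):
    """Send the GET request; assumes the endpoint returns a JSON body."""
    request = urllib.request.Request(server_url(owner, name), method="GET")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Only the URL is built here; call fetch_server(...) to perform the request.
url = server_url("jupiterbak", "AYX-MCP-Wrapper")
print(url)
```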

If you have feedback or need assistance with the MCP directory API, please join our Discord server.