get_workflow_status
Retrieve the current status of active workflows in the MCP Kafka Schema Registry. Use the workflow ID to filter results for targeted tracking and management.
Instructions
Get the status of active workflows.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| workflow_id | No |
Implementation Reference
- workflow_mcp_integration.py:93-134 (handler)Handler function for the workflow status tool. Retrieves status of specific or all active workflows, including progress, responses, and completion status. This corresponds to the 'get_workflow_status' tool mentioned in tests, though registered under 'workflow_status' name.@self.mcp.tool(description="Get the status of active workflows") async def workflow_status(instance_id: Optional[str] = None) -> str: """Get workflow status.""" if instance_id: # Get specific workflow status state = self.multi_step_manager.active_states.get(instance_id) if state: workflow_id = state.metadata.get("workflow_definition_id") workflow = get_workflow_by_id(workflow_id) if workflow_id else None return json.dumps( { "instance_id": instance_id, "workflow_name": workflow.name if workflow else "Unknown", "current_step": state.current_step_id, "steps_completed": len(state.step_history) - 1, "total_steps": len(workflow.steps) if workflow else 0, "responses": state.get_all_responses(), "created_at": state.created_at.isoformat(), "updated_at": state.updated_at.isoformat(), } ) else: # Check completed workflows completed_state = self.multi_step_manager.completed_workflows.get(instance_id) if completed_state: return json.dumps( { "instance_id": instance_id, "status": "completed", "workflow_name": completed_state.metadata.get("workflow_name"), "steps_completed": len(completed_state.step_history), "responses": completed_state.get_all_responses(), "completed": True, } ) else: return json.dumps({"error": f"Workflow instance '{instance_id}' not found"}) else: # Get all active workflows active = self.multi_step_manager.get_active_workflows() return json.dumps({"active_workflows": active, "total_active": len(active)})