abort_workflow
Terminate an active workflow instance in the MCP Kafka Schema Reg server by specifying its unique instance ID to halt ongoing processes.
Instructions
Abort an active workflow
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| instance_id | Yes |
Implementation Reference
- workflow_mcp_integration.py:135-148 (handler)MCP tool handler for 'abort_workflow'. This is the main entry point for the tool, decorated with @self.mcp.tool(). It takes instance_id, calls the underlying MultiStepElicitationManager.abort_workflow(), and returns JSON status.@self.mcp.tool(description="Abort an active workflow") async def abort_workflow(instance_id: str) -> str: """Abort an active workflow.""" success = await self.multi_step_manager.abort_workflow(instance_id) if success: return json.dumps( {"status": "aborted", "instance_id": instance_id, "message": "Workflow aborted successfully"} ) else: return json.dumps( {"error": f"Failed to abort workflow '{instance_id}'. It may not exist or already be completed."} )
- multi_step_elicitation.py:360-370 (helper)Core implementation of abort_workflow in MultiStepElicitationManager. Marks the workflow as aborted, moves state to completed_workflows, removes from active_states, and returns success boolean.async def abort_workflow(self, workflow_instance_id: str) -> bool: """Abort an active workflow.""" if workflow_instance_id in self.active_states: state = self.active_states[workflow_instance_id] state.metadata["aborted"] = True state.metadata["aborted_at"] = datetime.now(timezone.utc).isoformat() self.completed_workflows[workflow_instance_id] = state del self.active_states[workflow_instance_id] logger.info(f"Aborted workflow instance '{workflow_instance_id}'") return True return False