abort_workflow
Terminate an active workflow instance in the MCP Kafka Schema Reg server by specifying its unique instance ID. The instance is flagged as aborted and moved from the active workflows to the completed ones.
Instructions
Abort an active workflow
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| instance_id | Yes | Unique ID of the workflow instance to abort | |
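
As a rough illustration, a client would normally invoke this tool through MCP's standard `tools/call` JSON-RPC method. The sketch below only builds the request payload. The `build_abort_request` helper and the `wf-1234` instance ID are hypothetical, and how the payload is sent depends on the transport in use.

```python
import json


def build_abort_request(instance_id: str, request_id: int = 1) -> str:
    """Build a JSON-RPC 2.0 'tools/call' request for abort_workflow.

    Hypothetical helper for illustration; not part of the server code.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "abort_workflow",
            # instance_id is the only argument the tool declares.
            "arguments": {"instance_id": instance_id},
        },
    }
    return json.dumps(payload)


if __name__ == "__main__":
    # "wf-1234" is a made-up instance ID.
    print(build_abort_request("wf-1234"))
```

In practice an MCP client library would usually construct this envelope for you; the point is only that `instance_id` is passed as a plain string argument.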
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | JSON string: a status object on success, or an error object on failure | |
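
Because the handler listed under Implementation Reference returns a JSON string in both the success and the failure case, a caller has to inspect the payload rather than rely on an exception. A minimal sketch of that check follows; `parse_abort_result` is a hypothetical helper, and the key names are taken from that handler.

```python
import json
from typing import Tuple


def parse_abort_result(raw: str) -> Tuple[bool, str]:
    """Return (aborted, detail) from the JSON string returned by abort_workflow.

    Hypothetical helper for illustration; not part of the server code.
    """
    data = json.loads(raw)
    if data.get("status") == "aborted":
        return True, data.get("message", "")
    # Failure is reported with an "error" key instead of a raised exception.
    return False, data.get("error", "unknown error")
```

For example, `parse_abort_result('{"error": "boom"}')` yields `(False, "boom")`.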
Implementation Reference
- workflow_mcp_integration.py:135-148 (handler) MCP tool handler for 'abort_workflow'. This is the main entry point for the tool, decorated with @self.mcp.tool(). It takes instance_id, calls the underlying MultiStepElicitationManager.abort_workflow(), and returns a JSON status string.

      @self.mcp.tool(description="Abort an active workflow")
      async def abort_workflow(instance_id: str) -> str:
          """Abort an active workflow."""
          success = await self.multi_step_manager.abort_workflow(instance_id)
          if success:
              return json.dumps(
                  {"status": "aborted", "instance_id": instance_id, "message": "Workflow aborted successfully"}
              )
          else:
              return json.dumps(
                  {"error": f"Failed to abort workflow '{instance_id}'. It may not exist or already be completed."}
              )

- multi_step_elicitation.py:360-370 (helper) Core implementation of abort_workflow in MultiStepElicitationManager. Marks the workflow as aborted, moves its state to completed_workflows, removes it from active_states, and returns a success boolean.

      async def abort_workflow(self, workflow_instance_id: str) -> bool:
          """Abort an active workflow."""
          if workflow_instance_id in self.active_states:
              state = self.active_states[workflow_instance_id]
              state.metadata["aborted"] = True
              state.metadata["aborted_at"] = datetime.now(timezone.utc).isoformat()
              self.completed_workflows[workflow_instance_id] = state
              del self.active_states[workflow_instance_id]
              logger.info(f"Aborted workflow instance '{workflow_instance_id}'")
              return True
          return False
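
One consequence of the helper's logic is that aborting the same instance twice succeeds only the first time, since the state has already left `active_states`. The following self-contained sketch reproduces that behavior. `WorkflowState` and `MiniManager` are hypothetical stand-ins, not the real classes, and the sketch uses `dict.pop` in place of the membership check and `del`, which gives the same outcome here.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict


@dataclass
class WorkflowState:
    """Hypothetical stand-in for the real workflow state object."""
    metadata: Dict[str, object] = field(default_factory=dict)


class MiniManager:
    """Hypothetical, trimmed-down stand-in for MultiStepElicitationManager."""

    def __init__(self) -> None:
        self.active_states: Dict[str, WorkflowState] = {}
        self.completed_workflows: Dict[str, WorkflowState] = {}

    async def abort_workflow(self, workflow_instance_id: str) -> bool:
        # Remove the state from the active set; None means it was not active.
        state = self.active_states.pop(workflow_instance_id, None)
        if state is None:
            return False
        # Flag and timestamp the state, then file it under completed workflows.
        state.metadata["aborted"] = True
        state.metadata["aborted_at"] = datetime.now(timezone.utc).isoformat()
        self.completed_workflows[workflow_instance_id] = state
        return True


async def demo() -> tuple:
    manager = MiniManager()
    manager.active_states["wf-1"] = WorkflowState()
    first = await manager.abort_workflow("wf-1")    # active, so it is aborted
    second = await manager.abort_workflow("wf-1")   # no longer active
    unknown = await manager.abort_workflow("nope")  # never existed
    return first, second, unknown, manager
```

Running `asyncio.run(demo())` exercises all three cases. The second and third calls correspond to the handler's "may not exist or already be completed" error message.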