cancel_task
Stop a running A2A agent task by providing its task ID.
Instructions
Cancel a running task on an A2A agent.
Args: task_id: ID of the task to cancel
Returns: Cancellation result
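The "Cancellation result" is a plain dictionary. Judging from the handler listed under Implementation Reference, it carries a `status` of `"success"`, `"error"`, or `"unknown"`, a `message`, and sometimes `task_id` and `code`. The following is a minimal sketch of how a caller might summarize such a result; `describe_cancellation` is a hypothetical helper, not part of the server.

```python
from typing import Any, Dict


def describe_cancellation(result: Dict[str, Any]) -> str:
    """Turn a cancel_task result dictionary into a one-line summary.

    Hypothetical helper: the key names mirror the dictionaries returned by
    the cancel_task handler, but this function is not part of the server.
    """
    status = result.get("status", "unknown")
    message = result.get("message", "")
    if status == "success":
        return f"Task {result.get('task_id')} cancelled: {message}"
    if status == "error":
        # "code" is only present when the A2A agent returned a JSON-RPC error.
        code = result.get("code")
        suffix = f" (code {code})" if code is not None else ""
        return f"Cancellation failed{suffix}: {message}"
    return f"Unrecognized cancellation status {status!r}: {message}"


print(describe_cancellation({"status": "error", "message": "Task ID not found: abc"}))
```

Using `.get` throughout keeps the helper tolerant of the error path that omits `task_id` and `code`.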
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | Yes | ID of the task to cancel | |
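To illustrate how the single `task_id` argument travels onward, here is a rough sketch of the JSON-RPC envelope for the A2A `tasks/cancel` method. The method name and the `params.id` field come from the schema in the Implementation Reference; `build_cancel_request`, the sample IDs, and the use of a random hex string as the request ID are assumptions made for illustration.

```python
import json
import uuid
from typing import Any, Dict, Optional


def build_cancel_request(task_id: str, request_id: Optional[str] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request body for the A2A 'tasks/cancel' method.

    Hypothetical helper: the real client builds this through Pydantic models.
    """
    return {
        "jsonrpc": "2.0",
        # Assumption: any unique string works as the JSON-RPC request ID.
        "id": request_id or uuid.uuid4().hex,
        "method": "tasks/cancel",
        # The tool's task_id argument becomes params.id on the wire.
        "params": {"id": task_id},
    }


tool_arguments = {"task_id": "task-123"}  # what an MCP client passes to the tool
request = build_cancel_request(tool_arguments["task_id"], request_id="req-1")
print(json.dumps(request, indent=2))
```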
Implementation Reference
- a2a_mcp_server.py:686-752 (handler): MCP tool handler for 'cancel_task'. Looks up the agent URL for the task, creates an A2AClient, calls client.cancel_task with the task ID, and returns success/error based on the A2A response.

  ```python
  async def cancel_task(
      task_id: str,
      ctx: Context = None,
  ) -> Dict[str, Any]:
      """
      Cancel a running task on an A2A agent.

      Args:
          task_id: ID of the task to cancel

      Returns:
          Cancellation result
      """
      if task_id not in task_agent_mapping:
          return {
              "status": "error",
              "message": f"Task ID not found: {task_id}",
          }

      agent_url = task_agent_mapping[task_id]

      # Create a client for the agent
      client = A2AClient(url=agent_url)

      try:
          # Create the request payload
          payload = {
              "id": task_id
          }

          if ctx:
              await ctx.info(f"Cancelling task: {task_id}")

          # Send the cancel task request
          result = await client.cancel_task(payload)

          # Debug: Print the raw response for analysis
          if ctx:
              await ctx.info(f"Raw cancellation result: {result}")

          # Create a response dictionary
          if hasattr(result, "error"):
              return {
                  "status": "error",
                  "task_id": task_id,
                  "message": result.error.message,
                  "code": result.error.code
              }
          elif hasattr(result, "result"):
              return {
                  "status": "success",
                  "task_id": task_id,
                  "message": "Task cancelled successfully"
              }
          else:
              return {
                  "status": "unknown",
                  "task_id": task_id,
                  "message": "Unexpected response format"
              }
      except Exception as e:
          return {
              "status": "error",
              "message": f"Error cancelling task: {str(e)}",
          }
  ```
- common/client/client.py:84-86 (helper): A2A client method that sends the CancelTaskRequest to the A2A server.

  ```python
  async def cancel_task(self, payload: dict[str, Any]) -> CancelTaskResponse:
      request = CancelTaskRequest(params=payload)
      return CancelTaskResponse(**await self._send_request(request))
  ```
- common/server/server.py:83-86 (registration): A2A server request dispatcher that handles CancelTaskRequest by calling task_manager.on_cancel_task.

  ```python
  elif isinstance(json_rpc_request, CancelTaskRequest):
      result = await self.task_manager.on_cancel_task(
          json_rpc_request
      )
  ```
- common/server/task_manager.py:106-119 (handler): InMemoryTaskManager implementation of on_cancel_task. It returns TaskNotFoundError when the task ID is unknown; otherwise it currently returns TaskNotCancelableError without performing any cancellation.

  ```python
  async def on_cancel_task(
      self, request: CancelTaskRequest
  ) -> CancelTaskResponse:
      logger.info(f'Cancelling task {request.params.id}')
      task_id_params: TaskIdParams = request.params

      async with self.lock:
          task = self.tasks.get(task_id_params.id)
          if task is None:
              return CancelTaskResponse(
                  id=request.id, error=TaskNotFoundError()
              )

      return CancelTaskResponse(id=request.id, error=TaskNotCancelableError())
  ```
- common/types.py:202-209 (schema): Pydantic models defining the request and response schemas for the A2A 'tasks/cancel' method.

  ```python
  class CancelTaskRequest(JSONRPCRequest):
      method: Literal['tasks/cancel',] = 'tasks/cancel'
      params: TaskIdParams


  class CancelTaskResponse(JSONRPCResponse):
      result: Task | None = None
  ```
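One detail in the MCP handler may be worth checking. It branches on `hasattr(result, "error")`, but a response model that declares `error` as an optional field has that attribute even when the value is `None`. The sketch below uses dataclass stand-ins (`StubResponse` and `StubError` are hypothetical, and it assumes the real `JSONRPCResponse` declares `error` with a `None` default) to show a variant that tests the field values instead of their presence.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StubError:
    """Hypothetical stand-in for a JSON-RPC error object."""
    code: int
    message: str


@dataclass
class StubResponse:
    """Hypothetical stand-in for CancelTaskResponse; both fields always exist."""
    result: Optional[dict] = None
    error: Optional[StubError] = None


def summarize(response: Any, task_id: str) -> Dict[str, Any]:
    """Map a response to a status dict by checking field values, not presence."""
    error = getattr(response, "error", None)
    if error is not None:
        return {"status": "error", "task_id": task_id,
                "message": error.message, "code": error.code}
    if getattr(response, "result", None) is not None:
        return {"status": "success", "task_id": task_id,
                "message": "Task cancelled successfully"}
    return {"status": "unknown", "task_id": task_id,
            "message": "Unexpected response format"}


ok = StubResponse(result={"id": "t1"})
# The error code and message below are illustrative values only.
failed = StubResponse(error=StubError(code=1, message="Task cannot be canceled"))

print(hasattr(ok, "error"))              # True, even though ok.error is None
print(summarize(ok, "t1")["status"])     # success
print(summarize(failed, "t1")["status"]) # error
```

Whether this matters in practice depends on how the real models are defined, so treat it as something to verify against `common/types.py` rather than a confirmed defect.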