ssh_cancel_async_task
Halt an active asynchronous task on your SSH-managed infrastructure by specifying its task ID.
Instructions
Cancel a running async task.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
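| task_id | No | ID of the async task to cancel. The parameter has a default, but an empty or whitespace-only value is rejected with "task_id is required". | "" |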
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
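| result | Yes | Object with `task_id`, `cancelled`, and `message` when the request reaches the task manager; an error string otherwise. | |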
Implementation Reference
- src/mcp_ssh/mcp_server.py:1622-1661 (handler) The ssh_cancel_async_task tool implementation. It validates the task_id, delegates cancellation to ASYNC_TASKS.cancel_task(), and returns a response with cancellation status.
@mcp.tool()
def ssh_cancel_async_task(task_id: str = "", ctx: Context | None = None) -> ToolResult:
    """Cancel a running async task.

    Args:
        task_id: Identifier of the task to cancel. It is checked with
            ``_validate_task_id`` and stripped of surrounding whitespace
            before being handed to ``ASYNC_TASKS.cancel_task``.
        ctx: Optional context object forwarded to ``_ctx_log``.

    Returns:
        A dict with ``task_id``, ``cancelled`` and ``message`` keys when the
        request reached the task manager. An ``"Error: ..."`` string when
        validation fails, or a ``"Cancel error: ..."`` string when an
        unexpected exception is raised.
    """
    try:
        # Input validation
        valid, error_msg = _validate_task_id(task_id)
        if not valid:
            return f"Error: {error_msg}"

        task_id = task_id.strip()
        success = ASYNC_TASKS.cancel_task(task_id)
        cancelled = bool(success)

        response = {
            "task_id": task_id,
            "cancelled": cancelled,
            "message": (
                "Cancellation signaled"
                if success
                else "Task not found or not cancellable"
            ),
        }
        _ctx_log(
            ctx,
            "info",
            "ssh_cancel_async_task",
            {"task_id": task_id, "cancelled": cancelled},
        )
        return response
    except Exception as e:
        error_str = str(e)
        log_json(
            {"level": "error", "msg": "cancel_async_exception", "error": error_str}
        )
        # The exception may have been caused by task_id itself not being a
        # string; coerce before stripping so this handler can never raise a
        # second AttributeError and mask the original failure.
        safe_task_id = str(task_id).strip()
        _ctx_log(
            ctx,
            "debug",
            "ssh_cancel_async_task_error",
            {"task_id": safe_task_id, "error": sanitize_error(error_str)},
        )
        return f"Cancel error: {sanitize_error(error_str)}"
# - The AsyncTaskManager.cancel_task() method that performs the actual cancellation logic. It looks up the task, sets the cancel event, marks status as cancelled, and sends a notification.
def cancel_task(self, task_id: str) -> bool:
    """Cancel a pending or running task.

    The task is looked up in ``self._tasks`` while holding ``self._lock``.
    If it exists and its status is ``"pending"`` or ``"running"``, its
    cancel event is set, its status becomes ``"cancelled"`` and a
    ``"cancelled"`` notification is emitted via ``self._send_notification``.

    Args:
        task_id: Key of the task in ``self._tasks``.

    Returns:
        True when cancellation was signaled; False when the task is unknown
        or is not in a cancellable state.
    """
    with self._lock:
        entry = self._tasks.get(task_id)
        if not entry or entry["status"] not in ("pending", "running"):
            return False

        entry["cancel"].set()
        entry["status"] = "cancelled"

        # Send cancellation notification
        limits = entry.get("limits", {})
        payload = {
            "reason": "user_requested",
            "max_seconds": int(limits.get("max_seconds", 60)),
        }
        self._send_notification("cancelled", task_id, payload)
        return True
# - src/mcp_ssh/mcp_server.py:1622-1623 (registration)The @mcp.tool() decorator registering ssh_cancel_async_task as an MCP tool.
@mcp.tool() def ssh_cancel_async_task(task_id: str = "", ctx: Context | None = None) -> ToolResult: - src/mcp_ssh/mcp_server.py:293-326 (schema)The _validate_task_id() helper that validates the task_id input parameter for the ssh_cancel_async_task tool.
def _validate_task_id(task_id: str) -> tuple[bool, str]: """Validate task_id parameter. Security: Validates task_id format. - Length limit: 200 characters - Format validation (expected: alias:hash:timestamp) - Cannot be empty Args: task_id: Task ID string to validate Returns: Tuple of (is_valid, error_message) If valid: (True, "") If invalid: (False, error_message) """ if not task_id or not task_id.strip(): return False, "task_id is required" task_id = task_id.strip() # Length validation if len(task_id) > MAX_TASK_ID_LENGTH: return False, f"task_id too long (max {MAX_TASK_ID_LENGTH} characters)" # Format validation: should match pattern alias:hash:timestamp # Allow alphanumeric, colon, dash, underscore if not re.match(r"^[a-zA-Z0-9:_-]+$", task_id): return ( False, "task_id contains invalid characters (only alphanumeric, colon, dash, underscore allowed)", ) return True, "" - tests/test_server_tools.py:230-248 (helper)Tests for the ssh_cancel_async_task tool, covering invalid task ID and missing task_id cases.
def test_ssh_cancel_async_task_invalid_task():
    """A well-formed task ID that matches no task yields a non-cancelled dict."""
    outcome = mcp_server.ssh_cancel_async_task(task_id="invalid:task:id")

    assert isinstance(outcome, dict)
    assert outcome.get("cancelled") is False

    # Either wording is accepted for the explanatory message.
    message = outcome.get("message", "").lower()
    assert any(phrase in message for phrase in ("not found", "not cancellable"))


def test_ssh_cancel_async_task_no_task_id():
    """An empty task_id is rejected with a message saying it is required."""
    outcome = mcp_server.ssh_cancel_async_task(task_id="")
    rendered = str(outcome).lower()

    # Error case - still returns string
    is_error_dict = isinstance(outcome, dict) and "error" in rendered
    assert isinstance(outcome, str) or is_error_dict
    assert "required" in rendered