ssh_cancel
Cancel a running SSH task to stop execution immediately.
Instructions
Request cancellation for a running task.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/mcp_ssh/mcp_server.py:1352-1384 (handler)The `ssh_cancel` tool handler function. Validates the task_id, calls TASKS.cancel() to signal cancellation on the synchronous TaskManager, and returns a JSON response indicating success or failure.
@mcp.tool() def ssh_cancel(task_id: str = "", ctx: Context | None = None) -> ToolResult: """Request cancellation for a running task.""" try: # Input validation valid, error_msg = _validate_task_id(task_id) if not valid: return f"Error: {error_msg}" task_id = task_id.strip() ok = TASKS.cancel(task_id) response = { "task_id": task_id, "cancelled": bool(ok), "message": "Cancellation signaled" if ok else "Task not found", } _ctx_log( ctx, "info", "ssh_cancel", {"task_id": task_id, "cancelled": bool(ok)}, ) return response except Exception as e: error_str = str(e) log_json({"level": "error", "msg": "cancel_exception", "error": error_str}) _ctx_log( ctx, "debug", "ssh_cancel_error", {"task_id": task_id.strip(), "error": sanitize_error(error_str)}, ) return f"Cancel error: {sanitize_error(error_str)}" - src/mcp_ssh/mcp_server.py:1352-1354 (registration)The `@mcp.tool()` decorator registers `ssh_cancel` as an MCP tool.
@mcp.tool() def ssh_cancel(task_id: str = "", ctx: Context | None = None) -> ToolResult: """Request cancellation for a running task.""" - src/mcp_ssh/mcp_server.py:293-326 (schema)Validation of the `task_id` input parameter used by `ssh_cancel`, enforcing format, length, and character restrictions.
def _validate_task_id(task_id: str) -> tuple[bool, str]: """Validate task_id parameter. Security: Validates task_id format. - Length limit: 200 characters - Format validation (expected: alias:hash:timestamp) - Cannot be empty Args: task_id: Task ID string to validate Returns: Tuple of (is_valid, error_message) If valid: (True, "") If invalid: (False, error_message) """ if not task_id or not task_id.strip(): return False, "task_id is required" task_id = task_id.strip() # Length validation if len(task_id) > MAX_TASK_ID_LENGTH: return False, f"task_id too long (max {MAX_TASK_ID_LENGTH} characters)" # Format validation: should match pattern alias:hash:timestamp # Allow alphanumeric, colon, dash, underscore if not re.match(r"^[a-zA-Z0-9:_-]+$", task_id): return ( False, "task_id contains invalid characters (only alphanumeric, colon, dash, underscore allowed)", ) return True, "" - The `TaskManager` class (aliased as `TASKS`) provides the `cancel()` method that actually signals cancellation by setting a threading.Event. The `ssh_cancel` handler delegates to `TASKS.cancel(task_id)`.
class TaskManager: """In-memory task registry for cancellation.""" def __init__(self): self._lock = threading.Lock() self._tasks = {} def create(self, alias: str, command_hash: str) -> str: """Create task and return id.""" with self._lock: # Use microsecond precision to avoid collisions timestamp = int(time.time() * 1000000) task_id = f"{alias}:{command_hash}:{timestamp}" self._tasks[task_id] = { "cancel": threading.Event(), "created": time.time(), "alias": alias, "hash": command_hash, } return task_id def cancel(self, task_id: str) -> bool: """Signal cancellation for task id.""" with self._lock: if task_id in self._tasks: self._tasks[task_id]["cancel"].set() return True return False def get_event(self, task_id: str): """Return cancel event for task id.""" with self._lock: t = self._tasks.get(task_id) if t: return t["cancel"] return None def cleanup(self, task_id: str): """Remove task.""" with self._lock: if task_id in self._tasks: del self._tasks[task_id] - tests/test_server_tools.py:126-141 (helper)Tests for `ssh_cancel`: verifies behavior when task_id not found and when task_id is empty.
def test_ssh_cancel_not_found(): """Test cancel tool with non-existent task.""" result = mcp_server.ssh_cancel(task_id="nonexistent") assert isinstance(result, dict) assert result.get("cancelled") is False assert "not found" in result.get("message", "").lower() def test_ssh_cancel_no_task_id(): """Test cancel tool without task_id.""" result = mcp_server.ssh_cancel(task_id="") # Error case - still returns string assert isinstance(result, str) or ( isinstance(result, dict) and "error" in str(result).lower() ) assert "required" in str(result).lower()