ssh_get_task_status
Check the current status of asynchronous SSH tasks, including state, progress, elapsed time, and output summary, for monitoring infrastructure operations.
Instructions
Get current status of an async task (SEP-1686 compliant).
Returns task state, progress, elapsed time, and output summary.
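Because the tool reports a point-in-time status, a caller usually has to poll it. The following is a minimal client-side sketch, not part of the server: `poll_until_done` and `TERMINAL_STATES` are hypothetical names, and the set of terminal status strings is an assumption, since the source does not enumerate the possible values of `status`. It relies only on the `status` and `pollFrequency` fields shown in the returned dictionary and on the fact that the handler returns a plain string on errors.

```python
import time
from typing import Any, Callable

# Assumption: these status strings mark a finished task. The source does not
# list the possible values, so adjust this set to match the real server.
TERMINAL_STATES = {"completed", "failed", "cancelled", "timeout"}


def poll_until_done(
    get_status: Callable[[str], Any],
    task_id: str,
    max_polls: int = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Call get_status repeatedly until the task reaches a terminal state.

    get_status stands in for however the client invokes ssh_get_task_status.
    """
    for _ in range(max_polls):
        status = get_status(task_id)
        # The handler returns a string such as "Error: Task not found: ..."
        # instead of a dictionary when something goes wrong.
        if isinstance(status, str):
            raise RuntimeError(status)
        if status["status"] in TERMINAL_STATES:
            return status
        # Wait for the interval the server suggests (seconds), defaulting to 5.
        sleep(status.get("pollFrequency", 5))
    raise TimeoutError(f"Task {task_id} still running after {max_polls} polls")
```

Passing `sleep` as a parameter keeps the loop testable without real delays; in normal use the default `time.sleep` applies.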
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | No | | `""` |
Implementation Reference
- src/mcp_ssh/mcp_server.py:1517-1546 (handler) The primary handler for the ssh_get_task_status tool. Validates the task_id input and retrieves the task status from the ASYNC_TASKS manager, returning the status dictionary or an error message.

  ```python
  def ssh_get_task_status(task_id: str = "", ctx: Context | None = None) -> ToolResult:
      """Get current status of an async task (SEP-1686 compliant).

      Returns task state, progress, elapsed time, and output summary.
      """
      try:
          # Input validation
          valid, error_msg = _validate_task_id(task_id)
          if not valid:
              return f"Error: {error_msg}"

          task_id = task_id.strip()
          status = ASYNC_TASKS.get_task_status(task_id)
          if not status:
              return f"Error: Task not found: {task_id}"

          _ctx_log(ctx, "debug", "ssh_get_task_status", {"task_id": task_id})
          return status
      except Exception as e:
          error_str = str(e)
          log_json({"level": "error", "msg": "status_exception", "error": error_str})
          _ctx_log(
              ctx,
              "debug",
              "ssh_get_task_status_error",
              {"task_id": task_id.strip(), "error": sanitize_error(error_str)},
          )
          return f"Status error: {sanitize_error(error_str)}"
  ```
- Core helper method in AsyncTaskManager that retrieves and computes the current status of an async task, including progress calculation and SEP-1686 compliant metadata.

  ```python
  def get_task_status(self, task_id: str) -> dict[str, Any] | None:
      """Get current status with SEP-1686 metadata."""
      with self._lock:
          task_info = self._tasks.get(task_id)
          if not task_info:
              # Check if result exists (completed task)
              result = self._results.get(task_id)
              if result:
                  return {
                      "task_id": task_id,
                      "status": result["status"],
                      "keepAlive": int(result["expires"] - time.time()),
                      "pollFrequency": 5,
                      "progress_percent": 100,
                      "elapsed_ms": result["duration_ms"],
                      "bytes_read": len(result["output"]),
                      "output_lines_available": len(
                          self._output_buffers.get(task_id, deque())
                      ),
                  }
              return None

          # Calculate progress percentage based on elapsed time vs max_seconds
          elapsed_ms = int((time.time() - task_info["created"]) * 1000)
          max_seconds = int(task_info["limits"].get("max_seconds", 60))
          progress_percent = min(100, int((elapsed_ms / (max_seconds * 1000)) * 100))

          return {
              "task_id": task_id,
              "status": task_info["status"],
              "keepAlive": 300,  # 5 minutes default
              "pollFrequency": 5,  # 5 seconds
              "progress_percent": progress_percent,
              "elapsed_ms": elapsed_ms,
              "bytes_read": task_info["bytes_out"] + task_info["bytes_err"],
              "output_lines_available": len(
                  self._output_buffers.get(task_id, deque())
              ),
          }
  ```
- src/mcp_ssh/mcp_server.py:1517-1517 (registration) The @mcp.tool() decorator registers the ssh_get_task_status function as an MCP tool.

  ```python
  def ssh_get_task_status(task_id: str = "", ctx: Context | None = None) -> ToolResult:
  ```
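
For a running task, `progress_percent` in the helper above is an estimate based on elapsed time against the task's `max_seconds` limit (60 if unset), not a measure of work completed. The standalone function below reproduces that arithmetic as a worked example; the name `progress_percent` as a function and its signature are illustrative and do not appear in the source.

```python
def progress_percent(elapsed_ms: int, max_seconds: int = 60) -> int:
    """Time-based estimate: elapsed time as a share of the budget, capped at 100."""
    return min(100, int((elapsed_ms / (max_seconds * 1000)) * 100))


print(progress_percent(15_000))       # 15 s of a 60 s budget -> 25
print(progress_percent(90_000))       # past the budget, capped -> 100
print(progress_percent(30_000, 120))  # 30 s of a 120 s budget -> 25
```

A long-running task therefore reports 100 as soon as its time budget is used up, even if its `status` has not yet changed, so callers may want to treat `status` rather than `progress_percent` as the signal that a task has finished.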