ssh_get_task_output
Retrieve recent output lines from running or completed SSH tasks to monitor execution progress and results in server fleet management.
Instructions
Get recent output lines from running or completed task.
Enhanced beyond SEP-1686: enables streaming output visibility.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| task_id | No | ||
| max_lines | No |
Implementation Reference
- src/mcp_ssh/mcp_server.py:1580-1620 (handler)The main handler function for the 'ssh_get_task_output' MCP tool. It performs input validation on task_id and max_lines, then delegates to ASYNC_TASKS.get_task_output to retrieve recent output lines from the task.@mcp.tool() def ssh_get_task_output( task_id: str = "", max_lines: int = 50, ctx: Context | None = None ) -> ToolResult: """Get recent output lines from running or completed task. Enhanced beyond SEP-1686: enables streaming output visibility. """ try: # Input validation valid, error_msg = _validate_task_id(task_id) if not valid: return f"Error: {error_msg}" if max_lines < 1 or max_lines > 1000: return "Error: max_lines must be between 1 and 1000" task_id = task_id.strip() output = ASYNC_TASKS.get_task_output(task_id, max_lines) if not output: return f"Error: Task not found or no output available: {task_id}" _ctx_log( ctx, "debug", "ssh_get_task_output", {"task_id": task_id, "max_lines": max_lines}, ) return output except Exception as e: error_str = str(e) log_json({"level": "error", "msg": "output_exception", "error": error_str}) _ctx_log( ctx, "debug", "ssh_get_task_output_error", {"task_id": task_id.strip(), "error": sanitize_error(error_str)}, ) return f"Output error: {sanitize_error(error_str)}"
- The core helper method in AsyncTaskManager that implements the logic for retrieving recent output lines from a task's output buffer (for running tasks) or stored result (for completed tasks).self, task_id: str, max_lines: int = 50 ) -> dict[str, Any] | None: """Get recent output lines.""" with self._lock: # First check if task is still running and has output buffer output_buffer = self._output_buffers.get(task_id) if output_buffer and len(output_buffer) > 0: # Convert deque to list and get recent lines all_lines = list(output_buffer) recent_lines = ( all_lines[-max_lines:] if len(all_lines) > max_lines else all_lines ) return { "task_id": task_id, "output_lines": recent_lines, "total_lines": len(all_lines), "has_more": len(all_lines) > max_lines, } # If no output buffer or empty buffer, check if task is completed and has result result = self._results.get(task_id) if result and result["expires"] > time.time(): # Split the output into lines and return recent ones output_text = result.get("output", "") all_lines = output_text.split("\n") if output_text else [] recent_lines = ( all_lines[-max_lines:] if len(all_lines) > max_lines else all_lines ) return { "task_id": task_id, "output_lines": recent_lines, "total_lines": len(all_lines), "has_more": len(all_lines) > max_lines, } # Also check if task is still in _tasks but completed (no output buffer) task_info = self._tasks.get(task_id) if task_info and task_info.get("output"): # Split the output into lines and return recent ones output_text = task_info.get("output", "") all_lines = output_text.split("\n") if output_text else [] recent_lines = ( all_lines[-max_lines:] if len(all_lines) > max_lines else all_lines ) return { "task_id": task_id, "output_lines": recent_lines, "total_lines": len(all_lines), "has_more": len(all_lines) > max_lines, } return None
- Global ASYNC_TASKS instance of AsyncTaskManager used by the tool handler.ASYNC_TASKS = AsyncTaskManager()
- src/mcp_ssh/mcp_server.py:1580-1620 (registration)The @mcp.tool() decorator registers ssh_get_task_output as an MCP tool with FastMCP.@mcp.tool() def ssh_get_task_output( task_id: str = "", max_lines: int = 50, ctx: Context | None = None ) -> ToolResult: """Get recent output lines from running or completed task. Enhanced beyond SEP-1686: enables streaming output visibility. """ try: # Input validation valid, error_msg = _validate_task_id(task_id) if not valid: return f"Error: {error_msg}" if max_lines < 1 or max_lines > 1000: return "Error: max_lines must be between 1 and 1000" task_id = task_id.strip() output = ASYNC_TASKS.get_task_output(task_id, max_lines) if not output: return f"Error: Task not found or no output available: {task_id}" _ctx_log( ctx, "debug", "ssh_get_task_output", {"task_id": task_id, "max_lines": max_lines}, ) return output except Exception as e: error_str = str(e) log_json({"level": "error", "msg": "output_exception", "error": error_str}) _ctx_log( ctx, "debug", "ssh_get_task_output_error", {"task_id": task_id.strip(), "error": sanitize_error(error_str)}, ) return f"Output error: {sanitize_error(error_str)}"