samerfarida

MCP SSH Orchestrator

ssh_get_task_result

Retrieve final execution results, output, and metadata for completed SSH tasks to verify command outcomes and audit infrastructure operations.

Instructions

Get final result of completed task (SEP-1686 compliant).

Returns complete output, exit code, and execution metadata.
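As a rough illustration of what a caller can expect back, the sketch below models the result fields listed in the implementation reference on this page and shows one way to tell a successful result from an error string. The `TaskResult` type, its field types, and the `summarize` helper are assumptions made for this example and are not part of the server; how a given MCP client wraps the return value may also differ.

```python
from __future__ import annotations

from typing import Any, TypedDict


class TaskResult(TypedDict):
    """Field names mirror get_task_result(); the types are assumed."""

    task_id: str
    status: str
    exit_code: int | None
    duration_ms: int | None
    output: str
    cancelled: bool
    timeout: bool
    target_ip: str
    max_seconds: int | None


def summarize(result: Any) -> str:
    """Hypothetical helper: reduce the tool's return value to one line."""
    # The handler returns a plain string for validation failures,
    # missing or expired tasks, and unexpected exceptions.
    if isinstance(result, str):
        return f"failed: {result}"
    if result.get("timeout"):
        return f"{result['task_id']}: timed out"
    if result.get("cancelled"):
        return f"{result['task_id']}: cancelled"
    return f"{result['task_id']}: exit {result['exit_code']} in {result['duration_ms']} ms"
```

Checking for a string first keeps the error path separate from the dictionary lookups, so a missing or expired task never raises a `KeyError` in the caller.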

Input Schema

Name      Required   Description   Default
task_id   No         -             -
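For orientation, a request for this tool might be built as follows. This is a minimal sketch that only assembles the JSON-RPC `tools/call` message defined by the MCP specification; the `build_tool_call` helper and the placeholder task ID are hypothetical, and the transport used to send the message is left out.

```python
import json


def build_tool_call(task_id: str, request_id: int = 1) -> str:
    """Serialize an MCP tools/call request for ssh_get_task_result."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "ssh_get_task_result",
            # task_id is the tool's only argument.
            "arguments": {"task_id": task_id},
        },
    }
    return json.dumps(payload)


# "example-task-id" is a placeholder; real IDs come from the server.
message = build_tool_call("example-task-id")
```

Although the schema marks `task_id` as not required, the handler validates it before looking anything up, so in practice a call without a usable ID is likely to return an error string.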

Implementation Reference

  • Primary handler for the 'ssh_get_task_result' MCP tool. Performs input validation on task_id, retrieves the final task result from the ASYNC_TASKS manager, logs activity, and returns the result or an appropriate error message.
    @mcp.tool()
    def ssh_get_task_result(task_id: str = "", ctx: Context | None = None) -> ToolResult:
        """Get final result of completed task (SEP-1686 compliant).
    
        Returns complete output, exit code, and execution metadata.
        """
        try:
            # Input validation
            valid, error_msg = _validate_task_id(task_id)
            if not valid:
                return f"Error: {error_msg}"
    
            task_id = task_id.strip()
            result = ASYNC_TASKS.get_task_result(task_id)
            if not result:
                return f"Error: Task not found or expired: {task_id}"
    
            _ctx_log(ctx, "debug", "ssh_get_task_result", {"task_id": task_id})
            return result
    
        except Exception as e:
            error_str = str(e)
            log_json({"level": "error", "msg": "result_exception", "error": error_str})
            _ctx_log(
                ctx,
                "debug",
                "ssh_get_task_result_error",
                {"task_id": task_id.strip(), "error": sanitize_error(error_str)},
            )
            return f"Result error: {sanitize_error(error_str)}"
  • AsyncTaskManager.get_task_result(): core helper that looks up the stored result in the _results dict and, if its TTL has not expired, returns the complete output and execution metadata; otherwise it returns None.
    def get_task_result(self, task_id: str) -> dict[str, Any] | None:
        """Get final result if completed."""
        with self._lock:
            result = self._results.get(task_id)
            if result and result["expires"] > time.time():
                return {
                    "task_id": task_id,
                    "status": result["status"],
                    "exit_code": result["exit_code"],
                    "duration_ms": result["duration_ms"],
                    "output": result["output"],
                    "cancelled": result["cancelled"],
                    "timeout": result["timeout"],
                    "target_ip": result["target_ip"],
                    "max_seconds": result.get("max_seconds"),
                }
            return None
  • The @mcp.tool() decorator registers the ssh_get_task_result function as an MCP tool with the FastMCP server instance.
    @mcp.tool()
  • Global ASYNC_TASKS instance of AsyncTaskManager used by the tool handler to manage and retrieve async task results.
    ASYNC_TASKS = AsyncTaskManager()
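The expiry behavior described above can be sketched in isolation. The `MiniResultStore` class below is a hypothetical stand-in, not the real `AsyncTaskManager`: it assumes each stored entry carries an `expires` timestamp and that a lock guards the dictionary, as the `get_task_result` excerpt suggests. The `put` method and the `ttl_seconds` parameter are invented for the example.

```python
from __future__ import annotations

import threading
import time
from typing import Any


class MiniResultStore:
    """Hypothetical TTL-based result store, loosely modeled on the excerpt."""

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self._lock = threading.Lock()
        self._results: dict[str, dict[str, Any]] = {}
        self._ttl = ttl_seconds

    def put(self, task_id: str, output: str, exit_code: int) -> None:
        """Store a finished task's result with an absolute expiry time."""
        with self._lock:
            self._results[task_id] = {
                "output": output,
                "exit_code": exit_code,
                "expires": time.time() + self._ttl,
            }

    def get(self, task_id: str) -> dict[str, Any] | None:
        """Return the result if present and not expired, else None."""
        with self._lock:
            entry = self._results.get(task_id)
            if entry and entry["expires"] > time.time():
                return {
                    "task_id": task_id,
                    "output": entry["output"],
                    "exit_code": entry["exit_code"],
                }
            return None
```

Because unknown and expired tasks both come back as `None`, a caller cannot distinguish the two cases, which matches the handler's combined "Task not found or expired" message.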


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/samerfarida/mcp-ssh-orchestrator'
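The same request can be made from Python using only the standard library. This is a sketch under the assumption that the endpoint returns a JSON object; the response fields are not documented on this page, so none are accessed here. The `build_request` and `fetch_server_info` names are made up for the example.

```python
import json
import urllib.request

SERVER_URL = "https://glama.ai/api/mcp/v1/servers/samerfarida/mcp-ssh-orchestrator"


def build_request(url: str = SERVER_URL) -> urllib.request.Request:
    """Create the GET request, mirroring the curl command."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_server_info(url: str = SERVER_URL, timeout: float = 10.0) -> dict:
    """Send the request and parse the body (assumed to be JSON)."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_server_info()` performs the network request; `build_request()` alone can be used to inspect the request without sending it.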

If you have feedback or need assistance with the MCP directory API, please join our Discord server.