MCP SSH Orchestrator

ssh_run_async

Execute SSH commands asynchronously on remote servers, returning a task ID for monitoring progress and retrieving results through separate polling tools.

Instructions

Start SSH command asynchronously (SEP-1686 compliant).

Returns immediately with task_id for polling. Use ssh_get_task_status
and ssh_get_task_result to monitor and retrieve results.
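
For illustration, a minimal client-side polling loop might look like the sketch below. It assumes a generic MCP client exposing a call_tool(name, arguments) method that returns each tool's structured result; the tool names come from this page, while the client object and the result field access are assumptions.

    import time

    def run_remote(client, alias: str, command: str) -> dict:
        # Start the command; ssh_run_async returns immediately with, e.g.:
        #   {"task_id": "web1:3f9a1c2b:1717171717123456", "status": "pending",
        #    "keepAlive": 300, "pollFrequency": 5, ...}   (values hypothetical)
        start = client.call_tool("ssh_run_async", {"alias": alias, "command": command})
        task_id = start["task_id"]
        interval = start.get("pollFrequency", 5)  # suggested poll cadence, in seconds

        # Poll until the task leaves the pending/running states.
        while True:
            status = client.call_tool("ssh_get_task_status", {"task_id": task_id})
            if status.get("status") not in ("pending", "running"):
                break
            time.sleep(interval)

        # Fetch the stored result before its TTL (keepAlive) expires.
        return client.call_tool("ssh_get_task_result", {"task_id": task_id})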

Input Schema

Name      Required  Description  Default
alias     No
command   No

Output Schema

Name      Required  Description  Default
result    Yes

Implementation Reference

  • Primary handler function for the ssh_run_async tool, decorated with @mcp.tool() for MCP registration. It validates input, applies policy and network checks, creates an SSHClient instance, defines a progress callback, builds a notification handler, and delegates execution to ASYNC_TASKS.start_async_task for asynchronous operation.
    @mcp.tool()
    async def ssh_run_async(
        alias: str = "", command: str = "", ctx: Context | None = None
    ) -> ToolResult:
        """Start SSH command asynchronously (SEP-1686 compliant).
    
        Returns immediately with task_id for polling. Use ssh_get_task_status
        and ssh_get_task_result to monitor and retrieve results.
        """
        try:
            # Input validation
            valid, error_msg = _validate_alias(alias)
            if not valid:
                return f"Error: {error_msg}"
    
            valid, error_msg = _validate_command(command)
            if not valid:
                return f"Error: {error_msg}"
    
            # Normalize after validation
            alias = alias.strip()
            command = command.strip()
    
            host = config.get_host(alias)
            hostname = host.get("host", "")
            cmd_hash = hash_command(command)
            tags = config.get_host_tags(alias)
            pol = Policy(config.get_policy())
    
            # Command policy
            allowed = pol.is_allowed(alias, tags, command)
            pol.log_decision(alias, cmd_hash, allowed)
            if not allowed:
                return json.dumps(
                    _policy_denied_response(alias, command, cmd_hash),
                    indent=2,
                )
    
            # Network precheck (DNS -> allowlist)
            ok, reason = _precheck_network(pol, hostname)
            if not ok:
                return json.dumps(
                    _network_denied_response(alias, hostname, reason),
                    indent=2,
                )
    
            limits = pol.limits_for(alias, tags)
            require_known_host_config = bool(
                limits.get("require_known_host", pol.require_known_host())
            )
            # Security: Always require known_host for security (CWE-295)
            if not require_known_host_config:
                log_json(
                    {
                        "level": "warn",
                        "msg": "deprecation_warning",
                        "type": "host_key_policy_deprecated",
                        "detail": "require_known_host=False is deprecated and ignored. Always requiring known_hosts entry for security.",
                        "alias": alias,
                        "cwe": "CWE-295",
                    }
                )
            require_known_host = True  # Always enforce strict host key verification
    
            # Create SSH client
            client = _client_for(alias, limits, require_known_host)
    
            # Enhanced progress callback for async tasks
            def progress_cb(phase: str, bytes_read: int, elapsed_ms: int) -> None:
                pol.log_progress(
                    f"async:{alias}:{cmd_hash}", phase, int(bytes_read), int(elapsed_ms)
                )
    
            current_loop: asyncio.AbstractEventLoop | None = None
            if ctx is not None:
                try:
                    current_loop = asyncio.get_running_loop()
                except RuntimeError:
                    current_loop = None
    
            notification_handler = _build_notification_handler(ctx, current_loop)
    
            # Start async task
            task_id = ASYNC_TASKS.start_async_task(
                alias=alias,
                command=command,
                ssh_client=client,
                limits=limits,
                progress_cb=progress_cb,
                notification_handler=notification_handler,
            )
    
            # Return SEP-1686 compliant response
            result = {
                "task_id": task_id,
                "status": "pending",
                "keepAlive": int(limits.get("task_result_ttl", 300)),
                "pollFrequency": int(limits.get("task_progress_interval", 5)),
                "alias": alias,
                "command": command,
                "hash": cmd_hash,
            }
            return result
    
        except Exception as e:
            error_str = str(e)
            log_json({"level": "error", "msg": "async_run_exception", "error": error_str})
            return f"Async run error: {sanitize_error(error_str)}"
  • ASYNC_TASKS.start_async_task method on the AsyncTaskManager class. It initializes the task state, starts a background thread that runs _execute_task_in_thread, sends a 'created' notification, and returns the task_id immediately. This is the core async execution starter invoked by the handler; a minimal example of a conforming notification handler appears after this list.
    def start_async_task(
        self,
        alias: str,
        command: str,
        ssh_client,
        limits: dict[str, Any],
        progress_cb: Callable | None = None,
        notification_handler: Callable[[str, str, dict[str, Any]], None] | None = None,
    ) -> str:
        """Start task in background thread, return task_id immediately."""
        cmd_hash = hash_command(command)
        timestamp = int(time.time() * 1000000)
        task_id = f"{alias}:{cmd_hash}:{timestamp}"
    
        with self._lock:
            self._tasks[task_id] = {
                "status": "pending",
                "alias": alias,
                "command": command,
                "hash": cmd_hash,
                "created": time.time(),
                "started": None,
                "completed": None,
                "exit_code": None,
                "duration_ms": 0,
                "cancelled": False,
                "timeout": False,
                "bytes_out": 0,
                "bytes_err": 0,
                "target_ip": "",
                "output": "",
                "error": None,
                "cancel": threading.Event(),
                "thread": None,
                "limits": limits,
                "progress_cb": progress_cb,
                "ssh_client": ssh_client,
                "notification_handler": notification_handler,
            }
            self._output_buffers[task_id] = deque(maxlen=1000)  # Keep last 1000 lines
    
        # Start background execution
        thread = threading.Thread(
            target=self._execute_task_in_thread, args=(task_id,), daemon=True
        )
        thread.start()
    
        with self._lock:
            self._tasks[task_id]["thread"] = thread
    
        # Send creation notification
        self._send_notification(
            "created",
            task_id,
            {"alias": alias, "command": command, "status": "pending"},
        )
    
        return task_id
  • _execute_task_in_thread method performs the actual SSH execution in a background thread. It updates the task status, invokes SSHClient.run_streaming with the configured limits and callbacks, stores the results, sends notifications, and handles errors.
    def _execute_task_in_thread(self, task_id: str):
        """Background thread worker for async task execution."""
        try:
            with self._lock:
                task_info = self._tasks.get(task_id)
                if not task_info:
                    return
    
                # Update status to running
                task_info["status"] = "running"
                task_info["started"] = time.time()
    
                command = task_info["command"]
                ssh_client = task_info["ssh_client"]
                limits = task_info["limits"]
                progress_cb = task_info["progress_cb"]
                cancel_event = task_info["cancel"]
    
            # Enhanced progress callback that captures output
            def enhanced_progress_cb(phase: str, bytes_read: int, elapsed_ms: int):
                if progress_cb:
                    progress_cb(phase, bytes_read, elapsed_ms)
    
                # Send progress notification every 5 seconds
                if (
                    phase == "running" and elapsed_ms % 5000 < 100
                ):  # ~5 second intervals
                    self._send_notification(
                        "progress",
                        task_id,
                        {
                            "phase": phase,
                            "bytes_read": bytes_read,
                            "elapsed_ms": elapsed_ms,
                            "max_seconds": int(limits.get("max_seconds", 60)),
                            "output_lines": len(
                                self._output_buffers.get(task_id, deque())
                            ),
                        },
                    )
    
            # Execute SSH command
            (
                exit_code,
                duration_ms,
                cancelled,
                timeout,
                bytes_out,
                bytes_err,
                combined,
                peer_ip,
            ) = ssh_client.run_streaming(
                command=command,
                cancel_event=cancel_event,
                max_seconds=int(limits.get("max_seconds", 60)),
                max_output_bytes=int(limits.get("max_output_bytes", 1024 * 1024)),
                progress_cb=enhanced_progress_cb,
            )
    
            # Update task with results
            with self._lock:
                if task_id in self._tasks:
                    task_info = self._tasks[task_id]
                    task_info["status"] = "completed" if exit_code == 0 else "failed"
                    task_info["completed"] = time.time()
                    task_info["exit_code"] = exit_code
                    task_info["duration_ms"] = duration_ms
                    task_info["cancelled"] = cancelled
                    task_info["timeout"] = timeout
                    task_info["bytes_out"] = bytes_out
                    task_info["bytes_err"] = bytes_err
                    task_info["target_ip"] = peer_ip
                    task_info["output"] = combined
    
                    # Store result with TTL
                    ttl = int(limits.get("task_result_ttl", 300))  # 5 minutes default
                    self._results[task_id] = {
                        "task_id": task_id,
                        "alias": task_info["alias"],
                        "command": task_info["command"],
                        "hash": task_info["hash"],
                        "status": task_info["status"],
                        "exit_code": exit_code,
                        "duration_ms": duration_ms,
                        "cancelled": cancelled,
                        "timeout": timeout,
                        "target_ip": peer_ip,
                        "output": combined,
                        "created": time.time(),
                        "expires": time.time() + ttl,
                        "max_seconds": int(limits.get("max_seconds", 60)),
                    }
    
            # Send completion notification
            event_type = "completed" if exit_code == 0 else "failed"
            self._send_notification(
                event_type,
                task_id,
                {
                    "exit_code": exit_code,
                    "duration_ms": duration_ms,
                    "cancelled": cancelled,
                    "timeout": timeout,
                    "target_ip": peer_ip,
                    "max_seconds": int(limits.get("max_seconds", 60)),
                },
            )
    
        except Exception as e:
            # Mark as failed with sanitized error message
            error_str = str(e)
            sanitized_error = sanitize_error(error_str)
            with self._lock:
                if task_id in self._tasks:
                    self._tasks[task_id]["status"] = "failed"
                    self._tasks[task_id]["error"] = sanitized_error
                    self._tasks[task_id]["completed"] = time.time()
                    # Store sanitized error in output for consistency
                    if task_id in self._output_buffers:
                        self._output_buffers[task_id].append(sanitized_error)
    
            # Send failure notification with sanitized error
            self._send_notification(
                "failed",
                task_id,
                {
                    "error": sanitized_error,
                    "max_seconds": int(
                        self._tasks.get(task_id, {})
                        .get("limits", {})
                        .get("max_seconds", 60)
                    ),
                },
            )
    
  • SSHClient.run_streaming executes the remote command via Paramiko, streams stdout/stderr with size caps, and handles cancellation, timeouts, and progress callbacks, returning detailed results including the peer IP. A hedged usage sketch appears after this list.
    def run_streaming(
        self,
        command: str,
        cancel_event,
        max_seconds: int,
        max_output_bytes: int,
        progress_cb=None,
    ):
        """Execute command with streaming, cancellation, timeout, and size caps.
    
        Returns: (exit_code, duration_ms, cancelled, timeout, bytes_out, bytes_err, combined, peer_ip)
        """
        start = time.time()
        out_buf = bytearray()
        err_buf = bytearray()
        exit_code = -1
        cancelled = False
        timeout = False
        peer_ip = ""
        client = None
        try:
            if progress_cb:
                progress_cb("connecting", 0, int((time.time() - start) * 1000))
            client, peer_ip = self._connect()
            if progress_cb:
                progress_cb("connected", 0, int((time.time() - start) * 1000))
    
            transport = client.get_transport()
            chan = transport.open_session()
            chan.settimeout(1.0)
            chan.exec_command(command)
    
            last_progress = time.time()
            while True:
                if cancel_event and cancel_event.is_set():
                    cancelled = True
                    try:
                        chan.close()
                    except Exception:
                        pass
                    break
    
                now = time.time()
                elapsed = now - start
                if max_seconds and elapsed > max_seconds:
                    timeout = True
                    try:
                        chan.close()
                    except Exception:
                        pass
                    break
    
                if chan.recv_ready():
                    chunk = chan.recv(4096)
                    if chunk:
                        out_buf.extend(chunk)
                        if len(out_buf) > max_output_bytes:
                            out_buf = out_buf[:max_output_bytes]
                    if progress_cb and (now - last_progress) > 0.5:
                        progress_cb("running", len(out_buf), int(elapsed * 1000))
                        last_progress = now
    
                if chan.recv_stderr_ready():
                    chunk = chan.recv_stderr(4096)
                    if chunk:
                        err_buf.extend(chunk)
                        if len(err_buf) > max_output_bytes:
                            err_buf = err_buf[:max_output_bytes]
                    if progress_cb and (now - last_progress) > 0.5:
                        progress_cb(
                            "running", len(out_buf) + len(err_buf), int(elapsed * 1000)
                        )
                        last_progress = now
    
                if (
                    chan.exit_status_ready()
                    and not chan.recv_ready()
                    and not chan.recv_stderr_ready()
                ):
                    exit_code = chan.recv_exit_status()
                    break
    
                time.sleep(0.05)
    
        except Exception as e:
            traceback.print_exc(file=sys.stderr)
            err_buf.extend(str(e).encode("utf-8", errors="ignore"))
        finally:
            try:
                if client:
                    client.close()
            except Exception:
                pass
    
        duration_ms = int((time.time() - start) * 1000)
        out_txt = out_buf.decode("utf-8", errors="replace")
        err_txt = err_buf.decode("utf-8", errors="replace")
        combined = (out_txt + ("\n" if out_txt and err_txt else "") + err_txt).strip()
        return (
            exit_code,
            duration_ms,
            cancelled,
            timeout,
            len(out_buf),
            len(err_buf),
            combined,
            peer_ip,
        )
  • @mcp.tool() decorator registers ssh_run_async as an MCP tool on the FastMCP instance.
    @mcp.tool()
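
The notification_handler argument to start_async_task expects a Callable[[str, str, dict[str, Any]], None] receiving the event type ("created", "progress", "completed", or "failed"), the task_id, and an event payload. A minimal conforming stand-in is sketched below; it assumes only that signature, whereas the real _build_notification_handler forwards events to the MCP context.

    from typing import Any

    def log_notification(event_type: str, task_id: str, data: dict[str, Any]) -> None:
        # Logging stand-in; a real handler would relay the event to the client.
        print(f"[task {task_id}] {event_type}: {data}")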
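
Similarly, a hedged sketch of calling run_streaming directly and unpacking its eight-element result tuple; the ssh_client instance and its construction are assumptions.

    import threading

    cancel = threading.Event()  # set() from another thread to request cancellation
    (exit_code, duration_ms, cancelled, timed_out,
     bytes_out, bytes_err, combined, peer_ip) = ssh_client.run_streaming(
        command="uptime",
        cancel_event=cancel,
        max_seconds=30,
        max_output_bytes=1024 * 1024,
        progress_cb=lambda phase, n, ms: print(phase, n, ms),
    )
    print(exit_code, peer_ip)
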
Behavior 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It effectively describes key behavioral traits: the asynchronous nature ('Returns immediately'), the polling mechanism ('task_id for polling'), and the workflow dependencies ('Use ssh_get_task_status and ssh_get_task_result'). However, it doesn't mention potential side effects, error handling, or execution environment details.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is highly concise and well-structured: three sentences that efficiently convey the tool's purpose, return behavior, and usage guidelines. Every sentence adds essential information with zero waste, making it easy to parse and understand.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (asynchronous execution with polling), the absence of annotations, and the presence of an output schema, the description is mostly complete. It explains the asynchronous behavior and workflow but omits error cases, timeouts, and what the task_id represents structurally. Since the output schema likely covers the return format, this is adequate.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters 3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description provides no information about the two parameters (alias and command), and the schema supplies no per-parameter descriptions (0% coverage). The parameters are straightforward, however: string inputs whose use is implied by the tool's purpose. The baseline score of 3 reflects that the schema covers them structurally, though without descriptions.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose, 'Start SSH command asynchronously', with the specific verb 'Start' and resource 'SSH command'. It distinguishes itself from synchronous siblings such as 'ssh_run' by specifying 'asynchronously' and mentions SEP-1686 compliance, though it doesn't explicitly differentiate itself from related tools such as 'ssh_run_on_tag'.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit guidance on when to use this tool: 'Returns immediately with task_id for polling' and directs to specific alternatives: 'Use ssh_get_task_status and ssh_get_task_result to monitor and retrieve results.' This clearly defines the asynchronous workflow and names the complementary tools.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
