MCP SSH Orchestrator

ssh_run_async

Execute SSH commands asynchronously on remote servers, returning a task ID for monitoring progress and retrieving results through separate polling tools.

Instructions

Start SSH command asynchronously (SEP-1686 compliant).

Returns immediately with task_id for polling. Use ssh_get_task_status
and ssh_get_task_result to monitor and retrieve results.
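
The snippet below is a minimal client-side polling sketch, not part of the server source. It assumes an already-connected ClientSession from the official MCP Python SDK, that each tool returns its JSON payload as text content, and that ssh_get_task_status and ssh_get_task_result accept a task_id argument (their schemas are not shown here).

    import asyncio
    import json

    async def run_and_poll(session, alias: str, command: str) -> dict:
        # Start the command; the server returns immediately with a task_id.
        started = await session.call_tool(
            "ssh_run_async", {"alias": alias, "command": command}
        )
        info = json.loads(started.content[0].text)
        task_id = info["task_id"]
        interval = info.get("pollFrequency", 5)

        # Poll until the task reaches a terminal state.
        while True:
            status = await session.call_tool(
                "ssh_get_task_status", {"task_id": task_id}
            )
            state = json.loads(status.content[0].text).get("status")
            if state in ("completed", "failed"):
                break
            await asyncio.sleep(interval)

        # Fetch the stored result before its TTL (keepAlive) expires.
        result = await session.call_tool("ssh_get_task_result", {"task_id": task_id})
        return json.loads(result.content[0].text)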

Input Schema

Name       Required   Description   Default
alias      No
command    No

Implementation Reference

  • Primary handler function for the ssh_run_async tool, decorated with @mcp.tool() for MCP registration. It validates the inputs, runs the command-policy and network checks, creates an SSHClient instance, defines a progress callback, builds a notification handler, and delegates execution to ASYNC_TASKS.start_async_task.
    @mcp.tool()
    async def ssh_run_async(
        alias: str = "", command: str = "", ctx: Context | None = None
    ) -> ToolResult:
        """Start SSH command asynchronously (SEP-1686 compliant).
    
        Returns immediately with task_id for polling. Use ssh_get_task_status
        and ssh_get_task_result to monitor and retrieve results.
        """
        try:
            # Input validation
            valid, error_msg = _validate_alias(alias)
            if not valid:
                return f"Error: {error_msg}"
    
            valid, error_msg = _validate_command(command)
            if not valid:
                return f"Error: {error_msg}"
    
            # Normalize after validation
            alias = alias.strip()
            command = command.strip()
    
            host = config.get_host(alias)
            hostname = host.get("host", "")
            cmd_hash = hash_command(command)
            tags = config.get_host_tags(alias)
            pol = Policy(config.get_policy())
    
            # Command policy
            allowed = pol.is_allowed(alias, tags, command)
            pol.log_decision(alias, cmd_hash, allowed)
            if not allowed:
                return json.dumps(
                    _policy_denied_response(alias, command, cmd_hash),
                    indent=2,
                )
    
            # Network precheck (DNS -> allowlist)
            ok, reason = _precheck_network(pol, hostname)
            if not ok:
                return json.dumps(
                    _network_denied_response(alias, hostname, reason),
                    indent=2,
                )
    
            limits = pol.limits_for(alias, tags)
            require_known_host_config = bool(
                limits.get("require_known_host", pol.require_known_host())
            )
            # Security: Always require known_host for security (CWE-295)
            if not require_known_host_config:
                log_json(
                    {
                        "level": "warn",
                        "msg": "deprecation_warning",
                        "type": "host_key_policy_deprecated",
                        "detail": "require_known_host=False is deprecated and ignored. Always requiring known_hosts entry for security.",
                        "alias": alias,
                        "cwe": "CWE-295",
                    }
                )
            require_known_host = True  # Always enforce strict host key verification
    
            # Create SSH client
            client = _client_for(alias, limits, require_known_host)
    
            # Enhanced progress callback for async tasks
            def progress_cb(phase: str, bytes_read: int, elapsed_ms: int) -> None:
                pol.log_progress(
                    f"async:{alias}:{cmd_hash}", phase, int(bytes_read), int(elapsed_ms)
                )
    
            current_loop: asyncio.AbstractEventLoop | None = None
            if ctx is not None:
                try:
                    current_loop = asyncio.get_running_loop()
                except RuntimeError:
                    current_loop = None
    
            notification_handler = _build_notification_handler(ctx, current_loop)
    
            # Start async task
            task_id = ASYNC_TASKS.start_async_task(
                alias=alias,
                command=command,
                ssh_client=client,
                limits=limits,
                progress_cb=progress_cb,
                notification_handler=notification_handler,
            )
    
            # Return SEP-1686 compliant response
            result = {
                "task_id": task_id,
                "status": "pending",
                "keepAlive": int(limits.get("task_result_ttl", 300)),
                "pollFrequency": int(limits.get("task_progress_interval", 5)),
                "alias": alias,
                "command": command,
                "hash": cmd_hash,
            }
            return result
    
        except Exception as e:
            error_str = str(e)
            log_json({"level": "error", "msg": "async_run_exception", "error": error_str})
            return f"Async run error: {sanitize_error(error_str)}"
  • The ASYNC_TASKS.start_async_task method on the AsyncTaskManager class. It initializes the task state, starts a background thread that runs _execute_task_in_thread, sends a 'created' notification, and returns the task_id immediately. This is the core async execution starter invoked by the handler.
    def start_async_task(
        self,
        alias: str,
        command: str,
        ssh_client,
        limits: dict[str, Any],
        progress_cb: Callable | None = None,
        notification_handler: Callable[[str, str, dict[str, Any]], None] | None = None,
    ) -> str:
        """Start task in background thread, return task_id immediately."""
        cmd_hash = hash_command(command)
        timestamp = int(time.time() * 1000000)
        task_id = f"{alias}:{cmd_hash}:{timestamp}"
    
        with self._lock:
            self._tasks[task_id] = {
                "status": "pending",
                "alias": alias,
                "command": command,
                "hash": cmd_hash,
                "created": time.time(),
                "started": None,
                "completed": None,
                "exit_code": None,
                "duration_ms": 0,
                "cancelled": False,
                "timeout": False,
                "bytes_out": 0,
                "bytes_err": 0,
                "target_ip": "",
                "output": "",
                "error": None,
                "cancel": threading.Event(),
                "thread": None,
                "limits": limits,
                "progress_cb": progress_cb,
                "ssh_client": ssh_client,
                "notification_handler": notification_handler,
            }
            self._output_buffers[task_id] = deque(maxlen=1000)  # Keep last 1000 lines
    
        # Start background execution
        thread = threading.Thread(
            target=self._execute_task_in_thread, args=(task_id,), daemon=True
        )
        thread.start()
    
        with self._lock:
            self._tasks[task_id]["thread"] = thread
    
        # Send creation notification
        self._send_notification(
            "created",
            task_id,
            {"alias": alias, "command": command, "status": "pending"},
        )
    
        return task_id
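  • Hypothetical helper (not in the shown source): start_async_task stores a threading.Event under the "cancel" key and run_streaming polls it, so a cancellation path only needs to set that event. The sketch below shows the idea; any real cancel tool exposed by the server may differ.
    def cancel_task(manager: "AsyncTaskManager", task_id: str) -> bool:
        """Request cooperative cancellation of a pending or running task (illustrative)."""
        with manager._lock:
            info = manager._tasks.get(task_id)
            if info is None or info["status"] not in ("pending", "running"):
                return False
            info["cancel"].set()  # run_streaming sees this and closes the channel
        return True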
  • The _execute_task_in_thread method performs the actual SSH execution in a background thread. It marks the task as running, invokes SSHClient.run_streaming with the configured limits and callbacks, stores the result with a TTL, sends progress and completion notifications, and records sanitized errors on failure.
    def _execute_task_in_thread(self, task_id: str):
        """Background thread worker for async task execution."""
        try:
            with self._lock:
                task_info = self._tasks.get(task_id)
                if not task_info:
                    return
    
                # Update status to running
                task_info["status"] = "running"
                task_info["started"] = time.time()
    
                command = task_info["command"]
                ssh_client = task_info["ssh_client"]
                limits = task_info["limits"]
                progress_cb = task_info["progress_cb"]
                cancel_event = task_info["cancel"]
    
            # Enhanced progress callback that captures output
            def enhanced_progress_cb(phase: str, bytes_read: int, elapsed_ms: int):
                if progress_cb:
                    progress_cb(phase, bytes_read, elapsed_ms)
    
                # Send progress notification every 5 seconds
                if (
                    phase == "running" and elapsed_ms % 5000 < 100
                ):  # ~5 second intervals
                    self._send_notification(
                        "progress",
                        task_id,
                        {
                            "phase": phase,
                            "bytes_read": bytes_read,
                            "elapsed_ms": elapsed_ms,
                            "max_seconds": int(limits.get("max_seconds", 60)),
                            "output_lines": len(
                                self._output_buffers.get(task_id, deque())
                            ),
                        },
                    )
    
            # Execute SSH command
            (
                exit_code,
                duration_ms,
                cancelled,
                timeout,
                bytes_out,
                bytes_err,
                combined,
                peer_ip,
            ) = ssh_client.run_streaming(
                command=command,
                cancel_event=cancel_event,
                max_seconds=int(limits.get("max_seconds", 60)),
                max_output_bytes=int(limits.get("max_output_bytes", 1024 * 1024)),
                progress_cb=enhanced_progress_cb,
            )
    
            # Update task with results
            with self._lock:
                if task_id in self._tasks:
                    task_info = self._tasks[task_id]
                    task_info["status"] = "completed" if exit_code == 0 else "failed"
                    task_info["completed"] = time.time()
                    task_info["exit_code"] = exit_code
                    task_info["duration_ms"] = duration_ms
                    task_info["cancelled"] = cancelled
                    task_info["timeout"] = timeout
                    task_info["bytes_out"] = bytes_out
                    task_info["bytes_err"] = bytes_err
                    task_info["target_ip"] = peer_ip
                    task_info["output"] = combined
    
                    # Store result with TTL
                    ttl = int(limits.get("task_result_ttl", 300))  # 5 minutes default
                    self._results[task_id] = {
                        "task_id": task_id,
                        "alias": task_info["alias"],
                        "command": task_info["command"],
                        "hash": task_info["hash"],
                        "status": task_info["status"],
                        "exit_code": exit_code,
                        "duration_ms": duration_ms,
                        "cancelled": cancelled,
                        "timeout": timeout,
                        "target_ip": peer_ip,
                        "output": combined,
                        "created": time.time(),
                        "expires": time.time() + ttl,
                        "max_seconds": int(limits.get("max_seconds", 60)),
                    }
    
            # Send completion notification
            event_type = "completed" if exit_code == 0 else "failed"
            self._send_notification(
                event_type,
                task_id,
                {
                    "exit_code": exit_code,
                    "duration_ms": duration_ms,
                    "cancelled": cancelled,
                    "timeout": timeout,
                    "target_ip": peer_ip,
                    "max_seconds": int(limits.get("max_seconds", 60)),
                },
            )
    
        except Exception as e:
            # Mark as failed with sanitized error message
            error_str = str(e)
            sanitized_error = sanitize_error(error_str)
            with self._lock:
                if task_id in self._tasks:
                    self._tasks[task_id]["status"] = "failed"
                    self._tasks[task_id]["error"] = sanitized_error
                    self._tasks[task_id]["completed"] = time.time()
                    # Store sanitized error in output for consistency
                    if task_id in self._output_buffers:
                        self._output_buffers[task_id].append(sanitized_error)
    
            # Send failure notification with sanitized error
            self._send_notification(
                "failed",
                task_id,
                {
                    "error": sanitized_error,
                    "max_seconds": int(
                        self._tasks.get(task_id, {})
                        .get("limits", {})
                        .get("max_seconds", 60)
                    ),
                },
            )
    
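  • Illustrative sketch (the retrieval path is not shown in this reference): each record stored in _results carries an "expires" timestamp, so a reader such as ssh_get_task_result can drop stale entries before returning them.
    import time

    def result_if_fresh(manager: "AsyncTaskManager", task_id: str) -> dict | None:
        """Return the stored result only while its TTL has not elapsed (illustrative)."""
        rec = manager._results.get(task_id)
        if rec is None:
            return None
        if time.time() > rec["expires"]:
            manager._results.pop(task_id, None)  # expired: discard
            return None
        return rec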
  • SSHClient.run_streaming executes the remote command via Paramiko, streams stdout and stderr with size caps, honors cancellation and timeout, reports progress through an optional callback, and returns detailed results including the peer IP.
    def run_streaming(
        self,
        command: str,
        cancel_event,
        max_seconds: int,
        max_output_bytes: int,
        progress_cb=None,
    ):
        """Execute command with streaming, cancellation, timeout, and size caps.
    
        Returns: (exit_code, duration_ms, cancelled, timeout, bytes_out, bytes_err, combined, peer_ip)
        """
        start = time.time()
        out_buf = bytearray()
        err_buf = bytearray()
        exit_code = -1
        cancelled = False
        timeout = False
        peer_ip = ""
        client = None
        try:
            if progress_cb:
                progress_cb("connecting", 0, int((time.time() - start) * 1000))
            client, peer_ip = self._connect()
            if progress_cb:
                progress_cb("connected", 0, int((time.time() - start) * 1000))
    
            transport = client.get_transport()
            chan = transport.open_session()
            chan.settimeout(1.0)
            chan.exec_command(command)
    
            last_progress = time.time()
            while True:
                if cancel_event and cancel_event.is_set():
                    cancelled = True
                    try:
                        chan.close()
                    except Exception:
                        pass
                    break
    
                now = time.time()
                elapsed = now - start
                if max_seconds and elapsed > max_seconds:
                    timeout = True
                    try:
                        chan.close()
                    except Exception:
                        pass
                    break
    
                if chan.recv_ready():
                    chunk = chan.recv(4096)
                    if chunk:
                        out_buf.extend(chunk)
                        if len(out_buf) > max_output_bytes:
                            out_buf = out_buf[:max_output_bytes]
                    if progress_cb and (now - last_progress) > 0.5:
                        progress_cb("running", len(out_buf), int(elapsed * 1000))
                        last_progress = now
    
                if chan.recv_stderr_ready():
                    chunk = chan.recv_stderr(4096)
                    if chunk:
                        err_buf.extend(chunk)
                        if len(err_buf) > max_output_bytes:
                            err_buf = err_buf[:max_output_bytes]
                    if progress_cb and (now - last_progress) > 0.5:
                        progress_cb(
                            "running", len(out_buf) + len(err_buf), int(elapsed * 1000)
                        )
                        last_progress = now
    
                if (
                    chan.exit_status_ready()
                    and not chan.recv_ready()
                    and not chan.recv_stderr_ready()
                ):
                    exit_code = chan.recv_exit_status()
                    break
    
                time.sleep(0.05)
    
        except Exception as e:
            traceback.print_exc(file=sys.stderr)
            err_buf.extend(str(e).encode("utf-8", errors="ignore"))
        finally:
            try:
                if client:
                    client.close()
            except Exception:
                pass
    
        duration_ms = int((time.time() - start) * 1000)
        out_txt = out_buf.decode("utf-8", errors="replace")
        err_txt = err_buf.decode("utf-8", errors="replace")
        combined = (out_txt + ("\n" if out_txt and err_txt else "") + err_txt).strip()
        return (
            exit_code,
            duration_ms,
            cancelled,
            timeout,
            len(out_buf),
            len(err_buf),
            combined,
            peer_ip,
        )
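  • Minimal direct-call sketch (illustrative): run_streaming takes the command, a cancel event, and the per-host limits, and returns the 8-tuple documented in its docstring. The client argument is assumed to be an SSHClient obtained elsewhere (e.g. via _client_for in the handler above).
    import threading

    def demo_run(client) -> None:
        """Run one command synchronously through run_streaming (illustrative)."""
        cancel = threading.Event()
        (exit_code, duration_ms, cancelled, timed_out,
         bytes_out, bytes_err, combined, peer_ip) = client.run_streaming(
            command="uname -a",
            cancel_event=cancel,
            max_seconds=60,                # limits["max_seconds"] default
            max_output_bytes=1024 * 1024,  # limits["max_output_bytes"] default
            progress_cb=lambda phase, n, ms: None,
        )
        if timed_out:
            print("command exceeded max_seconds")
        print(exit_code, peer_ip)
        print(combined)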
  • @mcp.tool() decorator registers ssh_run_async as an MCP tool on the FastMCP instance.
    @mcp.tool()
