
ssh_run_async

Execute SSH commands asynchronously on remote servers, returning a task ID for monitoring progress and retrieving results through separate polling tools.

Instructions

Start SSH command asynchronously (SEP-1686 compliant).

Returns immediately with a task_id for polling. Use ssh_get_task_status and ssh_get_task_result to monitor progress and retrieve results.
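
A minimal client-side sketch of this workflow, assuming the MCP Python SDK's ClientSession and that the tools return their payloads as JSON text content; the run_and_wait helper, the task_id argument name for the polling tools, and all values are illustrative:

    import asyncio
    import json

    async def run_and_wait(session, alias: str, command: str) -> dict:
        # Start the command; ssh_run_async returns immediately with a task_id.
        started = await session.call_tool(
            "ssh_run_async", {"alias": alias, "command": command}
        )
        task = json.loads(started.content[0].text)  # assumes a JSON text payload
        task_id = task["task_id"]
        poll_every = task.get("pollFrequency", 5)

        # Poll until the task leaves the pending/running states.
        while True:
            status = await session.call_tool(
                "ssh_get_task_status", {"task_id": task_id}
            )
            state = json.loads(status.content[0].text)
            if state.get("status") not in ("pending", "running"):
                break
            await asyncio.sleep(poll_every)

        # Fetch the final result (exit code, output, timing, target IP, ...).
        result = await session.call_tool("ssh_get_task_result", {"task_id": task_id})
        return json.loads(result.content[0].text)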

Input Schema

Name      Required  Description                                     Default
alias     No        Alias of a configured SSH host to run against.  "" (empty string)
command   No        Shell command to execute on the remote host.    "" (empty string)

Although both parameters default to empty strings in the schema, the handler validates alias and command before anything runs, so both should be supplied.

Implementation Reference

  • Primary handler for the ssh_run_async tool, registered with the @mcp.tool() decorator. It validates inputs, applies command-policy and network checks, creates an SSHClient instance, defines a progress callback, builds a notification handler, and delegates execution to ASYNC_TASKS.start_async_task. An illustrative response payload follows the excerpt.
    @mcp.tool()
    async def ssh_run_async(
        alias: str = "", command: str = "", ctx: Context | None = None
    ) -> ToolResult:
        """Start SSH command asynchronously (SEP-1686 compliant).

        Returns immediately with task_id for polling. Use ssh_get_task_status
        and ssh_get_task_result to monitor and retrieve results.
        """
        try:
            # Input validation
            valid, error_msg = _validate_alias(alias)
            if not valid:
                return f"Error: {error_msg}"
            valid, error_msg = _validate_command(command)
            if not valid:
                return f"Error: {error_msg}"

            # Normalize after validation
            alias = alias.strip()
            command = command.strip()

            host = config.get_host(alias)
            hostname = host.get("host", "")
            cmd_hash = hash_command(command)
            tags = config.get_host_tags(alias)
            pol = Policy(config.get_policy())

            # Command policy
            allowed = pol.is_allowed(alias, tags, command)
            pol.log_decision(alias, cmd_hash, allowed)
            if not allowed:
                return json.dumps(
                    _policy_denied_response(alias, command, cmd_hash),
                    indent=2,
                )

            # Network precheck (DNS -> allowlist)
            ok, reason = _precheck_network(pol, hostname)
            if not ok:
                return json.dumps(
                    _network_denied_response(alias, hostname, reason),
                    indent=2,
                )

            limits = pol.limits_for(alias, tags)
            require_known_host_config = bool(
                limits.get("require_known_host", pol.require_known_host())
            )
            # Security: Always require known_host for security (CWE-295)
            if not require_known_host_config:
                log_json(
                    {
                        "level": "warn",
                        "msg": "deprecation_warning",
                        "type": "host_key_policy_deprecated",
                        "detail": "require_known_host=False is deprecated and ignored. Always requiring known_hosts entry for security.",
                        "alias": alias,
                        "cwe": "CWE-295",
                    }
                )
            require_known_host = True  # Always enforce strict host key verification

            # Create SSH client
            client = _client_for(alias, limits, require_known_host)

            # Enhanced progress callback for async tasks
            def progress_cb(phase: str, bytes_read: int, elapsed_ms: int) -> None:
                pol.log_progress(
                    f"async:{alias}:{cmd_hash}", phase, int(bytes_read), int(elapsed_ms)
                )

            current_loop: asyncio.AbstractEventLoop | None = None
            if ctx is not None:
                try:
                    current_loop = asyncio.get_running_loop()
                except RuntimeError:
                    current_loop = None

            notification_handler = _build_notification_handler(ctx, current_loop)

            # Start async task
            task_id = ASYNC_TASKS.start_async_task(
                alias=alias,
                command=command,
                ssh_client=client,
                limits=limits,
                progress_cb=progress_cb,
                notification_handler=notification_handler,
            )

            # Return SEP-1686 compliant response
            result = {
                "task_id": task_id,
                "status": "pending",
                "keepAlive": int(limits.get("task_result_ttl", 300)),
                "pollFrequency": int(limits.get("task_progress_interval", 5)),
                "alias": alias,
                "command": command,
                "hash": cmd_hash,
            }
            return result
        except Exception as e:
            error_str = str(e)
            log_json({"level": "error", "msg": "async_run_exception", "error": error_str})
            return f"Async run error: {sanitize_error(error_str)}"
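
    On success the handler returns a SEP-1686-style payload shaped like the dict below; all values are illustrative, with keepAlive and pollFrequency taken from the task_result_ttl and task_progress_interval limits:

    # Illustrative success response from ssh_run_async:
    {
        "task_id": "web1:a1b2c3d4:1700000000000000",
        "status": "pending",
        "keepAlive": 300,     # seconds the result is retained (task_result_ttl)
        "pollFrequency": 5,   # suggested polling interval (task_progress_interval)
        "alias": "web1",
        "command": "uptime",
        "hash": "a1b2c3d4",
    }

    Policy and network denials instead return json.dumps(...) strings built by _policy_denied_response and _network_denied_response, and validation failures return plain "Error: ..." strings.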
  • AsyncTaskManager.start_async_task initializes the task state, starts a background thread that runs _execute_task_in_thread, sends a 'created' notification, and returns the task_id immediately. This is the core async starter invoked by the handler; an illustrative task_id follows the excerpt.
    def start_async_task(
        self,
        alias: str,
        command: str,
        ssh_client,
        limits: dict[str, Any],
        progress_cb: Callable | None = None,
        notification_handler: Callable[[str, str, dict[str, Any]], None] | None = None,
    ) -> str:
        """Start task in background thread, return task_id immediately."""
        cmd_hash = hash_command(command)
        timestamp = int(time.time() * 1000000)
        task_id = f"{alias}:{cmd_hash}:{timestamp}"

        with self._lock:
            self._tasks[task_id] = {
                "status": "pending",
                "alias": alias,
                "command": command,
                "hash": cmd_hash,
                "created": time.time(),
                "started": None,
                "completed": None,
                "exit_code": None,
                "duration_ms": 0,
                "cancelled": False,
                "timeout": False,
                "bytes_out": 0,
                "bytes_err": 0,
                "target_ip": "",
                "output": "",
                "error": None,
                "cancel": threading.Event(),
                "thread": None,
                "limits": limits,
                "progress_cb": progress_cb,
                "ssh_client": ssh_client,
                "notification_handler": notification_handler,
            }
            self._output_buffers[task_id] = deque(maxlen=1000)  # Keep last 1000 lines

        # Start background execution
        thread = threading.Thread(
            target=self._execute_task_in_thread, args=(task_id,), daemon=True
        )
        thread.start()

        with self._lock:
            self._tasks[task_id]["thread"] = thread

        # Send creation notification
        self._send_notification(
            "created",
            task_id,
            {"alias": alias, "command": command, "status": "pending"},
        )

        return task_id
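
    Task IDs join the alias, the command hash, and a microsecond timestamp with colons, making them unique per invocation. An illustrative value (hash and timestamp fabricated):

    # Mirrors the construction above; values are made up for illustration.
    alias = "web1"
    cmd_hash = "a1b2c3d4"         # stand-in for hash_command(command)
    timestamp = 1700000000000000  # int(time.time() * 1000000)
    task_id = f"{alias}:{cmd_hash}:{timestamp}"
    # -> "web1:a1b2c3d4:1700000000000000"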
  • _execute_task_in_thread performs the actual SSH execution in the background thread: it marks the task running, invokes SSHClient.run_streaming with the configured limits and callbacks, stores the result with a TTL, sends progress and completion notifications, and records sanitized errors on failure. A sketch of the status accessor follows the excerpt.
    def _execute_task_in_thread(self, task_id: str):
        """Background thread worker for async task execution."""
        try:
            with self._lock:
                task_info = self._tasks.get(task_id)
                if not task_info:
                    return
                # Update status to running
                task_info["status"] = "running"
                task_info["started"] = time.time()
                command = task_info["command"]
                ssh_client = task_info["ssh_client"]
                limits = task_info["limits"]
                progress_cb = task_info["progress_cb"]
                cancel_event = task_info["cancel"]

            # Enhanced progress callback that captures output
            def enhanced_progress_cb(phase: str, bytes_read: int, elapsed_ms: int):
                if progress_cb:
                    progress_cb(phase, bytes_read, elapsed_ms)
                # Send progress notification every 5 seconds
                if (
                    phase == "running" and elapsed_ms % 5000 < 100
                ):  # ~5 second intervals
                    self._send_notification(
                        "progress",
                        task_id,
                        {
                            "phase": phase,
                            "bytes_read": bytes_read,
                            "elapsed_ms": elapsed_ms,
                            "max_seconds": int(limits.get("max_seconds", 60)),
                            "output_lines": len(
                                self._output_buffers.get(task_id, deque())
                            ),
                        },
                    )

            # Execute SSH command
            (
                exit_code,
                duration_ms,
                cancelled,
                timeout,
                bytes_out,
                bytes_err,
                combined,
                peer_ip,
            ) = ssh_client.run_streaming(
                command=command,
                cancel_event=cancel_event,
                max_seconds=int(limits.get("max_seconds", 60)),
                max_output_bytes=int(limits.get("max_output_bytes", 1024 * 1024)),
                progress_cb=enhanced_progress_cb,
            )

            # Update task with results
            with self._lock:
                if task_id in self._tasks:
                    task_info = self._tasks[task_id]
                    task_info["status"] = "completed" if exit_code == 0 else "failed"
                    task_info["completed"] = time.time()
                    task_info["exit_code"] = exit_code
                    task_info["duration_ms"] = duration_ms
                    task_info["cancelled"] = cancelled
                    task_info["timeout"] = timeout
                    task_info["bytes_out"] = bytes_out
                    task_info["bytes_err"] = bytes_err
                    task_info["target_ip"] = peer_ip
                    task_info["output"] = combined

                    # Store result with TTL
                    ttl = int(limits.get("task_result_ttl", 300))  # 5 minutes default
                    self._results[task_id] = {
                        "task_id": task_id,
                        "alias": task_info["alias"],
                        "command": task_info["command"],
                        "hash": task_info["hash"],
                        "status": task_info["status"],
                        "exit_code": exit_code,
                        "duration_ms": duration_ms,
                        "cancelled": cancelled,
                        "timeout": timeout,
                        "target_ip": peer_ip,
                        "output": combined,
                        "created": time.time(),
                        "expires": time.time() + ttl,
                        "max_seconds": int(limits.get("max_seconds", 60)),
                    }

            # Send completion notification
            event_type = "completed" if exit_code == 0 else "failed"
            self._send_notification(
                event_type,
                task_id,
                {
                    "exit_code": exit_code,
                    "duration_ms": duration_ms,
                    "cancelled": cancelled,
                    "timeout": timeout,
                    "target_ip": peer_ip,
                    "max_seconds": int(limits.get("max_seconds", 60)),
                },
            )

        except Exception as e:
            # Mark as failed with sanitized error message
            error_str = str(e)
            sanitized_error = sanitize_error(error_str)
            with self._lock:
                if task_id in self._tasks:
                    self._tasks[task_id]["status"] = "failed"
                    self._tasks[task_id]["error"] = sanitized_error
                    self._tasks[task_id]["completed"] = time.time()
                # Store sanitized error in output for consistency
                if task_id in self._output_buffers:
                    self._output_buffers[task_id].append(sanitized_error)

            # Send failure notification with sanitized error
            self._send_notification(
                "failed",
                task_id,
                {
                    "error": sanitized_error,
                    "max_seconds": int(
                        self._tasks.get(task_id, {})
                        .get("limits", {})
                        .get("max_seconds", 60)
                    ),
                },
            )
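
    The excerpt ends where the next accessor begins; the source shows only the get_task_status signature. Below is a minimal sketch of a body consistent with the state dict populated in start_async_task — illustrative, not the project's actual implementation:

    def get_task_status(self, task_id: str) -> dict[str, Any] | None:
        """Sketch: snapshot a task's client-facing fields (illustrative body)."""
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return None
            # The fields below are the ones initialized in start_async_task.
            return {
                "task_id": task_id,
                "status": task["status"],
                "alias": task["alias"],
                "hash": task["hash"],
                "exit_code": task["exit_code"],
                "duration_ms": task["duration_ms"],
                "cancelled": task["cancelled"],
                "timeout": task["timeout"],
            }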
  • SSHClient.run_streaming executes the remote command via Paramiko, streams stdout and stderr under a size cap, honors cancellation and a wall-clock timeout, emits progress callbacks, and returns detailed results including the peer IP. A usage sketch follows the excerpt.
    def run_streaming(
        self,
        command: str,
        cancel_event,
        max_seconds: int,
        max_output_bytes: int,
        progress_cb=None,
    ):
        """Execute command with streaming, cancellation, timeout, and size caps.

        Returns:
            (exit_code, duration_ms, cancelled, timeout, bytes_out, bytes_err,
             combined, peer_ip)
        """
        start = time.time()
        out_buf = bytearray()
        err_buf = bytearray()
        exit_code = -1
        cancelled = False
        timeout = False
        peer_ip = ""
        client = None
        try:
            if progress_cb:
                progress_cb("connecting", 0, int((time.time() - start) * 1000))
            client, peer_ip = self._connect()
            if progress_cb:
                progress_cb("connected", 0, int((time.time() - start) * 1000))
            transport = client.get_transport()
            chan = transport.open_session()
            chan.settimeout(1.0)
            chan.exec_command(command)
            last_progress = time.time()
            while True:
                if cancel_event and cancel_event.is_set():
                    cancelled = True
                    try:
                        chan.close()
                    except Exception:
                        pass
                    break
                now = time.time()
                elapsed = now - start
                if max_seconds and elapsed > max_seconds:
                    timeout = True
                    try:
                        chan.close()
                    except Exception:
                        pass
                    break
                if chan.recv_ready():
                    chunk = chan.recv(4096)
                    if chunk:
                        out_buf.extend(chunk)
                        if len(out_buf) > max_output_bytes:
                            out_buf = out_buf[:max_output_bytes]
                        if progress_cb and (now - last_progress) > 0.5:
                            progress_cb("running", len(out_buf), int(elapsed * 1000))
                            last_progress = now
                if chan.recv_stderr_ready():
                    chunk = chan.recv_stderr(4096)
                    if chunk:
                        err_buf.extend(chunk)
                        if len(err_buf) > max_output_bytes:
                            err_buf = err_buf[:max_output_bytes]
                        if progress_cb and (now - last_progress) > 0.5:
                            progress_cb(
                                "running", len(out_buf) + len(err_buf), int(elapsed * 1000)
                            )
                            last_progress = now
                if (
                    chan.exit_status_ready()
                    and not chan.recv_ready()
                    and not chan.recv_stderr_ready()
                ):
                    exit_code = chan.recv_exit_status()
                    break
                time.sleep(0.05)
        except Exception as e:
            traceback.print_exc(file=sys.stderr)
            err_buf.extend(str(e).encode("utf-8", errors="ignore"))
        finally:
            try:
                if client:
                    client.close()
            except Exception:
                pass
        duration_ms = int((time.time() - start) * 1000)
        out_txt = out_buf.decode("utf-8", errors="replace")
        err_txt = err_buf.decode("utf-8", errors="replace")
        combined = (out_txt + ("\n" if out_txt and err_txt else "") + err_txt).strip()
        return (
            exit_code,
            duration_ms,
            cancelled,
            timeout,
            len(out_buf),
            len(err_buf),
            combined,
            peer_ip,
        )
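
    For orientation, a sketch of driving run_streaming directly, outside the task manager; the client construction via _client_for and all argument values are illustrative:

    import threading

    cancel = threading.Event()  # set() from another thread to abort the command
    client = _client_for("web1", {}, True)  # hypothetical (alias, limits, require_known_host)

    (exit_code, duration_ms, cancelled, timed_out,
     bytes_out, bytes_err, combined, peer_ip) = client.run_streaming(
        command="uptime",
        cancel_event=cancel,
        max_seconds=60,                # wall-clock cap; sets timeout=True when exceeded
        max_output_bytes=1024 * 1024,  # stdout/stderr each truncated at 1 MiB
        progress_cb=lambda phase, n, ms: print(phase, n, ms),
    )
    print(exit_code, peer_ip)
    print(combined)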
  • The @mcp.tool() decorator registers ssh_run_async as an MCP tool on the FastMCP instance.
    @mcp.tool()
