Skip to main content
Glama

ssh_run

Execute SSH commands on server fleets with policy enforcement, network validation, and audit logging for secure infrastructure management.

Instructions

Execute SSH command with policy, network checks, progress, timeout, and cancellation.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
aliasNo
commandNo

Implementation Reference

  • The primary handler for the 'ssh_run' MCP tool. It handles input validation, policy and network checks, SSH client creation, command execution with streaming, progress reporting, cancellation support, timeouts, and returns a detailed result including exit code, output, and metadata. Registered via @mcp.tool() decorator.
    @mcp.tool() def ssh_run( alias: str = "", command: str = "", ctx: Context | None = None ) -> ToolResult: """Execute SSH command with policy, network checks, progress, timeout, and cancellation.""" start = time.time() cmd_hash = "" alias = alias or "" try: # Input validation valid, error_msg = _validate_alias(alias) if not valid: return f"Error: {error_msg}" valid, error_msg = _validate_command(command) if not valid: return f"Error: {error_msg}" # Normalize after validation alias = alias.strip() command = command.strip() host = config.get_host(alias) hostname = host.get("host", "") cmd_hash = hash_command(command) _ctx_log(ctx, "debug", "ssh_run_start", {"alias": alias, "hash": cmd_hash}) tags = config.get_host_tags(alias) pol = Policy(config.get_policy()) # Command policy allowed = pol.is_allowed(alias, tags, command) pol.log_decision(alias, cmd_hash, allowed) if not allowed: return json.dumps( _policy_denied_response(alias, command, cmd_hash), indent=2, ) # Network precheck (DNS -> allowlist) ok, reason = _precheck_network(pol, hostname) if not ok: return json.dumps( _network_denied_response(alias, hostname, reason), indent=2, ) limits = pol.limits_for(alias, tags) max_seconds = int(limits.get("max_seconds", 60)) max_output_bytes = int(limits.get("max_output_bytes", 1024 * 1024)) require_known_host_config = bool( limits.get("require_known_host", pol.require_known_host()) ) # Security: Always require known_host for security (CWE-295) if not require_known_host_config: log_json( { "level": "warn", "msg": "deprecation_warning", "type": "host_key_policy_deprecated", "detail": "require_known_host=False is deprecated and ignored. Always requiring known_hosts entry for security.", "alias": alias, "cwe": "CWE-295", } ) require_known_host = True # Always enforce strict host key verification task_id = TASKS.create(alias, cmd_hash) _ctx_log( ctx, "debug", "ssh_run_task_created", {"alias": alias, "hash": cmd_hash, "task_id": task_id}, ) def progress_cb(phase: str, bytes_read: int, elapsed_ms: int) -> None: pol.log_progress(task_id, phase, int(bytes_read), int(elapsed_ms)) client = _client_for(alias, limits, require_known_host) cancel_event = TASKS.get_event(task_id) ( exit_code, duration_ms, cancelled, timeout, bytes_out, bytes_err, combined, peer_ip, ) = client.run_streaming( command=command, cancel_event=cancel_event, max_seconds=max_seconds, max_output_bytes=max_output_bytes, progress_cb=progress_cb, ) TASKS.cleanup(task_id) # Post-connect enforcement: ensure actual peer IP is allowed if peer_ip and not pol.is_ip_allowed(peer_ip): pol.log_audit( alias, cmd_hash, int(exit_code), int(duration_ms), int(bytes_out), int(bytes_err), bool(cancelled), bool(timeout), peer_ip, ) return json.dumps( _network_denied_response( alias, hostname, f"peer IP {peer_ip} not allowed" ), indent=2, ) pol.log_audit( alias, cmd_hash, int(exit_code), int(duration_ms), int(bytes_out), int(bytes_err), bool(cancelled), bool(timeout), peer_ip, ) result = { "task_id": task_id, "alias": alias, "hash": cmd_hash, "exit_code": int(exit_code), "duration_ms": int(duration_ms), "cancelled": bool(cancelled), "timeout": bool(timeout), "target_ip": peer_ip, "output": combined, } _ctx_log( ctx, "info", "ssh_run_complete", { "alias": alias, "hash": cmd_hash, "task_id": task_id, "exit_code": int(exit_code), "timeout": bool(timeout), "cancelled": bool(cancelled), }, ) return result except Exception as e: error_str = str(e) log_json({"level": "error", "msg": "run_exception", "error": error_str}) if cmd_hash: _ctx_log( ctx, "debug", "ssh_run_error", { "alias": alias.strip(), "hash": cmd_hash, "error": sanitize_error(error_str), }, ) return f"Run error: {sanitize_error(error_str)}" finally: elapsed = int((time.time() - start) * 1000) log_json({"type": "trace", "op": "run_done", "elapsed_ms": elapsed})

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/samerfarida/mcp-ssh-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server