ssh_run_on_tag
Execute SSH commands on multiple tagged hosts simultaneously with network verification for efficient infrastructure management.
Instructions
Execute SSH command on all hosts with a tag (with network checks).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
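| tag | No | Tag selecting the hosts to run the command on | `""` |
| command | No | Command to execute on each matching host | `""` |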
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Summary with `tag` and per-host `results`, or an error message string | |
Implementation Reference
- src/mcp_ssh/mcp_server.py:1114-1350 (handler) The primary handler function for the 'ssh_run_on_tag' MCP tool. It performs input validation, retrieves matching hosts via config.find_hosts_by_tag, applies per-host policy and network checks, executes SSH commands with error handling for partial failures, and aggregates results into a summary dictionary.
@mcp.tool()
def ssh_run_on_tag(
    tag: str = "",
    command: str = "",
    ctx: Context | None = None,
) -> ToolResult:
    """Execute SSH command on all hosts with a tag (with network checks)."""
    # NOTE(review): the docstring above is deliberately left unchanged; it
    # appears to be surfaced verbatim as the tool's instructions -- confirm
    # against the MCP framework before rewording it.
    #
    # Parameters:
    #   tag:     tag used to select hosts via config.find_hosts_by_tag().
    #   command: command executed on every selected host.
    #   ctx:     optional context forwarded to _ctx_log().
    #
    # Returns:
    #   * "Error: ..." (str) when tag/command validation fails;
    #   * {"tag", "results": [], "note"} when no host carries the tag;
    #   * {"tag", "results": [...]} with one entry per host otherwise. An
    #     entry is either a denial ("denied": True with "reason"/"hint") or an
    #     execution record ("exit_code", "output", ...). A host whose
    #     processing raised gets an execution record with exit_code -1 and
    #     the sanitized error as output;
    #   * "Run on tag error: ..." (str) on an unexpected failure outside the
    #     per-host loop.
    try:
        # Input validation
        valid, error_msg = _validate_tag(tag)
        if not valid:
            return f"Error: {error_msg}"

        valid, error_msg = _validate_command(command)
        if not valid:
            return f"Error: {error_msg}"

        # Normalize after validation
        tag = tag.strip()
        command = command.strip()

        cmd_hash = hash_command(command)
        aliases = config.find_hosts_by_tag(tag)
        _ctx_log(
            ctx,
            "debug",
            "ssh_run_on_tag_start",
            {"tag": tag, "hash": cmd_hash, "target_count": len(aliases)},
        )
        if not aliases:
            return {"tag": tag, "results": [], "note": "No hosts matched."}

        results = []
        for alias in aliases:
            # The whole per-host body is guarded: a failure while looking up
            # or policy-checking one host must not abort the batch and throw
            # away the results of commands already executed on earlier hosts.
            # A host that fails before execution is never connected to.
            task_id = None
            try:
                host = config.get_host(alias)
                hostname = host.get("host", "")
                tags = config.get_host_tags(alias)
                pol = Policy(config.get_policy())

                # Command policy
                allowed = pol.is_allowed(alias, tags, command)
                pol.log_decision(alias, cmd_hash, allowed)
                if not allowed:
                    results.append(
                        {
                            "alias": alias,
                            "hash": cmd_hash,
                            "denied": True,
                            "reason": "policy",
                            "hint": _POLICY_DENY_HINT,
                        }
                    )
                    continue

                # Network precheck
                ok, reason = _precheck_network(pol, hostname)
                if not ok:
                    results.append(
                        {
                            "alias": alias,
                            "hash": cmd_hash,
                            "denied": True,
                            "reason": f"network: {reason}",
                            "detail": reason,
                            "hint": _NETWORK_DENY_HINT,
                        }
                    )
                    continue

                limits = pol.limits_for(alias, tags)
                max_seconds = int(limits.get("max_seconds", 60))
                max_output_bytes = int(limits.get("max_output_bytes", 1024 * 1024))
                require_known_host_config = bool(
                    limits.get("require_known_host", pol.require_known_host())
                )
                # Security: Always require known_host for security (CWE-295)
                if not require_known_host_config:
                    log_json(
                        {
                            "level": "warn",
                            "msg": "deprecation_warning",
                            "type": "host_key_policy_deprecated",
                            "detail": "require_known_host=False is deprecated and ignored. "
                            "Always requiring known_hosts entry for security.",
                            "alias": alias,
                            "cwe": "CWE-295",
                        }
                    )
                require_known_host = True  # Always enforce strict host key verification

                task_id = TASKS.create(alias, cmd_hash)
                try:
                    # Defaults bind the current policy/task so the callback
                    # does not pick up values from a later loop iteration.
                    def progress_cb(
                        phase: str,
                        bytes_read: int,
                        elapsed_ms: int,
                        pol_ref: Policy = pol,
                        task_ref: str = task_id,
                    ) -> None:
                        pol_ref.log_progress(
                            task_ref, phase, int(bytes_read), int(elapsed_ms)
                        )

                    client = _client_for(alias, limits, require_known_host)
                    cancel_event = TASKS.get_event(task_id)
                    (
                        exit_code,
                        duration_ms,
                        cancelled,
                        timeout,
                        bytes_out,
                        bytes_err,
                        combined,
                        peer_ip,
                    ) = client.run_streaming(
                        command=command,
                        cancel_event=cancel_event,
                        max_seconds=max_seconds,
                        max_output_bytes=max_output_bytes,
                        progress_cb=progress_cb,
                    )
                finally:
                    # Single cleanup point for success and failure alike.
                    # Best-effort: a cleanup failure is logged instead of
                    # masking the command's outcome.
                    try:
                        TASKS.cleanup(task_id)
                    except Exception as cleanup_err:
                        log_json(
                            {
                                "level": "warn",
                                "msg": "run_on_tag_task_cleanup_failed",
                                "alias": alias,
                                "error": str(cleanup_err),
                            }
                        )

                # Post-connect enforcement: the command has already run, but
                # its output is withheld when the peer IP is not allowed.
                peer_denied = bool(peer_ip) and not pol.is_ip_allowed(peer_ip)
                pol.log_audit(
                    alias,
                    cmd_hash,
                    int(exit_code),
                    int(duration_ms),
                    int(bytes_out),
                    int(bytes_err),
                    bool(cancelled),
                    bool(timeout),
                    peer_ip,
                )
                if peer_denied:
                    results.append(
                        {
                            "alias": alias,
                            "task_id": task_id,
                            "hash": cmd_hash,
                            "denied": True,
                            "reason": f"network: peer {peer_ip} not allowed",
                            "detail": f"peer {peer_ip} not allowed",
                            "hint": _NETWORK_DENY_HINT,
                        }
                    )
                    continue

                results.append(
                    {
                        "alias": alias,
                        "task_id": task_id,
                        "hash": cmd_hash,
                        "exit_code": int(exit_code),
                        "duration_ms": int(duration_ms),
                        "cancelled": bool(cancelled),
                        "timeout": bool(timeout),
                        "target_ip": peer_ip,
                        "output": combined,
                    }
                )
            except Exception as e:
                # Handle per-host failures gracefully
                error_str = str(e)
                log_json(
                    {
                        "level": "error",
                        "msg": "run_on_tag_host_failed",
                        "alias": alias,
                        "error": error_str,
                    }
                )
                # Add error result for this host
                results.append(
                    {
                        "alias": alias,
                        "task_id": task_id if task_id else "",
                        "hash": cmd_hash,
                        "exit_code": -1,
                        "duration_ms": 0,
                        "cancelled": False,
                        "timeout": False,
                        "target_ip": "",
                        "output": sanitize_error(error_str),
                    }
                )
                continue  # Continue with next host

        summary = {
            "tag": tag,
            "results": results,
        }
        _ctx_log(
            ctx,
            "info",
            "ssh_run_on_tag_complete",
            {
                "tag": tag,
                "hash": cmd_hash,
                "target_count": len(aliases),
                # NOTE(review): counts every non-denied entry, so per-host
                # errors (exit_code -1) and non-zero exits are included.
                "succeeded": sum(1 for r in results if not r.get("denied")),
            },
        )
        return summary
    except Exception as e:
        error_str = str(e)
        log_json({"level": "error", "msg": "run_on_tag_exception", "error": error_str})
        _ctx_log(
            ctx,
            "debug",
            "ssh_run_on_tag_error",
            # str() keeps this handler from raising when tag is not a string.
            {"tag": str(tag).strip(), "error": sanitize_error(error_str)},
        )
        return f"Run on tag error: {sanitize_error(error_str)}"
# - src/mcp_ssh/config.py:564-573 (helper) Helper method on Config class used by ssh_run_on_tag to find all host aliases matching the specified tag.
def find_hosts_by_tag(self, tag: str) -> list:
    """Find all host aliases with a given tag.

    Args:
        tag: Tag to look for; it is converted to ``str`` and stripped of
            surrounding whitespace before comparison.

    Returns:
        The ``alias`` of every entry under ``servers.hosts`` in
        ``self._data`` whose ``tags`` contain the tag, in configuration
        order. An entry without an alias contributes ``""``.
    """
    wanted = str(tag).strip()
    # "servers" / "hosts" may be present but null; treat that like missing,
    # the same way a null "tags" value is treated below.
    servers = self._data.get("servers") or {}
    hosts = servers.get("hosts") or []

    matches = []
    for host in hosts:
        host_tags = host.get("tags") or []
        # A scalar tag is one tag, not a character sequence: without this,
        # "web" would substring-match a host tagged "webserver".
        if isinstance(host_tags, str):
            host_tags = [host_tags]
        if wanted in host_tags:
            matches.append(str(host.get("alias", "")))
    return matches
# - src/mcp_ssh/tools/utilities.py:12-17 (helper) Utility function hash_command used to generate command hashes for logging and task IDs in ssh_run_on_tag.
def hash_command(command: str) -> str:
    """Compute a truncated SHA-256 fingerprint of a command string.

    A falsy ``command`` (``None`` or ``""``) is hashed as the empty string.
    Only the first 16 hex characters (64 bits) of the digest are returned,
    a length chosen for collision resistance in audit trails.
    """
    payload = (command or "").encode()
    full_digest = hashlib.sha256(payload).hexdigest()
    return full_digest[:16]
# - src/mcp_ssh/mcp_server.py:1114-1114 (registration) MCP tool registration decorator @mcp.tool() applied to the ssh_run_on_tag handler function.
@mcp.tool()