ssh_plan
Check if SSH commands are permitted and preview execution details before running them on server fleets.
Instructions
Show what would be executed and if policy allows.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| alias | No | ||
| command | No |
Implementation Reference
- src/mcp_ssh/mcp_server.py:894-939 (handler)The primary handler function for the 'ssh_plan' MCP tool. It performs input validation on alias and command, computes a hash of the command, checks the host's tags and policy to determine if the command is allowed, summarizes relevant execution limits, and returns a structured preview dictionary. If denied, it includes explanation and hints. Registered via @mcp.tool() decorator.def ssh_plan(alias: str = "", command: str = "") -> ToolResult: """Show what would be executed and if policy allows.""" try: # Input validation valid, error_msg = _validate_alias(alias) if not valid: return f"Error: {error_msg}" valid, error_msg = _validate_command(command) if not valid: return f"Error: {error_msg}" cmd_hash = hash_command(command) tags = config.get_host_tags(alias) pol = Policy(config.get_policy()) allowed = pol.is_allowed(alias, tags, command) limits = pol.limits_for(alias, tags) preview = { "alias": alias, "command": command, "hash": cmd_hash, "allowed": allowed, "limits": { "max_seconds": limits.get("max_seconds", 60), "max_output_bytes": limits.get("max_output_bytes", 1024 * 1024), "host_key_auto_add": bool(limits.get("host_key_auto_add", False)), "require_known_host": bool(limits.get("require_known_host", True)), }, } if not allowed: # Enhanced error message: identify which command in chain is denied denied_cmd = pol.get_denied_command_in_chain(alias, tags, command) if denied_cmd and denied_cmd != command: # Command chain with denied command preview["why"] = f"Policy blocked command in chain: '{denied_cmd}'" preview["denied_command"] = denied_cmd else: # Single command or entire chain denied preview["why"] = "Policy blocked this command." preview["hint"] = _POLICY_DENY_HINT return preview except Exception as e: error_str = str(e) log_json({"level": "error", "msg": "plan_exception", "error": error_str}) return f"Error: {sanitize_error(error_str)}"