samerfarida

MCP SSH Orchestrator

ssh_run_on_tag

Execute an SSH command on every host that shares a tag, with per-host policy and network checks, for efficient infrastructure management.

Instructions

Execute SSH command on all hosts with a tag (with network checks).

Input Schema

Name     Required  Description  Default
tag      No
command  No

Output Schema

Name    Required  Description  Default
result  Yes

Implementation Reference

  • The primary handler function for the 'ssh_run_on_tag' MCP tool. It performs input validation, retrieves matching hosts via config.find_hosts_by_tag, applies per-host policy and network checks, executes SSH commands with error handling for partial failures, and aggregates results into a summary dictionary.
    @mcp.tool()
    def ssh_run_on_tag(
        tag: str = "",
        command: str = "",
        ctx: Context | None = None,
    ) -> ToolResult:
        """Execute SSH command on all hosts with a tag (with network checks)."""
        try:
            # Input validation
            valid, error_msg = _validate_tag(tag)
            if not valid:
                return f"Error: {error_msg}"
    
            valid, error_msg = _validate_command(command)
            if not valid:
                return f"Error: {error_msg}"
    
            # Normalize after validation
            tag = tag.strip()
            command = command.strip()
            cmd_hash = hash_command(command)
    
            aliases = config.find_hosts_by_tag(tag)
            _ctx_log(
                ctx,
                "debug",
                "ssh_run_on_tag_start",
                {"tag": tag, "hash": cmd_hash, "target_count": len(aliases)},
            )
            if not aliases:
                return {"tag": tag, "results": [], "note": "No hosts matched."}
    
            results = []
            for alias in aliases:
                host = config.get_host(alias)
                hostname = host.get("host", "")
                tags = config.get_host_tags(alias)
                pol = Policy(config.get_policy())
    
                # Command policy
                allowed = pol.is_allowed(alias, tags, command)
                pol.log_decision(alias, cmd_hash, allowed)
                if not allowed:
                    results.append(
                        {
                            "alias": alias,
                            "hash": cmd_hash,
                            "denied": True,
                            "reason": "policy",
                            "hint": _POLICY_DENY_HINT,
                        }
                    )
                    continue
    
                # Network precheck
                ok, reason = _precheck_network(pol, hostname)
                if not ok:
                    results.append(
                        {
                            "alias": alias,
                            "hash": cmd_hash,
                            "denied": True,
                            "reason": f"network: {reason}",
                            "detail": reason,
                            "hint": _NETWORK_DENY_HINT,
                        }
                    )
                    continue
    
                # Wrap SSH connection/execution in try-except for per-host error handling
                task_id = None
                try:
                    limits = pol.limits_for(alias, tags)
                    max_seconds = int(limits.get("max_seconds", 60))
                    max_output_bytes = int(limits.get("max_output_bytes", 1024 * 1024))
                    require_known_host_config = bool(
                        limits.get("require_known_host", pol.require_known_host())
                    )
                    # Security: Always require known_host for security (CWE-295)
                    if not require_known_host_config:
                        log_json(
                            {
                                "level": "warn",
                                "msg": "deprecation_warning",
                                "type": "host_key_policy_deprecated",
                                "detail": "require_known_host=False is deprecated and ignored. Always requiring known_hosts entry for security.",
                                "alias": alias,
                                "cwe": "CWE-295",
                            }
                        )
                    require_known_host = True  # Always enforce strict host key verification
    
                    task_id = TASKS.create(alias, cmd_hash)
    
                    def progress_cb(
                        phase: str,
                        bytes_read: int,
                        elapsed_ms: int,
                        pol_ref: Policy = pol,
                        task_ref: str = task_id,
                    ) -> None:
                        pol_ref.log_progress(
                            task_ref, phase, int(bytes_read), int(elapsed_ms)
                        )
    
                    client = _client_for(alias, limits, require_known_host)
                    cancel_event = TASKS.get_event(task_id)
                    (
                        exit_code,
                        duration_ms,
                        cancelled,
                        timeout,
                        bytes_out,
                        bytes_err,
                        combined,
                        peer_ip,
                    ) = client.run_streaming(
                        command=command,
                        cancel_event=cancel_event,
                        max_seconds=max_seconds,
                        max_output_bytes=max_output_bytes,
                        progress_cb=progress_cb,
                    )
                    TASKS.cleanup(task_id)
    
                    # Post-connect enforcement
                    if peer_ip and not pol.is_ip_allowed(peer_ip):
                        pol.log_audit(
                            alias,
                            cmd_hash,
                            int(exit_code),
                            int(duration_ms),
                            int(bytes_out),
                            int(bytes_err),
                            bool(cancelled),
                            bool(timeout),
                            peer_ip,
                        )
                        results.append(
                            {
                                "alias": alias,
                                "task_id": task_id,
                                "hash": cmd_hash,
                                "denied": True,
                                "reason": f"network: peer {peer_ip} not allowed",
                                "detail": f"peer {peer_ip} not allowed",
                                "hint": _NETWORK_DENY_HINT,
                            }
                        )
                        continue
    
                    pol.log_audit(
                        alias,
                        cmd_hash,
                        int(exit_code),
                        int(duration_ms),
                        int(bytes_out),
                        int(bytes_err),
                        bool(cancelled),
                        bool(timeout),
                        peer_ip,
                    )
                    results.append(
                        {
                            "alias": alias,
                            "task_id": task_id,
                            "hash": cmd_hash,
                            "exit_code": int(exit_code),
                            "duration_ms": int(duration_ms),
                            "cancelled": bool(cancelled),
                            "timeout": bool(timeout),
                            "target_ip": peer_ip,
                            "output": combined,
                        }
                    )
                except Exception as e:
                    # Handle per-host failures gracefully
                    error_str = str(e)
                    log_json(
                        {
                            "level": "error",
                            "msg": "run_on_tag_host_failed",
                            "alias": alias,
                            "error": error_str,
                        }
                    )
    
                    # Clean up task if it was created
                    if task_id:
                        try:
                            TASKS.cleanup(task_id)
                        except Exception:
                            pass
    
                    # Add error result for this host
                    results.append(
                        {
                            "alias": alias,
                            "task_id": task_id if task_id else "",
                            "hash": cmd_hash,
                            "exit_code": -1,
                            "duration_ms": 0,
                            "cancelled": False,
                            "timeout": False,
                            "target_ip": "",
                            "output": sanitize_error(error_str),
                        }
                    )
                    continue  # Continue with next host
    
            summary = {
                "tag": tag,
                "results": results,
            }
            _ctx_log(
                ctx,
                "info",
                "ssh_run_on_tag_complete",
                {
                    "tag": tag,
                    "hash": cmd_hash,
                    "target_count": len(aliases),
                    "succeeded": sum(1 for r in results if not r.get("denied")),
                },
            )
            return summary
        except Exception as e:
            error_str = str(e)
            log_json({"level": "error", "msg": "run_on_tag_exception", "error": error_str})
            _ctx_log(
                ctx,
                "debug",
                "ssh_run_on_tag_error",
                {"tag": tag.strip(), "error": sanitize_error(error_str)},
            )
            return f"Run on tag error: {sanitize_error(error_str)}"
  • Helper method on Config class used by ssh_run_on_tag to find all host aliases matching the specified tag.
    def find_hosts_by_tag(self, tag: str) -> list:
        """Find all host aliases with a given tag."""
        out = []
        tag_str = str(tag).strip()
        for h in self._data.get("servers", {}).get("hosts", []):
            tags = h.get("tags", []) or []
            if tag_str in tags:
                out.append(str(h.get("alias", "")))
        return out
  • Utility function hash_command used to generate command hashes for logging and task IDs in ssh_run_on_tag.
    def hash_command(command: str) -> str:
        """Return short SHA256 hash for a command.
    
        Returns 16 characters (64 bits) for better collision resistance in audit trails.
        """
        return hashlib.sha256((command or "").encode()).hexdigest()[:16]
  • MCP tool registration decorator @mcp.tool() applied to the ssh_run_on_tag handler function.
    @mcp.tool()
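
The handler above returns a dictionary with `tag` and `results` on the normal path and a plain error string on validation or top-level failure. Each per-host entry is either a denial (`denied: True`), an execution error (`exit_code` of -1), or a completed run. The `succeeded` figure it logs counts every non-denied entry, so hosts that failed to connect are included in it. Below is a minimal sketch of how a caller might separate these cases, assuming only the result shape shown above. `classify_results` and the sample data are hypothetical and not part of the project.

```python
from collections import defaultdict


def classify_results(summary) -> dict:
    """Group per-host entries by outcome (hypothetical helper, not project code)."""
    # The handler returns a plain string when validation or the whole call fails.
    if isinstance(summary, str):
        return {"error": [summary]}

    groups = defaultdict(list)
    for entry in summary.get("results", []):
        alias = entry.get("alias", "")
        if entry.get("denied"):
            # Blocked by command policy or by a network check.
            groups["denied"].append(alias)
        elif entry.get("timeout") or entry.get("cancelled"):
            groups["interrupted"].append(alias)
        elif entry.get("exit_code") == 0:
            groups["ok"].append(alias)
        else:
            # Non-zero exit, including the -1 used for per-host exceptions.
            groups["failed"].append(alias)
    return dict(groups)


# Made-up sample mirroring the fields the handler emits.
sample = {
    "tag": "web",
    "results": [
        {"alias": "web1", "exit_code": 0, "cancelled": False, "timeout": False},
        {"alias": "web2", "denied": True, "reason": "policy"},
        {"alias": "web3", "exit_code": -1, "cancelled": False, "timeout": False},
        {"alias": "web4", "exit_code": 1, "cancelled": False, "timeout": True},
    ],
}

print(classify_results(sample))
```

Treating timeouts and cancellations as their own group is a design choice of this sketch; a caller could equally fold them into the failed group.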
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. It mentions 'with network checks', which adds some context about execution behavior, but fails to describe critical aspects such as whether this is a read-only or destructive operation, authentication requirements, error handling, or output format. For a tool that executes commands remotely, this is a significant gap in transparency.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that front-loads the core purpose. It avoids unnecessary words, but could be slightly improved by structuring key details more explicitly. Overall, it's appropriately sized for the tool's complexity.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 3/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool has an output schema (which handles return values) and no annotations, the description is minimally complete but lacks depth. It covers the basic action and a hint of behavior ('network checks'), but for a command execution tool with remote hosts, more context on permissions, safety, or error scenarios would enhance completeness. It's adequate but has clear gaps.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The input schema has 0% description coverage, so parameters 'tag' and 'command' are undocumented in the schema. The description doesn't add any semantic details about these parameters, such as what the 'tag' represents, format examples for 'command', or how they interact. With low schema coverage, the description fails to compensate, leaving parameters poorly understood.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.
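
On what `tag` means in practice: going by the `find_hosts_by_tag` helper in the Implementation Reference, a tag selects a host only when, after surrounding whitespace is stripped, it is an exact, case-sensitive member of that host's `tags` list. The sketch below reuses that matching logic as a standalone function over hypothetical sample data (the aliases and tags are made up, and the real helper is a method that reads from the loaded config rather than taking a list).

```python
def find_hosts_by_tag(hosts: list, tag: str) -> list:
    """Standalone variant of the helper's matching logic (illustrative only)."""
    tag_str = str(tag).strip()
    out = []
    for h in hosts:
        tags = h.get("tags", []) or []  # a missing or null tag list matches nothing
        if tag_str in tags:
            out.append(str(h.get("alias", "")))
    return out


# Hypothetical inventory for illustration.
hosts = [
    {"alias": "web1", "tags": ["web", "prod"]},
    {"alias": "web2", "tags": ["web", "staging"]},
    {"alias": "db1", "tags": None},
]

print(find_hosts_by_tag(hosts, " web "))  # surrounding whitespace is stripped
print(find_hosts_by_tag(hosts, "Web"))    # case differs, so no host matches
print(find_hosts_by_tag(hosts, "we"))     # partial tag, so no host matches
```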

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the action ('Execute SSH command') and target ('on all hosts with a tag'), which is specific and actionable. However, it doesn't explicitly differentiate from sibling tools like 'ssh_run' (which might run on specific hosts rather than tagged groups) or 'ssh_run_async' (which might be asynchronous), leaving room for ambiguity in sibling differentiation.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides minimal guidance by mentioning 'with network checks', which implies a use case involving network verification, but it doesn't specify when to use this tool versus alternatives like 'ssh_run' or 'ssh_run_async'. No explicit when-not-to-use or prerequisite information is given, making it inadequate for informed tool selection.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/samerfarida/mcp-ssh-orchestrator'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.