Skip to main content
Glama

execute_command

Execute shell commands through the AI Runtime Guard policy layer to enforce security boundaries before system actions.

Input Schema

JSON Schema (table form)

| Name        | Required | Description | Default |
|-------------|----------|-------------|---------|
| command     | Yes      |             |         |
| retry_count | No       |             |         |
| ctx         | No       |             |         |

Implementation Reference

  • The implementation of the `execute_command` tool, which handles policy checks, simulation of blast radius, budget tracking, command execution via `run_shell_command`, and output handling.
    def execute_command(command: str, retry_count: int = 0, ctx: Context | None = None) -> str:
        """Run a shell command through the AI Runtime Guard policy pipeline.

        The command passes, in order, through: control-character screening,
        backup-storage protection, network policy, shell workspace containment,
        optional blast-radius simulation, an allowed-tier file-count cap, and a
        cumulative blast-radius budget. Every decision (allow or block) is
        appended to the audit log before any value is returned. Blocked commands
        either request a human confirmation handshake, report remaining retry
        attempts, or declare a permanent block once MAX_RETRIES is reached.
        Allowed commands are executed via run_shell_command with a timeout, and
        their output is truncated to the configured limit.

        Args:
            command: The raw shell command requested by the agent.
            retry_count: Client-reported retry count; recorded in the audit log
                only (server-side retry tracking is independent, via
                register_retry).
            ctx: Optional request context; activated for the duration of the
                call and reset in the ``finally`` block.

        Returns:
            On success: the command's stdout (truncated). On failure: a
            ``[POLICY BLOCK]`` message, a timeout notice, or the command's
            stderr / exit-code description.
        """
        # Bind the per-request runtime context; tokens are used to restore the
        # previous context in the finally block even if anything below raises.
        context_tokens = activate_runtime_context(ctx)
        network_warning: str | None = None
        shell_containment_warning: str | None = None
        shell_containment_paths: list[str] = []
        budget_fields: dict = {}
        # Blast-radius simulation result; presumably a dict with at least
        # "affected" (paths) and "saw_wildcard" keys — TODO confirm against
        # simulate_blast_radius.
        simulation = None
        # (message, rule) pair surfaced only when the final decision tier is
        # "requires_confirmation".
        simulation_diagnostic: tuple[str, str] | None = None

        try:
            # --- Policy evaluation: the checks below are ordered and
            # short-circuit; the first failing check fixes `result`. ---
            if has_shell_unsafe_control_chars(command):
                # Newlines/CR/NUL could smuggle extra commands past later
                # string-based checks, so reject before any other analysis.
                result = PolicyResult(
                    allowed=False,
                    reason="Command contains disallowed control characters (newline, carriage return, or NUL)",
                    decision_tier="blocked",
                    matched_rule="command_control_characters",
                )
            elif command_targets_backup_storage(command):
                # Backups are only touchable through the dedicated
                # restore_backup tool, never via arbitrary shell.
                result = PolicyResult(
                    allowed=False,
                    reason="Command targets protected backup storage; use restore_backup for controlled recovery operations",
                    decision_tier="blocked",
                    matched_rule="backup_storage_protected",
                )
            else:
                net_allowed, net_reason = network_policy_check(command)
                mode = str(POLICY.get("network", {}).get("enforcement_mode", "off")).lower()
                if not net_allowed:
                    result = PolicyResult(
                        allowed=False,
                        reason=net_reason or "Network command blocked by policy",
                        decision_tier="blocked",
                        matched_rule="network_policy",
                    )
                else:
                    # In "monitor" mode a network finding is recorded as a
                    # warning in the log but does not block execution.
                    if mode == "monitor" and net_reason:
                        network_warning = net_reason
                    containment_allowed, containment_reason, containment_paths = shell_workspace_containment_check(command)
                    if not containment_allowed:
                        result = PolicyResult(
                            allowed=False,
                            reason=containment_reason or "Shell workspace containment blocked command.",
                            decision_tier="blocked",
                            matched_rule="execution.shell_workspace_containment",
                        )
                        # Keep the offending paths for the audit log entry.
                        shell_containment_paths = containment_paths
                    else:
                        # Containment may allow-with-warning; remember both the
                        # message and the paths for the log.
                        if containment_reason:
                            shell_containment_warning = containment_reason
                            shell_containment_paths = containment_paths
                        # Only commands listed in requires_simulation.commands
                        # get a blast-radius simulation (matched lowercase).
                        sim_commands = [c.lower() for c in POLICY.get("requires_simulation", {}).get("commands", [])]
                        if sim_commands:
                            simulation = simulate_blast_radius(command, sim_commands)
                        result = check_policy(command, simulation=simulation)
                        if simulation is not None:
                            simulation_diagnostic = check_simulation_tier(command, simulation=simulation)

            # --- Post-policy caps: even an allowed command can still be
            # blocked by the per-operation file cap or the cumulative budget. ---
            affected_for_budget: list[str] = []
            affected_for_limits: list[str] = []
            if result.allowed:
                # Prefer the simulation's resolved path set; fall back to
                # lexical path extraction from the command text.
                if simulation and simulation["affected"]:
                    affected_for_budget = simulation["affected"]
                    affected_for_limits = simulation["affected"]
                else:
                    affected_for_budget = extract_paths(command)
                    affected_for_limits = affected_for_budget

                # Allowed-tier safety cap for default-allowed multi-target operations.
                # Keep simulation-specific wildcard logic governed by simulation thresholds.
                is_simulation_wildcard_flow = bool(simulation and simulation.get("saw_wildcard"))
                if not is_simulation_wildcard_flow:
                    # Count only unique, resolvable, in-workspace paths toward
                    # the cap; unresolvable or outside paths are ignored here.
                    resolved_unique: list[str] = []
                    seen: set[str] = set()
                    for candidate in affected_for_limits:
                        try:
                            resolved = str(pathlib.Path(candidate).resolve())
                        except OSError:
                            continue
                        if not is_within_workspace(resolved):
                            continue
                        if resolved in seen:
                            continue
                        seen.add(resolved)
                        resolved_unique.append(resolved)

                    # max_files < 0 disables the cap entirely.
                    max_files = int(POLICY.get("allowed", {}).get("max_files_per_operation", 10))
                    if max_files >= 0 and len(resolved_unique) > max_files:
                        result = PolicyResult(
                            allowed=False,
                            reason=(
                                f"Operation targets {len(resolved_unique)} file/path entries, "
                                f"which exceeds allowed.max_files_per_operation={max_files}"
                            ),
                            decision_tier="blocked",
                            matched_rule="allowed.max_files_per_operation",
                        )

                if result.allowed:
                    # NOTE: this both checks AND records budget consumption
                    # for the current scope, so it must run exactly once per
                    # allowed command.
                    budget_allowed, budget_reason, budget_rule, budget_fields = check_and_record_cumulative_budget(
                        tool="execute_command",
                        command=command,
                        affected_paths=affected_for_budget,
                        operation_count=1,
                    )
                    if not budget_allowed:
                        result = PolicyResult(
                            allowed=False,
                            reason=budget_reason or "Cumulative blast-radius budget exceeded for current scope.",
                            decision_tier="blocked",
                            matched_rule=budget_rule or "requires_simulation.cumulative_budget_exceeded",
                        )

            # --- Server-side retry accounting: confirmation requests do not
            # consume retries; hard blocks do, up to MAX_RETRIES. ---
            server_retry_count = 0
            final_block = False
            if not result.allowed and result.decision_tier != "requires_confirmation":
                server_retry_count = register_retry(command, result.decision_tier, result.matched_rule)
                final_block = server_retry_count >= MAX_RETRIES

            # Build and append the audit entry BEFORE any return below, so
            # every decision (allow or block) is logged exactly once.
            log_entry = build_log_entry(
                "execute_command",
                result,
                command=command,
                normalized_command=normalize_for_audit(command),
                retry_count=retry_count,
                server_retry_count=server_retry_count,
                affected_paths_count=len(affected_for_budget),
                # Optional fields are only included when they carry a value,
                # keeping log entries sparse.
                **({"network_warning": network_warning} if network_warning else {}),
                **({"shell_containment_warning": shell_containment_warning} if shell_containment_warning else {}),
                **({"shell_containment_offending_paths": shell_containment_paths} if shell_containment_paths else {}),
                **(
                    {
                        "simulation_diagnostic_message": simulation_diagnostic[0],
                        "simulation_diagnostic_rule": simulation_diagnostic[1],
                    }
                    if simulation_diagnostic and result.decision_tier == "requires_confirmation"
                    else {}
                ),
                **budget_fields,
                **({"final_block": True} if final_block else {}),
            )
            append_log_entry(log_entry)

            if not result.allowed:
                if result.decision_tier == "requires_confirmation":
                    # Confirmation path: issue (or reuse) an approval token tied
                    # to this exact command so a human can approve out-of-band.
                    approval_paths = simulation["affected"] if simulation and simulation.get("affected") else extract_paths(command)
                    token, expires_at = issue_or_reuse_approval_token(
                        command,
                        session_id=current_agent_session_id(),
                        affected_paths=approval_paths,
                    )
                    simulation_context = (
                        f"Simulation context: {simulation_diagnostic[0]}\n"
                        if simulation_diagnostic
                        else ""
                    )
                    return (
                        f"[POLICY BLOCK] {result.reason}\n\n"
                        "This command requires an explicit confirmation handshake.\n"
                        f"{simulation_context}"
                        "Ask a human operator to approve it via the control-plane GUI/API using this exact command and token, then retry execute_command:\n"
                        f"approval_token={token}\n"
                        f"token_expires_at={expires_at.isoformat()}Z"
                    )

                if final_block:
                    # Retry budget exhausted: permanent block for this request.
                    return (
                        f"[POLICY BLOCK] {result.reason}\n\n"
                        f"Maximum retries reached ({MAX_RETRIES}/{MAX_RETRIES}). "
                        "This action is permanently blocked for the current request. "
                        "No further attempts will be accepted."
                    )

                attempts_remaining = MAX_RETRIES - server_retry_count
                return (
                    f"[POLICY BLOCK] {result.reason}\n\n"
                    f"You have {attempts_remaining} attempt(s) remaining. "
                    "Please retry execute_command with a safer alternative command "
                    f"(server attempt {server_retry_count}/{MAX_RETRIES})."
                )

            # --- Pre-execution backup: commands that look modifying trigger a
            # best-effort backup of the affected paths (when enabled), logged
            # as a separate "backup_created" event. ---
            if MODIFYING_COMMAND_RE.search(command):
                affected = extract_paths(command)
                if affected and POLICY.get("audit", {}).get("backup_enabled", True):
                    backup_location = backup_paths(affected)
                    if backup_location:
                        append_log_entry(
                            {
                                **log_entry,
                                "source": "mcp-server",
                                "backup_location": backup_location,
                                "event": "backup_created",
                            }
                        )

            # --- Execution: bounded by a timeout and an output-size cap. ---
            timeout_seconds, max_output_chars = execution_limits()
            try:
                proc = run_shell_command(command, timeout_seconds)
            except subprocess.TimeoutExpired:
                return f"Command timed out after {timeout_seconds} seconds"

            stdout = truncate_output(proc.stdout or "", max_output_chars)
            stderr = truncate_output(proc.stderr or "", max_output_chars)

            # On failure prefer stderr; fall back to the exit code so the
            # caller always gets an explanation.
            if proc.returncode != 0:
                return stderr or f"Command exited with code {proc.returncode}"
            return stdout
        finally:
            # Always restore the previous runtime context, even on exceptions.
            reset_runtime_context(context_tokens)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jimmyracheta/ai-runtime-guard'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.