runtimeguard

runtime-guard

Official

execute_command

Execute commands safely by blocking dangerous operations like rm -rf and privilege escalation, requiring human approval for risky actions, and simulating blast radius before execution.

Input Schema

Name         Required  Description  Default
command      Yes       -            -
retry_count  No        -            -
ctx          No        -            -
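
As a rough illustration of how these inputs might be passed, the sketch below builds an MCP `tools/call` JSON-RPC request for this tool. The helper name `build_execute_command_call` and the `request_id` value are hypothetical; only the tool name and the `command` and `retry_count` arguments come from this page. `ctx` is left out on the assumption that the server framework supplies it rather than the caller.

```python
import json


def build_execute_command_call(command: str, retry_count: int = 0, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 `tools/call` request for execute_command (illustrative only)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,  # hypothetical request id chosen by the client
        "method": "tools/call",
        "params": {
            "name": "execute_command",
            # `ctx` is omitted: assumed to be injected server-side.
            "arguments": {"command": command, "retry_count": retry_count},
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_execute_command_call("ls -la"), indent=2))
```

Note that the implementation tracks its own `server_retry_count`, so the caller-supplied `retry_count` appears to be recorded for auditing rather than trusted for enforcement.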

Output Schema

Name    Required  Description  Default
result  Yes       -            -
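
Because `result` is a plain string, a caller has to inspect its text to tell a policy block from ordinary command output. The following is a minimal sketch of such a parser, based only on the message formats visible in the implementation on this page. `ParsedResult` and `parse_result` are hypothetical names, and the parser assumes the block reason itself contains no blank line.

```python
import re
from dataclasses import dataclass
from typing import Optional

BLOCK_PREFIX = "[POLICY BLOCK] "
TOKEN_RE = re.compile(r"^approval_token=(\S+)$", re.MULTILINE)
EXPIRES_RE = re.compile(r"^token_expires_at=(\S+)$", re.MULTILINE)
ATTEMPTS_RE = re.compile(r"You have (\d+) attempt\(s\) remaining")


@dataclass
class ParsedResult:
    blocked: bool
    reason: Optional[str] = None
    approval_token: Optional[str] = None
    token_expires_at: Optional[str] = None
    attempts_remaining: Optional[int] = None
    final_block: bool = False


def parse_result(result: str) -> ParsedResult:
    """Classify an execute_command result string (illustrative, not an official client)."""
    if not result.startswith(BLOCK_PREFIX):
        # Anything else is treated as command output, an error, or a timeout message.
        return ParsedResult(blocked=False)

    # The reason is the text between the prefix and the first blank line.
    reason = result[len(BLOCK_PREFIX):].split("\n\n", 1)[0]
    token = TOKEN_RE.search(result)
    expires = EXPIRES_RE.search(result)
    attempts = ATTEMPTS_RE.search(result)
    return ParsedResult(
        blocked=True,
        reason=reason,
        approval_token=token.group(1) if token else None,
        token_expires_at=expires.group(1) if expires else None,
        attempts_remaining=int(attempts.group(1)) if attempts else None,
        final_block="Maximum retries reached" in result,
    )
```

One caveat: command output that happens to begin with `[POLICY BLOCK] ` would be misclassified, since the tool returns no structured status field.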

Implementation Reference

  • The `execute_command` function implements the core tool logic, including policy checks, simulation, budget tracking, and the actual execution of the command via a shell.
    def execute_command(command: str, retry_count: int = 0, ctx: Context | None = None) -> str:
        context_tokens = activate_runtime_context(ctx)
        network_warning = None
        shell_containment_warning = None
        shell_containment_paths: list[str] = []
        budget_fields: dict = {}
        simulation = None
        simulation_diagnostic: tuple[str, str] | None = None
    
        try:
            if has_shell_unsafe_control_chars(command):
                result = PolicyResult(
                    allowed=False,
                    reason="Command contains disallowed control characters (newline, carriage return, or NUL)",
                    decision_tier="blocked",
                    matched_rule="command_control_characters",
                )
            elif command_targets_backup_storage(command):
                result = PolicyResult(
                    allowed=False,
                    reason="Command targets protected backup storage; use restore_backup for controlled recovery operations",
                    decision_tier="blocked",
                    matched_rule="backup_storage_protected",
                )
            else:
                net_allowed, net_reason = network_policy_check(command)
                mode = str(POLICY.get("network", {}).get("enforcement_mode", "off")).lower()
                if not net_allowed:
                    result = PolicyResult(
                        allowed=False,
                        reason=net_reason or "Network command blocked by policy",
                        decision_tier="blocked",
                        matched_rule="network_policy",
                    )
                else:
                    if mode == "monitor" and net_reason:
                        network_warning = net_reason
                    containment_allowed, containment_reason, containment_paths = shell_workspace_containment_check(command)
                    if not containment_allowed:
                        result = PolicyResult(
                            allowed=False,
                            reason=containment_reason or "Shell workspace containment blocked command.",
                            decision_tier="blocked",
                            matched_rule="execution.shell_workspace_containment",
                        )
                        shell_containment_paths = containment_paths
                    else:
                        if containment_reason:
                            shell_containment_warning = containment_reason
                            shell_containment_paths = containment_paths
                        sim_commands = [c.lower() for c in POLICY.get("requires_simulation", {}).get("commands", [])]
                        if sim_commands:
                            simulation = simulate_blast_radius(command, sim_commands)
                        result = check_policy(command, simulation=simulation)
                        if simulation is not None:
                            simulation_diagnostic = check_simulation_tier(command, simulation=simulation)
    
            affected_for_budget: list[str] = []
            affected_for_limits: list[str] = []
            if result.allowed:
                if simulation and simulation["affected"]:
                    affected_for_budget = simulation["affected"]
                    affected_for_limits = simulation["affected"]
                else:
                    affected_for_budget = extract_paths(command)
                    affected_for_limits = affected_for_budget
    
                # Allowed-tier safety cap for default-allowed multi-target operations.
                # Keep simulation-specific wildcard logic governed by simulation thresholds.
                is_simulation_wildcard_flow = bool(simulation and simulation.get("saw_wildcard"))
                if not is_simulation_wildcard_flow:
                    resolved_unique: list[str] = []
                    seen: set[str] = set()
                    for candidate in affected_for_limits:
                        try:
                            resolved = str(pathlib.Path(candidate).resolve())
                        except OSError:
                            continue
                        if not is_within_workspace(resolved):
                            continue
                        if resolved in seen:
                            continue
                        seen.add(resolved)
                        resolved_unique.append(resolved)
    
                    max_files = int(POLICY.get("allowed", {}).get("max_files_per_operation", 10))
                    if max_files >= 0 and len(resolved_unique) > max_files:
                        result = PolicyResult(
                            allowed=False,
                            reason=(
                                f"Operation targets {len(resolved_unique)} file/path entries, "
                                f"which exceeds allowed.max_files_per_operation={max_files}"
                            ),
                            decision_tier="blocked",
                            matched_rule="allowed.max_files_per_operation",
                        )
    
                if result.allowed:
                    budget_allowed, budget_reason, budget_rule, budget_fields = check_and_record_cumulative_budget(
                        tool="execute_command",
                        command=command,
                        affected_paths=affected_for_budget,
                        operation_count=1,
                    )
                    if not budget_allowed:
                        result = PolicyResult(
                            allowed=False,
                            reason=budget_reason or "Cumulative blast-radius budget exceeded for current scope.",
                            decision_tier="blocked",
                            matched_rule=budget_rule or "requires_simulation.cumulative_budget_exceeded",
                        )
    
            server_retry_count = 0
            final_block = False
            if not result.allowed and result.decision_tier != "requires_confirmation":
                server_retry_count = register_retry(command, result.decision_tier, result.matched_rule)
                final_block = server_retry_count >= MAX_RETRIES
    
            log_entry = build_log_entry(
                "execute_command",
                result,
                command=command,
                normalized_command=normalize_for_audit(command),
                retry_count=retry_count,
                server_retry_count=server_retry_count,
                affected_paths_count=len(affected_for_budget),
                **({"network_warning": network_warning} if network_warning else {}),
                **({"shell_containment_warning": shell_containment_warning} if shell_containment_warning else {}),
                **({"shell_containment_offending_paths": shell_containment_paths} if shell_containment_paths else {}),
                **(
                    {
                        "simulation_diagnostic_message": simulation_diagnostic[0],
                        "simulation_diagnostic_rule": simulation_diagnostic[1],
                    }
                    if simulation_diagnostic and result.decision_tier == "requires_confirmation"
                    else {}
                ),
                **budget_fields,
                **({"final_block": True} if final_block else {}),
            )
            append_log_entry(log_entry)
    
            if not result.allowed:
                if result.decision_tier == "requires_confirmation":
                    approval_paths = simulation["affected"] if simulation and simulation.get("affected") else extract_paths(command)
                    token, expires_at = issue_or_reuse_approval_token(
                        command,
                        session_id=current_agent_session_id(),
                        affected_paths=approval_paths,
                    )
                    simulation_context = (
                        f"Simulation context: {simulation_diagnostic[0]}\n"
                        if simulation_diagnostic
                        else ""
                    )
                    return (
                        f"[POLICY BLOCK] {result.reason}\n\n"
                        "This command requires an explicit confirmation handshake.\n"
                        f"{simulation_context}"
                        "Ask a human operator to approve it via the control-plane GUI/API using this exact command and token, then retry execute_command:\n"
                        f"approval_token={token}\n"
                        f"token_expires_at={expires_at.isoformat()}Z"
                    )
    
                if final_block:
                    return (
                        f"[POLICY BLOCK] {result.reason}\n\n"
                        f"Maximum retries reached ({MAX_RETRIES}/{MAX_RETRIES}). "
                        "This action is permanently blocked for the current request. "
                        "No further attempts will be accepted."
                    )
    
                attempts_remaining = MAX_RETRIES - server_retry_count
                return (
                    f"[POLICY BLOCK] {result.reason}\n\n"
                    f"You have {attempts_remaining} attempt(s) remaining. "
                    "Please retry execute_command with a safer alternative command "
                    f"(server attempt {server_retry_count}/{MAX_RETRIES})."
                )
    
            if MODIFYING_COMMAND_RE.search(command):
                affected = extract_paths(command)
                if affected and POLICY.get("audit", {}).get("backup_enabled", True):
                    backup_location = backup_paths(affected)
                    if backup_location:
                        append_log_entry(
                            {
                                **log_entry,
                                "source": "mcp-server",
                                "backup_location": backup_location,
                                "event": "backup_created",
                            }
                        )
    
            timeout_seconds, max_output_chars = execution_limits()
            try:
                proc = run_shell_command(command, timeout_seconds)
            except subprocess.TimeoutExpired:
                return f"Command timed out after {timeout_seconds} seconds"
    
            stdout = truncate_output(proc.stdout or "", max_output_chars)
            stderr = truncate_output(proc.stderr or "", max_output_chars)
    
            if proc.returncode != 0:
                return stderr or f"Command exited with code {proc.returncode}"
            return stdout
        finally:
            reset_runtime_context(context_tokens)
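
The allowed-tier cap in the listing above (resolve each candidate path, keep only those inside the workspace, deduplicate, then compare the count against `allowed.max_files_per_operation`) can be sketched in isolation. This is a minimal sketch, not the project's code: `count_unique_workspace_targets` and `exceeds_file_cap` are hypothetical names, and the `Path.is_relative_to` test is an assumption standing in for `is_within_workspace`, whose implementation is not shown on this page.

```python
from pathlib import Path


def count_unique_workspace_targets(candidates: list[str], workspace: str) -> int:
    """Count distinct resolved paths that fall inside the workspace root."""
    root = Path(workspace).resolve()
    seen: set[str] = set()
    for candidate in candidates:
        try:
            resolved = Path(candidate).resolve()
        except OSError:
            # Unresolvable paths are skipped, as in the listing.
            continue
        # Assumed containment test; requires Python 3.9+.
        if not resolved.is_relative_to(root):
            continue
        seen.add(str(resolved))
    return len(seen)


def exceeds_file_cap(candidates: list[str], workspace: str, max_files: int = 10) -> bool:
    """Return True when the operation should be blocked by the per-operation cap."""
    # A negative cap disables the check, mirroring `max_files >= 0` in the listing.
    return max_files >= 0 and count_unique_workspace_targets(candidates, workspace) > max_files
```

Resolving before deduplicating means that two spellings of the same file (for example, one containing `..`) count once, and paths outside the workspace do not count toward the cap at all.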
Behavior: 1/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

Tool has no description.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 1/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Tool has no description.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 1/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Tool has no description.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Tool has no description.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 1/5

Does the description clearly state what the tool does and how it differs from similar tools?

Tool has no description.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 1/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

Tool has no description.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/runtimeguard/runtime-guard'
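
For callers working in Python, the same request can be approximated with the standard library. This is a sketch under assumptions: `build_request` and `fetch_server_info` are hypothetical helper names, and the response is assumed to be a JSON object, which this page does not confirm.

```python
import json
import urllib.request

SERVER_URL = "https://glama.ai/api/mcp/v1/servers/runtimeguard/runtime-guard"


def build_request(url: str = SERVER_URL) -> urllib.request.Request:
    """Build the GET request equivalent to the curl command above."""
    return urllib.request.Request(url, method="GET", headers={"Accept": "application/json"})


def fetch_server_info(url: str = SERVER_URL, timeout: float = 10.0) -> dict:
    """Send the request and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```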

If you have feedback or need assistance with the MCP directory API, please join our Discord server.