execute_command
Execute shell commands through the AI Runtime Guard policy layer to enforce security boundaries before system actions.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| command | Yes | ||
| retry_count | No | ||
| ctx | No |
Implementation Reference
- src/tools/command_tools.py:43-250 (handler)The implementation of the `execute_command` tool, which handles policy checks, simulation of blast radius, budget tracking, command execution via `run_shell_command`, and output handling.
def execute_command(command: str, retry_count: int = 0, ctx: Context | None = None) -> str: context_tokens = activate_runtime_context(ctx) network_warning = None shell_containment_warning = None shell_containment_paths: list[str] = [] budget_fields: dict = {} simulation = None simulation_diagnostic: tuple[str, str] | None = None try: if has_shell_unsafe_control_chars(command): result = PolicyResult( allowed=False, reason="Command contains disallowed control characters (newline, carriage return, or NUL)", decision_tier="blocked", matched_rule="command_control_characters", ) elif command_targets_backup_storage(command): result = PolicyResult( allowed=False, reason="Command targets protected backup storage; use restore_backup for controlled recovery operations", decision_tier="blocked", matched_rule="backup_storage_protected", ) else: net_allowed, net_reason = network_policy_check(command) mode = str(POLICY.get("network", {}).get("enforcement_mode", "off")).lower() if not net_allowed: result = PolicyResult( allowed=False, reason=net_reason or "Network command blocked by policy", decision_tier="blocked", matched_rule="network_policy", ) else: if mode == "monitor" and net_reason: network_warning = net_reason containment_allowed, containment_reason, containment_paths = shell_workspace_containment_check(command) if not containment_allowed: result = PolicyResult( allowed=False, reason=containment_reason or "Shell workspace containment blocked command.", decision_tier="blocked", matched_rule="execution.shell_workspace_containment", ) shell_containment_paths = containment_paths else: if containment_reason: shell_containment_warning = containment_reason shell_containment_paths = containment_paths sim_commands = [c.lower() for c in POLICY.get("requires_simulation", {}).get("commands", [])] if sim_commands: simulation = simulate_blast_radius(command, sim_commands) result = check_policy(command, simulation=simulation) if simulation is not None: simulation_diagnostic = check_simulation_tier(command, simulation=simulation) affected_for_budget: list[str] = [] affected_for_limits: list[str] = [] if result.allowed: if simulation and simulation["affected"]: affected_for_budget = simulation["affected"] affected_for_limits = simulation["affected"] else: affected_for_budget = extract_paths(command) affected_for_limits = affected_for_budget # Allowed-tier safety cap for default-allowed multi-target operations. # Keep simulation-specific wildcard logic governed by simulation thresholds. is_simulation_wildcard_flow = bool(simulation and simulation.get("saw_wildcard")) if not is_simulation_wildcard_flow: resolved_unique: list[str] = [] seen: set[str] = set() for candidate in affected_for_limits: try: resolved = str(pathlib.Path(candidate).resolve()) except OSError: continue if not is_within_workspace(resolved): continue if resolved in seen: continue seen.add(resolved) resolved_unique.append(resolved) max_files = int(POLICY.get("allowed", {}).get("max_files_per_operation", 10)) if max_files >= 0 and len(resolved_unique) > max_files: result = PolicyResult( allowed=False, reason=( f"Operation targets {len(resolved_unique)} file/path entries, " f"which exceeds allowed.max_files_per_operation={max_files}" ), decision_tier="blocked", matched_rule="allowed.max_files_per_operation", ) if result.allowed: budget_allowed, budget_reason, budget_rule, budget_fields = check_and_record_cumulative_budget( tool="execute_command", command=command, affected_paths=affected_for_budget, operation_count=1, ) if not budget_allowed: result = PolicyResult( allowed=False, reason=budget_reason or "Cumulative blast-radius budget exceeded for current scope.", decision_tier="blocked", matched_rule=budget_rule or "requires_simulation.cumulative_budget_exceeded", ) server_retry_count = 0 final_block = False if not result.allowed and result.decision_tier != "requires_confirmation": server_retry_count = register_retry(command, result.decision_tier, result.matched_rule) final_block = server_retry_count >= MAX_RETRIES log_entry = build_log_entry( "execute_command", result, command=command, normalized_command=normalize_for_audit(command), retry_count=retry_count, server_retry_count=server_retry_count, affected_paths_count=len(affected_for_budget), **({"network_warning": network_warning} if network_warning else {}), **({"shell_containment_warning": shell_containment_warning} if shell_containment_warning else {}), **({"shell_containment_offending_paths": shell_containment_paths} if shell_containment_paths else {}), **( { "simulation_diagnostic_message": simulation_diagnostic[0], "simulation_diagnostic_rule": simulation_diagnostic[1], } if simulation_diagnostic and result.decision_tier == "requires_confirmation" else {} ), **budget_fields, **({"final_block": True} if final_block else {}), ) append_log_entry(log_entry) if not result.allowed: if result.decision_tier == "requires_confirmation": approval_paths = simulation["affected"] if simulation and simulation.get("affected") else extract_paths(command) token, expires_at = issue_or_reuse_approval_token( command, session_id=current_agent_session_id(), affected_paths=approval_paths, ) simulation_context = ( f"Simulation context: {simulation_diagnostic[0]}\n" if simulation_diagnostic else "" ) return ( f"[POLICY BLOCK] {result.reason}\n\n" "This command requires an explicit confirmation handshake.\n" f"{simulation_context}" "Ask a human operator to approve it via the control-plane GUI/API using this exact command and token, then retry execute_command:\n" f"approval_token={token}\n" f"token_expires_at={expires_at.isoformat()}Z" ) if final_block: return ( f"[POLICY BLOCK] {result.reason}\n\n" f"Maximum retries reached ({MAX_RETRIES}/{MAX_RETRIES}). " "This action is permanently blocked for the current request. " "No further attempts will be accepted." ) attempts_remaining = MAX_RETRIES - server_retry_count return ( f"[POLICY BLOCK] {result.reason}\n\n" f"You have {attempts_remaining} attempt(s) remaining. " "Please retry execute_command with a safer alternative command " f"(server attempt {server_retry_count}/{MAX_RETRIES})." ) if MODIFYING_COMMAND_RE.search(command): affected = extract_paths(command) if affected and POLICY.get("audit", {}).get("backup_enabled", True): backup_location = backup_paths(affected) if backup_location: append_log_entry( { **log_entry, "source": "mcp-server", "backup_location": backup_location, "event": "backup_created", } ) timeout_seconds, max_output_chars = execution_limits() try: proc = run_shell_command(command, timeout_seconds) except subprocess.TimeoutExpired: return f"Command timed out after {timeout_seconds} seconds" stdout = truncate_output(proc.stdout or "", max_output_chars) stderr = truncate_output(proc.stderr or "", max_output_chars) if proc.returncode != 0: return stderr or f"Command exited with code {proc.returncode}" return stdout finally: reset_runtime_context(context_tokens)