execute_command
Execute shell commands after policy enforcement, network containment, and approval checks to maintain security boundaries.
Instructions
Execute a shell command after full AIRG policy and approval checks.
The command is evaluated against network/workspace containment, command-tier policy, Script Sentinel continuity checks, and optional confirmation gates before execution.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
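| command | Yes | Shell command string to evaluate against policy and, if allowed, execute. | |
| retry_count | No | Caller-reported retry count; recorded in the audit log entry. | 0 |
| ctx | No | Optional context object passed to activate_runtime_context() for the duration of the call. | None |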
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
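| result | Yes | Text result: command output on success, otherwise an error, timeout, blocked, or confirmation-required message. | |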
Implementation Reference
- src/tools/command_tools.py:309-359 (handler) Main handler function for the execute_command tool. Evaluates policy/sentinel checks, builds audit logs, checks approval requirements, and executes the shell command via _execute_shell().
def execute_command(command: str, retry_count: int = 0, ctx: Context | None = None) -> str:
    """Execute a shell command after full AIRG policy and approval checks.

    The command is evaluated against network/workspace containment, command-tier policy,
    Script Sentinel continuity checks, and optional confirmation gates before execution.
    """
    # The runtime context is activated up front and always reset in ``finally``,
    # whichever branch below ends up returning (or raising).
    runtime_tokens = activate_runtime_context(ctx)
    refresh_policy_if_changed()
    try:
        (
            result,
            network_warning,
            shell_containment_warning,
            shell_containment_paths,
            sentinel_eval,
        ) = _evaluate_policy_and_sentinel(command)

        # Paths are only extracted for commands that passed policy.
        affected_paths: list[str] = extract_paths(command) if result.allowed else []
        server_retry_count, final_block = _retry_state(command, result)

        # Optional audit fields are present only when they carry a truthy value.
        warning_fields = {}
        if network_warning:
            warning_fields["network_warning"] = network_warning
        if shell_containment_warning:
            warning_fields["shell_containment_warning"] = shell_containment_warning
        if shell_containment_paths:
            warning_fields["shell_containment_offending_paths"] = shell_containment_paths
        final_block_fields = {"final_block": True} if final_block else {}

        # The three ``**`` groups are kept separate so a key collision between
        # them still fails loudly instead of being silently overwritten.
        log_entry = build_log_entry(
            "execute_command",
            result,
            command=command,
            normalized_command=normalize_for_audit(command),
            retry_count=retry_count,
            server_retry_count=server_retry_count,
            affected_paths_count=len(affected_paths),
            **warning_fields,
            **_script_sentinel_log_fields(sentinel_eval),
            **final_block_fields,
        )
        # The decision is logged before acting on it, for allowed and denied
        # commands alike.
        append_log_entry(log_entry)
        _append_script_sentinel_events(log_entry, sentinel_eval)

        if result.allowed:
            _maybe_backup_modifying_command(command, log_entry)
            return _execute_shell(command)

        if result.decision_tier == "requires_confirmation":
            return _requires_confirmation_response(command, result, sentinel_eval)
        return _blocked_response(
            result,
            final_block=final_block,
            server_retry_count=server_retry_count,
        )
    finally:
        reset_runtime_context(runtime_tokens)
# - src/server.py:21-31 (registration) Registers execute_command as an MCP tool via FastMCP.tool() decorator loop in the MCP server entrypoint.
# Register each tool callable with the MCP server. ``mcp.tool()`` is called
# once per function, so every tool gets its own decorator instance.
for tool in (
    server_info,
    restore_backup,
    execute_command,
    read_file,
    write_file,
    edit_file,
    delete_file,
    list_directory,
):
    mcp.tool()(tool)
# - src/tools/__init__.py:1-14 (registration) Exports execute_command from tools package, re-exporting from command_tools module.
from .command_tools import execute_command, server_info from .file_tools import delete_file, edit_file, list_directory, read_file, write_file from .restore_tools import restore_backup __all__ = [ "server_info", "execute_command", "read_file", "write_file", "edit_file", "delete_file", "list_directory", "restore_backup", ] - src/tools/command_tools.py:294-306 (helper)_execute_shell helper - runs the shell command via run_shell_command with timeout, truncates output, and returns stdout/stderr.
def _execute_shell(command: str) -> str:
    """Run ``command`` under the configured execution limits and return its text output.

    Returns:
        The (truncated) stdout when the process exits with code 0; otherwise the
        (truncated) stderr, or an exit-code message when stderr is empty. A
        timeout yields a fixed timeout message instead of raising.
    """
    timeout_seconds, max_output_chars = execution_limits()
    try:
        completed = run_shell_command(command, timeout_seconds)
    except subprocess.TimeoutExpired:
        return f"Command timed out after {timeout_seconds} seconds"

    # Both streams are truncated up front, regardless of which one is returned.
    captured_out = truncate_output(completed.stdout or "", max_output_chars)
    captured_err = truncate_output(completed.stderr or "", max_output_chars)

    if completed.returncode == 0:
        return captured_out
    # On failure stdout is not returned; fall back to the exit code when
    # stderr is empty.
    return captured_err or f"Command exited with code {completed.returncode}"
# - src/tools/command_tools.py:71-177 (helper) _evaluate_policy_and_sentinel helper - runs all policy checks (control chars, backup targets, network, workspace containment, script sentinel) and returns result.
def _evaluate_policy_and_sentinel(
    command: str,
) -> tuple[PolicyResult, str | None, str | None, list[str], dict[str, Any]]:
    """Run the pre-execution checks for ``command`` and report the outcome.

    Checks run in a fixed order and the first denial short-circuits the rest:
    control characters, backup-storage targeting, network policy, shell
    workspace containment, command policy, and finally Script Sentinel.

    Returns:
        A 5-tuple of ``(policy_result, network_warning,
        shell_containment_warning, shell_containment_paths, sentinel_eval)``.
        The warnings are ``None`` unless an earlier check produced one, and
        ``sentinel_eval`` stays at its default value unless the Script Sentinel
        stage was reached.
    """
    network_warning = None
    shell_containment_warning = None
    shell_containment_paths: list[str] = []
    sentinel_eval: dict[str, Any] = _default_sentinel_eval()

    def _outcome(policy_result, offending_paths=None):
        # Bundle a result with whatever warnings have been collected so far.
        paths = shell_containment_paths if offending_paths is None else offending_paths
        return (
            policy_result,
            network_warning,
            shell_containment_warning,
            paths,
            sentinel_eval,
        )

    def _denied(reason, rule, offending_paths=None):
        # Hard "blocked" verdict attributed to the named rule.
        verdict = PolicyResult(
            allowed=False,
            reason=reason,
            decision_tier="blocked",
            matched_rule=rule,
        )
        return _outcome(verdict, offending_paths)

    if has_shell_unsafe_control_chars(command):
        return _denied(
            "Command contains disallowed control characters (newline, carriage return, or NUL)",
            "command_control_characters",
        )

    if command_targets_backup_storage(command):
        return _denied(
            "Command targets protected backup storage; use restore_backup for controlled recovery operations",
            "backup_storage_protected",
        )

    net_allowed, net_reason = network_policy_check(command)
    mode = str(POLICY.get("network", {}).get("enforcement_mode", "off")).lower()
    if not net_allowed:
        return _denied(
            net_reason or "Network command blocked by policy",
            "network_policy",
        )
    # In monitor mode an allowed command may still carry a reason, which is
    # surfaced as a warning rather than a block.
    if mode == "monitor" and net_reason:
        network_warning = net_reason

    containment_allowed, containment_reason, containment_paths = shell_workspace_containment_check(command)
    if not containment_allowed:
        # The offending paths from this check are reported directly.
        return _denied(
            containment_reason or "Shell workspace containment blocked command.",
            "execution.shell_workspace_containment",
            offending_paths=containment_paths,
        )
    if containment_reason:
        shell_containment_warning = containment_reason
        shell_containment_paths = containment_paths

    result = check_policy(command)
    if not result.allowed:
        return _outcome(result)

    sentinel_eval = script_sentinel.evaluate_command_execution(
        command,
        agent_id=AGENT_ID,
        session_id=current_agent_session_id(),
    )
    sentinel_decision = str(sentinel_eval.get("decision", "allowed"))
    # Script Sentinel only overrides the policy result when it both recorded
    # hits and reached a restrictive decision.
    if not sentinel_eval.get("has_hits") or sentinel_decision not in {"blocked", "requires_confirmation"}:
        return _outcome(result)

    preview = _script_sentinel_preview(sentinel_eval)
    if sentinel_decision == "blocked":
        consequence = "is blocked"
    else:
        consequence = "requires explicit confirmation"
    result = PolicyResult(
        allowed=False,
        reason=(
            "Script Sentinel preserved policy intent: execution of a tagged script artifact "
            f"{consequence} for this agent ({preview})."
        ),
        decision_tier=sentinel_decision,
        matched_rule="script_sentinel",
    )
    return _outcome(result)