restore_backup
Restore files from a recorded backup manifest with dry-run planning and token-gated apply for policy-controlled recovery.
Instructions
Restore files from a recorded AIRG backup manifest.
Supports dry-run planning and token-gated apply mode when restore confirmation is required by policy.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| backup_location | Yes | ||
| dry_run | No | ||
| restore_token | No | ||
| ctx | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/tools/restore_tools.py:22-214 (handler)The main handler function for the 'restore_backup' tool. Implements the full restore logic including dry-run planning, token-gated apply mode, manifest validation, file integrity verification via SHA256 hashing and manifest signatures, and actual file restoration via shutil.copy2. Also handles policy checks (backup boundary), audit logging, and runtime context management.
def restore_backup( backup_location: str, dry_run: bool = True, restore_token: str = "", ctx: Context | None = None, ) -> str: """Restore files from a recorded AIRG backup manifest. Supports dry-run planning and token-gated apply mode when restore confirmation is required by policy. """ context_tokens = activate_runtime_context(ctx) backup_path = ( pathlib.Path(backup_location) if os.path.isabs(backup_location) else pathlib.Path(BACKUP_DIR) / backup_location ).resolve() backup_root = pathlib.Path(BACKUP_DIR).resolve() if not backup_path.is_relative_to(backup_root): result = PolicyResult( allowed=False, reason="Backup restore path must be inside BACKUP_DIR", decision_tier="blocked", matched_rule="backup_boundary", ) append_log_entry( build_log_entry( "restore_backup", result, backup_location=str(backup_path), dry_run=dry_run, ) ) return "[POLICY BLOCK] Backup restore path must be inside BACKUP_DIR" try: manifest_path = backup_path / "manifest.json" if not manifest_path.exists(): return f"Error: manifest.json not found in backup: {backup_path}" try: manifest = json.loads(manifest_path.read_text()) except (json.JSONDecodeError, OSError) as e: return f"Error reading backup manifest: {e}" if not isinstance(manifest, list): return "Error: backup manifest is invalid (expected array)" eligible_entries: list[dict] = [] for item in manifest: if not isinstance(item, dict): continue source = item.get("source") backup = item.get("backup") item_type = item.get("type") expected_hash = item.get("sha256") if not source or not backup or not item_type: continue source_path = pathlib.Path(source).resolve() backup_item = pathlib.Path(backup).resolve() if not is_within_workspace(str(source_path)): continue if not backup_item.exists(): continue eligible_entries.append( { "source": source, "backup": backup, "source_path": source_path, "backup_item": backup_item, "item_type": item_type, "expected_hash": expected_hash, "manifest_sig": item.get("manifest_sig"), } ) planned = len(eligible_entries) session_id = current_agent_session_id() require_confirm = bool(POLICY.get("restore", {}).get("require_dry_run_before_apply", True)) if dry_run: response_extra = {} if require_confirm: token, expires_at = issue_restore_confirmation_token( backup_path, planned, session_id=session_id ) response_extra = { "restore_token_issued": token, "restore_token_expires_at": expires_at.isoformat() + "Z", } append_log_entry( build_log_entry( "restore_backup", PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None), backup_location=str(backup_path), dry_run=True, planned=planned, restored=0, hash_failures=0, **response_extra, ) ) msg = f"Restore dry run complete: {planned} item(s) eligible from {backup_path}" if require_confirm: msg += ( f"\nrestore_token={response_extra['restore_token_issued']}" f"\nrestore_token_expires_at={response_extra['restore_token_expires_at']}" ) return msg if require_confirm: ok, reason, matched_rule = consume_restore_confirmation_token( backup_path, restore_token, session_id=session_id ) if not ok: append_log_entry( build_log_entry( "restore_backup", PolicyResult( allowed=False, reason=reason or "Invalid restore token", decision_tier="blocked", matched_rule=matched_rule, ), backup_location=str(backup_path), dry_run=False, restore_token=restore_token, ) ) return f"[POLICY BLOCK] {reason}" restored = 0 hash_failures = 0 signature_failures = 0 for entry in eligible_entries: source_path = entry["source_path"] backup_item = entry["backup_item"] item_type = entry["item_type"] expected_hash = entry["expected_hash"] signature = str(entry.get("manifest_sig", "") or "") source = str(entry.get("source", "") or "") backup = str(entry.get("backup", "") or "") candidate_sig = restore_manifest_signature( {"source": source, "backup": backup, "type": item_type, "sha256": expected_hash or ""} ) try: if not signature or signature != candidate_sig: signature_failures += 1 continue if item_type == "file": if not expected_hash: hash_failures += 1 continue actual_hash = sha256_file(backup_item) if actual_hash != expected_hash: hash_failures += 1 continue source_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(str(backup_item), str(source_path)) try: restored_content = source_path.read_text(encoding="utf-8", errors="replace") script_sentinel.scan_and_record_write(str(source_path), restored_content) except OSError: pass restored += 1 elif item_type == "directory": source_path.mkdir(parents=True, exist_ok=True) shutil.copytree(str(backup_item), str(source_path), dirs_exist_ok=True) restored += 1 except OSError: continue append_log_entry( build_log_entry( "restore_backup", PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None), backup_location=str(backup_path), dry_run=dry_run, planned=planned, restored=restored, hash_failures=hash_failures, signature_failures=signature_failures, ) ) return ( f"Restore complete from {backup_path}: restored={restored}, planned={planned}, " f"hash_failures={hash_failures}, signature_failures={signature_failures}" ) finally: reset_runtime_context(context_tokens) - src/server.py:21-31 (registration)Registration of restore_backup as an MCP tool. The function is passed to mcp.tool() which decorates it as an MCP tool handler on the 'ai-runtime-guard' FastMCP server.
for tool in [ server_info, restore_backup, execute_command, read_file, write_file, edit_file, delete_file, list_directory, ]: mcp.tool()(tool) - src/tools/__init__.py:3-14 (registration)Re-exports restore_backup from the restore_tools module, making it importable via 'from tools import restore_backup'. Also included in __all__.
from .restore_tools import restore_backup __all__ = [ "server_info", "execute_command", "read_file", "write_file", "edit_file", "delete_file", "list_directory", "restore_backup", ] - src/policy_engine.py:915-924 (helper)Policy engine logic that blocks agent tools (except restore_backup) from accessing protected backup storage paths. This ensures only restore_backup can operate inside BACKUP_DIR.
backup_access = POLICY.get("backup_access", {}) if backup_access.get("block_agent_tools", True) and is_backup_path(path): tool_name = (tool or "").lower() if tool_name != "restore_backup": return ( f"Path '{path}' is inside protected backup storage and is not accessible via {tool or 'this tool'}", "backup_storage_protected", ) return None - src/tools/command_tools.py:93-105 (helper)Command tool policy enforcement that blocks shell commands targeting backup storage, directing users to use restore_backup for controlled recovery.
if command_targets_backup_storage(command): return ( PolicyResult( allowed=False, reason="Command targets protected backup storage; use restore_backup for controlled recovery operations", decision_tier="blocked", matched_rule="backup_storage_protected", ), network_warning, shell_containment_warning, shell_containment_paths, sentinel_eval, )