restore_backup
Restore files from a recorded backup manifest, with dry-run planning and token-gated apply mode for policy-compliant recovery.
Instructions
Restore files from a recorded AIRG backup manifest.
Supports dry-run planning and token-gated apply mode when restore confirmation is required by policy.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| backup_location | Yes | Backup path; absolute, or relative to BACKUP_DIR | |
| dry_run | No | Plan the restore without applying any changes | True |
| restore_token | No | Confirmation token required in apply mode when policy mandates a prior dry run | "" |
| ctx | No | Optional runtime Context | None |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | Human-readable status, error, or policy-block message | |
Implementation Reference
- src/tools/restore_tools.py:22-214 (handler)The main handler function for the 'restore_backup' tool. It validates the backup path is within BACKUP_DIR, loads the manifest, filters eligible entries (within workspace, existing backup items), supports dry-run mode (optionally issuing a restore confirmation token), and in apply mode validates the token, verifies manifest signatures and SHA256 hashes, then copies files/directories back to their original locations. It logs all operations via the audit system.
def restore_backup( backup_location: str, dry_run: bool = True, restore_token: str = "", ctx: Context | None = None, ) -> str: """Restore files from a recorded AIRG backup manifest. Supports dry-run planning and token-gated apply mode when restore confirmation is required by policy. """ context_tokens = activate_runtime_context(ctx) backup_path = ( pathlib.Path(backup_location) if os.path.isabs(backup_location) else pathlib.Path(BACKUP_DIR) / backup_location ).resolve() backup_root = pathlib.Path(BACKUP_DIR).resolve() if not backup_path.is_relative_to(backup_root): result = PolicyResult( allowed=False, reason="Backup restore path must be inside BACKUP_DIR", decision_tier="blocked", matched_rule="backup_boundary", ) append_log_entry( build_log_entry( "restore_backup", result, backup_location=str(backup_path), dry_run=dry_run, ) ) return "[POLICY BLOCK] Backup restore path must be inside BACKUP_DIR" try: manifest_path = backup_path / "manifest.json" if not manifest_path.exists(): return f"Error: manifest.json not found in backup: {backup_path}" try: manifest = json.loads(manifest_path.read_text()) except (json.JSONDecodeError, OSError) as e: return f"Error reading backup manifest: {e}" if not isinstance(manifest, list): return "Error: backup manifest is invalid (expected array)" eligible_entries: list[dict] = [] for item in manifest: if not isinstance(item, dict): continue source = item.get("source") backup = item.get("backup") item_type = item.get("type") expected_hash = item.get("sha256") if not source or not backup or not item_type: continue source_path = pathlib.Path(source).resolve() backup_item = pathlib.Path(backup).resolve() if not is_within_workspace(str(source_path)): continue if not backup_item.exists(): continue eligible_entries.append( { "source": source, "backup": backup, "source_path": source_path, "backup_item": backup_item, "item_type": item_type, "expected_hash": expected_hash, "manifest_sig": item.get("manifest_sig"), } ) planned = 
len(eligible_entries) session_id = current_agent_session_id() require_confirm = bool(POLICY.get("restore", {}).get("require_dry_run_before_apply", True)) if dry_run: response_extra = {} if require_confirm: token, expires_at = issue_restore_confirmation_token( backup_path, planned, session_id=session_id ) response_extra = { "restore_token_issued": token, "restore_token_expires_at": expires_at.isoformat() + "Z", } append_log_entry( build_log_entry( "restore_backup", PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None), backup_location=str(backup_path), dry_run=True, planned=planned, restored=0, hash_failures=0, **response_extra, ) ) msg = f"Restore dry run complete: {planned} item(s) eligible from {backup_path}" if require_confirm: msg += ( f"\nrestore_token={response_extra['restore_token_issued']}" f"\nrestore_token_expires_at={response_extra['restore_token_expires_at']}" ) return msg if require_confirm: ok, reason, matched_rule = consume_restore_confirmation_token( backup_path, restore_token, session_id=session_id ) if not ok: append_log_entry( build_log_entry( "restore_backup", PolicyResult( allowed=False, reason=reason or "Invalid restore token", decision_tier="blocked", matched_rule=matched_rule, ), backup_location=str(backup_path), dry_run=False, restore_token=restore_token, ) ) return f"[POLICY BLOCK] {reason}" restored = 0 hash_failures = 0 signature_failures = 0 for entry in eligible_entries: source_path = entry["source_path"] backup_item = entry["backup_item"] item_type = entry["item_type"] expected_hash = entry["expected_hash"] signature = str(entry.get("manifest_sig", "") or "") source = str(entry.get("source", "") or "") backup = str(entry.get("backup", "") or "") candidate_sig = restore_manifest_signature( {"source": source, "backup": backup, "type": item_type, "sha256": expected_hash or ""} ) try: if not signature or signature != candidate_sig: signature_failures += 1 continue if item_type == "file": if not 
expected_hash: hash_failures += 1 continue actual_hash = sha256_file(backup_item) if actual_hash != expected_hash: hash_failures += 1 continue source_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(str(backup_item), str(source_path)) try: restored_content = source_path.read_text(encoding="utf-8", errors="replace") script_sentinel.scan_and_record_write(str(source_path), restored_content) except OSError: pass restored += 1 elif item_type == "directory": source_path.mkdir(parents=True, exist_ok=True) shutil.copytree(str(backup_item), str(source_path), dirs_exist_ok=True) restored += 1 except OSError: continue append_log_entry( build_log_entry( "restore_backup", PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None), backup_location=str(backup_path), dry_run=dry_run, planned=planned, restored=restored, hash_failures=hash_failures, signature_failures=signature_failures, ) ) return ( f"Restore complete from {backup_path}: restored={restored}, planned={planned}, " f"hash_failures={hash_failures}, signature_failures={signature_failures}" ) finally: reset_runtime_context(context_tokens) - src/server.py:19-31 (registration)The MCP server registers 'restore_backup' as a tool by iterating over all tool functions and calling mcp.tool()(tool) on line 31, including restore_backup.
mcp = FastMCP("ai-runtime-guard") for tool in [ server_info, restore_backup, execute_command, read_file, write_file, edit_file, delete_file, list_directory, ]: mcp.tool()(tool) - src/tools/__init__.py:1-14 (registration)The tools package __init__.py imports restore_backup from .restore_tools and re-exports it in __all__.
from .command_tools import execute_command, server_info from .file_tools import delete_file, edit_file, list_directory, read_file, write_file from .restore_tools import restore_backup __all__ = [ "server_info", "execute_command", "read_file", "write_file", "edit_file", "delete_file", "list_directory", "restore_backup", ] - src/tools/restore_tools.py:22-27 (schema)The function signature defines the input schema: backup_location (str), dry_run (bool, default True), restore_token (str, default ''), and ctx (optional Context). It returns a str result.
def restore_backup( backup_location: str, dry_run: bool = True, restore_token: str = "", ctx: Context | None = None, ) -> str: - src/policy_engine.py:915-924 (helper)Policy engine: when backup_access blocks agent tools, the only allowed tool for backup storage paths is 'restore_backup', ensuring only controlled recovery operations can access backup data.
backup_access = POLICY.get("backup_access", {}) if backup_access.get("block_agent_tools", True) and is_backup_path(path): tool_name = (tool or "").lower() if tool_name != "restore_backup": return ( f"Path '{path}' is inside protected backup storage and is not accessible via {tool or 'this tool'}", "backup_storage_protected", ) return None