restore_backup
Restore system backups to recover from data loss or corruption. Specify backup location and use dry run to preview changes before applying restoration.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| backup_location | Yes | ||
| dry_run | No | ||
| restore_token | No | ||
| ctx | No | ||
Implementation Reference
- src/tools/restore_tools.py:21-180 (handler) The main function executing the restore_backup logic, including path validation, dry-run checks, token verification, and file system operations.
def restore_backup(
    backup_location: str,
    dry_run: bool = True,
    restore_token: str = "",
    ctx: Context | None = None,
) -> str:
    """Restore files and directories listed in a backup's ``manifest.json``.

    The backup directory must resolve to a location inside ``BACKUP_DIR``.
    Its manifest is expected to be a JSON array of objects with ``source``,
    ``backup`` and ``type`` keys and an optional ``sha256`` key. Entries that
    are malformed, whose source is rejected by ``is_within_workspace``, or
    whose backup item does not exist are skipped.

    Args:
        backup_location: Absolute path, or path relative to ``BACKUP_DIR``,
            of the backup directory to restore from.
        dry_run: When true (the default) nothing is copied; the eligible
            entries are only counted. If the policy key
            ``restore.require_dry_run_before_apply`` is truthy (it defaults
            to ``True``), a confirmation token is issued and included in the
            returned message.
        restore_token: Token handed to ``consume_restore_confirmation_token``
            when applying a restore while the policy requires confirmation.
        ctx: Optional context passed to ``activate_runtime_context``.

    Returns:
        A human-readable status string. Policy blocks and manifest errors
        are reported through this string rather than raised.
    """
    context_tokens = activate_runtime_context(ctx)
    # Everything after activation runs inside the try so the runtime context
    # is reset on every exit path, including the boundary policy block below.
    try:
        backup_path = (
            pathlib.Path(backup_location)
            if os.path.isabs(backup_location)
            else pathlib.Path(BACKUP_DIR) / backup_location
        ).resolve()
        backup_root = pathlib.Path(BACKUP_DIR).resolve()

        # Resolved-path containment check: refuses locations that escape
        # BACKUP_DIR (e.g. via "..").
        if not backup_path.is_relative_to(backup_root):
            result = PolicyResult(
                allowed=False,
                reason="Backup restore path must be inside BACKUP_DIR",
                decision_tier="blocked",
                matched_rule="backup_boundary",
            )
            append_log_entry(
                build_log_entry(
                    "restore_backup",
                    result,
                    backup_location=str(backup_path),
                    dry_run=dry_run,
                )
            )
            return "[POLICY BLOCK] Backup restore path must be inside BACKUP_DIR"

        manifest_path = backup_path / "manifest.json"
        if not manifest_path.exists():
            return f"Error: manifest.json not found in backup: {backup_path}"

        try:
            manifest = json.loads(manifest_path.read_text())
        except (json.JSONDecodeError, OSError) as e:
            return f"Error reading backup manifest: {e}"

        if not isinstance(manifest, list):
            return "Error: backup manifest is invalid (expected array)"

        eligible_entries: list[dict] = []
        for item in manifest:
            if not isinstance(item, dict):
                continue

            source = item.get("source")
            backup = item.get("backup")
            item_type = item.get("type")
            expected_hash = item.get("sha256")

            # JSON values may be numbers/lists/etc.; pathlib.Path() would
            # raise TypeError on those, so treat them as malformed entries.
            if not isinstance(source, str) or not isinstance(backup, str):
                continue
            if not source or not backup or not item_type:
                continue

            source_path = pathlib.Path(source).resolve()
            backup_item = pathlib.Path(backup).resolve()
            # NOTE(review): backup_item is not required to live inside
            # backup_path; confirm whether manifests may legitimately point
            # outside the backup directory.

            if not is_within_workspace(str(source_path)):
                continue
            if not backup_item.exists():
                continue

            eligible_entries.append(
                {
                    "source_path": source_path,
                    "backup_item": backup_item,
                    "item_type": item_type,
                    "expected_hash": expected_hash,
                }
            )

        planned = len(eligible_entries)
        require_confirm = bool(
            POLICY.get("restore", {}).get("require_dry_run_before_apply", True)
        )

        if dry_run:
            response_extra = {}
            if require_confirm:
                token, expires_at = issue_restore_confirmation_token(backup_path, planned)
                response_extra = {
                    "restore_token_issued": token,
                    "restore_token_expires_at": expires_at.isoformat() + "Z",
                }

            append_log_entry(
                build_log_entry(
                    "restore_backup",
                    PolicyResult(
                        allowed=True,
                        reason="allowed",
                        decision_tier="allowed",
                        matched_rule=None,
                    ),
                    backup_location=str(backup_path),
                    dry_run=True,
                    planned=planned,
                    restored=0,
                    hash_failures=0,
                    **response_extra,
                )
            )

            msg = f"Restore dry run complete: {planned} item(s) eligible from {backup_path}"
            if require_confirm:
                msg += (
                    f"\nrestore_token={response_extra['restore_token_issued']}"
                    f"\nrestore_token_expires_at={response_extra['restore_token_expires_at']}"
                )
            return msg

        if require_confirm:
            ok, reason, matched_rule = consume_restore_confirmation_token(
                backup_path, restore_token
            )
            if not ok:
                # Use one fallback for both the log entry and the reply so the
                # caller never sees "[POLICY BLOCK] None".
                block_reason = reason or "Invalid restore token"
                append_log_entry(
                    build_log_entry(
                        "restore_backup",
                        PolicyResult(
                            allowed=False,
                            reason=block_reason,
                            decision_tier="blocked",
                            matched_rule=matched_rule,
                        ),
                        backup_location=str(backup_path),
                        dry_run=False,
                        restore_token=restore_token,
                    )
                )
                return f"[POLICY BLOCK] {block_reason}"

        restored = 0
        hash_failures = 0
        for entry in eligible_entries:
            source_path = entry["source_path"]
            backup_item = entry["backup_item"]
            item_type = entry["item_type"]
            expected_hash = entry["expected_hash"]

            try:
                if item_type == "file":
                    # Only files that carry a sha256 in the manifest are
                    # verified; a mismatch skips the copy.
                    if expected_hash:
                        actual_hash = sha256_file(backup_item)
                        if actual_hash != expected_hash:
                            hash_failures += 1
                            continue
                    source_path.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(str(backup_item), str(source_path))
                    restored += 1
                elif item_type == "directory":
                    source_path.mkdir(parents=True, exist_ok=True)
                    shutil.copytree(str(backup_item), str(source_path), dirs_exist_ok=True)
                    restored += 1
                # Any other item_type counts toward `planned` but is not restored.
            except OSError:
                # Best-effort restore: an item that fails to copy is skipped
                # and shows up only as the gap between planned and restored.
                continue

        append_log_entry(
            build_log_entry(
                "restore_backup",
                PolicyResult(
                    allowed=True,
                    reason="allowed",
                    decision_tier="allowed",
                    matched_rule=None,
                ),
                backup_location=str(backup_path),
                dry_run=dry_run,
                planned=planned,
                restored=restored,
                hash_failures=hash_failures,
            )
        )
        return (
            f"Restore complete from {backup_path}: "
            f"restored={restored}, planned={planned}, hash_failures={hash_failures}"
        )
    finally:
        reset_runtime_context(context_tokens)