Skip to main content
Glama

restore_backup

Restore files from a recorded backup manifest with dry-run planning and token-gated apply for policy-controlled recovery.

Instructions

Restore files from a recorded AIRG backup manifest.

Supports dry-run planning and token-gated apply mode when restore confirmation is required by policy.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
backup_locationYes
dry_runNo
restore_tokenNo
ctxNo

Output Schema

TableJSON Schema
NameRequiredDescriptionDefault
resultYes

Implementation Reference

  • The main handler function for the 'restore_backup' tool. Implements the full restore logic including dry-run planning, token-gated apply mode, manifest validation, file integrity verification via SHA256 hashing and manifest signatures, and actual file restoration via shutil.copy2. Also handles policy checks (backup boundary), audit logging, and runtime context management.
    def restore_backup(
        backup_location: str,
        dry_run: bool = True,
        restore_token: str = "",
        ctx: Context | None = None,
    ) -> str:
        """Restore files from a recorded AIRG backup manifest.
    
        Supports dry-run planning and token-gated apply mode when restore
        confirmation is required by policy.
        """
        context_tokens = activate_runtime_context(ctx)
        backup_path = (
            pathlib.Path(backup_location)
            if os.path.isabs(backup_location)
            else pathlib.Path(BACKUP_DIR) / backup_location
        ).resolve()
        backup_root = pathlib.Path(BACKUP_DIR).resolve()
        if not backup_path.is_relative_to(backup_root):
            result = PolicyResult(
                allowed=False,
                reason="Backup restore path must be inside BACKUP_DIR",
                decision_tier="blocked",
                matched_rule="backup_boundary",
            )
            append_log_entry(
                build_log_entry(
                    "restore_backup",
                    result,
                    backup_location=str(backup_path),
                    dry_run=dry_run,
                )
            )
            return "[POLICY BLOCK] Backup restore path must be inside BACKUP_DIR"
    
        try:
            manifest_path = backup_path / "manifest.json"
            if not manifest_path.exists():
                return f"Error: manifest.json not found in backup: {backup_path}"
    
            try:
                manifest = json.loads(manifest_path.read_text())
            except (json.JSONDecodeError, OSError) as e:
                return f"Error reading backup manifest: {e}"
    
            if not isinstance(manifest, list):
                return "Error: backup manifest is invalid (expected array)"
    
            eligible_entries: list[dict] = []
            for item in manifest:
                if not isinstance(item, dict):
                    continue
                source = item.get("source")
                backup = item.get("backup")
                item_type = item.get("type")
                expected_hash = item.get("sha256")
                if not source or not backup or not item_type:
                    continue
                source_path = pathlib.Path(source).resolve()
                backup_item = pathlib.Path(backup).resolve()
                if not is_within_workspace(str(source_path)):
                    continue
                if not backup_item.exists():
                    continue
    
                eligible_entries.append(
                    {
                        "source": source,
                        "backup": backup,
                        "source_path": source_path,
                        "backup_item": backup_item,
                        "item_type": item_type,
                        "expected_hash": expected_hash,
                        "manifest_sig": item.get("manifest_sig"),
                    }
                )
    
            planned = len(eligible_entries)
            session_id = current_agent_session_id()
    
            require_confirm = bool(POLICY.get("restore", {}).get("require_dry_run_before_apply", True))
            if dry_run:
                response_extra = {}
                if require_confirm:
                    token, expires_at = issue_restore_confirmation_token(
                        backup_path, planned, session_id=session_id
                    )
                    response_extra = {
                        "restore_token_issued": token,
                        "restore_token_expires_at": expires_at.isoformat() + "Z",
                    }
                append_log_entry(
                    build_log_entry(
                        "restore_backup",
                        PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None),
                        backup_location=str(backup_path),
                        dry_run=True,
                        planned=planned,
                        restored=0,
                        hash_failures=0,
                        **response_extra,
                    )
                )
    
                msg = f"Restore dry run complete: {planned} item(s) eligible from {backup_path}"
                if require_confirm:
                    msg += (
                        f"\nrestore_token={response_extra['restore_token_issued']}"
                        f"\nrestore_token_expires_at={response_extra['restore_token_expires_at']}"
                    )
                return msg
    
            if require_confirm:
                ok, reason, matched_rule = consume_restore_confirmation_token(
                    backup_path, restore_token, session_id=session_id
                )
                if not ok:
                    append_log_entry(
                        build_log_entry(
                            "restore_backup",
                            PolicyResult(
                                allowed=False,
                                reason=reason or "Invalid restore token",
                                decision_tier="blocked",
                                matched_rule=matched_rule,
                            ),
                            backup_location=str(backup_path),
                            dry_run=False,
                            restore_token=restore_token,
                        )
                    )
                    return f"[POLICY BLOCK] {reason}"
    
            restored = 0
            hash_failures = 0
            signature_failures = 0
            for entry in eligible_entries:
                source_path = entry["source_path"]
                backup_item = entry["backup_item"]
                item_type = entry["item_type"]
                expected_hash = entry["expected_hash"]
                signature = str(entry.get("manifest_sig", "") or "")
                source = str(entry.get("source", "") or "")
                backup = str(entry.get("backup", "") or "")
                candidate_sig = restore_manifest_signature(
                    {"source": source, "backup": backup, "type": item_type, "sha256": expected_hash or ""}
                )
                try:
                    if not signature or signature != candidate_sig:
                        signature_failures += 1
                        continue
                    if item_type == "file":
                        if not expected_hash:
                            hash_failures += 1
                            continue
                        actual_hash = sha256_file(backup_item)
                        if actual_hash != expected_hash:
                            hash_failures += 1
                            continue
                        source_path.parent.mkdir(parents=True, exist_ok=True)
                        shutil.copy2(str(backup_item), str(source_path))
                        try:
                            restored_content = source_path.read_text(encoding="utf-8", errors="replace")
                            script_sentinel.scan_and_record_write(str(source_path), restored_content)
                        except OSError:
                            pass
                        restored += 1
                    elif item_type == "directory":
                        source_path.mkdir(parents=True, exist_ok=True)
                        shutil.copytree(str(backup_item), str(source_path), dirs_exist_ok=True)
                        restored += 1
                except OSError:
                    continue
    
            append_log_entry(
                build_log_entry(
                    "restore_backup",
                    PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None),
                    backup_location=str(backup_path),
                    dry_run=dry_run,
                    planned=planned,
                    restored=restored,
                    hash_failures=hash_failures,
                    signature_failures=signature_failures,
                )
            )
    
            return (
                f"Restore complete from {backup_path}: restored={restored}, planned={planned}, "
                f"hash_failures={hash_failures}, signature_failures={signature_failures}"
            )
        finally:
            reset_runtime_context(context_tokens)
  • src/server.py:21-31 (registration)
    Registration of restore_backup as an MCP tool. The function is passed to mcp.tool() which decorates it as an MCP tool handler on the 'ai-runtime-guard' FastMCP server.
    for tool in [
        server_info,
        restore_backup,
        execute_command,
        read_file,
        write_file,
        edit_file,
        delete_file,
        list_directory,
    ]:
        mcp.tool()(tool)
  • Re-exports restore_backup from the restore_tools module, making it importable via 'from tools import restore_backup'. Also included in __all__.
    from .restore_tools import restore_backup
    
    __all__ = [
        "server_info",
        "execute_command",
        "read_file",
        "write_file",
        "edit_file",
        "delete_file",
        "list_directory",
        "restore_backup",
    ]
  • Policy engine logic that blocks agent tools (except restore_backup) from accessing protected backup storage paths. This ensures only restore_backup can operate inside BACKUP_DIR.
    backup_access = POLICY.get("backup_access", {})
    if backup_access.get("block_agent_tools", True) and is_backup_path(path):
        tool_name = (tool or "").lower()
        if tool_name != "restore_backup":
            return (
                f"Path '{path}' is inside protected backup storage and is not accessible via {tool or 'this tool'}",
                "backup_storage_protected",
            )
    
    return None
  • Command tool policy enforcement that blocks shell commands targeting backup storage, directing users to use restore_backup for controlled recovery.
    if command_targets_backup_storage(command):
        return (
            PolicyResult(
                allowed=False,
                reason="Command targets protected backup storage; use restore_backup for controlled recovery operations",
                decision_tier="blocked",
                matched_rule="backup_storage_protected",
            ),
            network_warning,
            shell_containment_warning,
            shell_containment_paths,
            sentinel_eval,
        )
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations, the description must disclose behavioral traits. It mentions dry-run vs apply modes and token requirement, but omits side effects (overwriting), idempotency, required permissions, or error conditions.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is two sentences, front-loading the purpose in the first sentence and adding key details in the second. No redundant information.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Despite an output schema, the description fails to explain the required 'backup_location' parameter or the result of a successful restore. It only addresses the dry-run and token features, leaving significant gaps for a recovery tool.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters2/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description must explain parameters. It partially covers dry_run and restore_token via 'dry-run' and 'token-gated apply', but the required 'backup_location' and 'ctx' are not described at all.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool restores files from an AIRG backup manifest, which is a specific verb and resource. The sibling tools are all file/command operations, making this restore tool distinct.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

It mentions dry-run planning and token-gated apply mode, providing some guidance on when to use each mode. However, it lacks explicit instructions on when not to use this tool or comparison with alternatives.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jimmyracheta/ai-runtime-guard'

If you have feedback or need assistance with the MCP directory API, please join our Discord server