Skip to main content
Glama
StorageManager.py (3.19 kB)
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev>
# This file is part of Archive Agent. See LICENSE for details.

import typer
from logging import Logger
import json
import shutil
from pathlib import Path
from copy import deepcopy
from typing import Dict, Any
from abc import ABC, abstractmethod

from archive_agent.util.format import format_file


class StorageManager(ABC):
    """
    Storage manager.

    Keeps a dictionary (`self.data`) in sync with a JSON file on disk.
    On construction the file is loaded if it exists, otherwise it is created
    from the default data. Subclasses provide `upgrade()` and `validate()`.
    """

    def __init__(self, logger: Logger, file_path: Path, default: Dict[str, Any]) -> None:
        """
        Initialize storage manager.
        :param logger: Logger.
        :param file_path: File path.
        :param default: Default data.
        """
        self.logger = logger

        self.file_path = file_path
        self.default = default

        self.data: Dict[str, Any] = {}

        self.load_or_create()

    def load_or_create(self) -> None:
        """
        Load or create file.
        :raises typer.Exit: With exit code 1 if loading or creating failed.
        """
        try:
            if not self.file_path.exists():
                self.create()
            else:
                self.load()
        except typer.Exit:
            # `typer.Exit` is Click's `Exit`, a `RuntimeError` subclass, so the
            # generic handler below would otherwise catch it. An exit raised by
            # load() / save() was already logged with its specific reason; pass
            # it through instead of logging it again with a traceback.
            raise
        except Exception as e:
            self.logger.exception(f"Failed to load {format_file(self.file_path)}: {e}")
            raise typer.Exit(code=1) from e

    def create(self) -> None:
        """
        Create file.
        """
        # Deep copy so later changes to `self.data` never alter `self.default`.
        self.data = deepcopy(self.default)

        self.save()

        self.logger.info(f"Created default {format_file(self.file_path)}")

    def load(self) -> None:
        """
        Load file.
        :raises typer.Exit: With exit code 1 if the data is not a dictionary,
            lacks keys present in the default data, or fails validation.
        """
        with open(self.file_path, "r", encoding="utf-8") as f:
            self.data = json.load(f)

        # Upgrade first, so that keys added by the upgrade count as present.
        upgraded = self.upgrade()

        # Valid JSON is not necessarily an object (could be a list, string, ...);
        # report that explicitly instead of failing on `.keys()` below.
        if not isinstance(self.data, dict):
            self.logger.error(f"Invalid data in {format_file(self.file_path)}")
            raise typer.Exit(code=1)

        missing_keys = self.default.keys() - self.data.keys()
        if missing_keys:
            self.logger.error(f"Missing keys in {format_file(self.file_path)}: {missing_keys}")
            raise typer.Exit(code=1)

        if not self.validate():
            self.logger.error(f"Invalid data in {format_file(self.file_path)}")
            raise typer.Exit(code=1)

        self.logger.debug(f"Loaded existing {format_file(self.file_path)}")

        if upgraded:
            self.logger.debug(f"Upgraded existing {format_file(self.file_path)}")
            self.save()

    def save(self) -> None:
        """
        Save file (atomic write).
        :raises typer.Exit: With exit code 1 if the data fails validation.
        """
        if not self.validate():
            self.logger.error(f"Invalid data in {format_file(self.file_path)}")
            raise typer.Exit(code=1)

        self.file_path.parent.mkdir(parents=True, exist_ok=True)

        # Append ".tmp" to the full file name (rather than swapping the suffix)
        # so sibling files that differ only in extension never share a temp file.
        temp_path = self.file_path.with_name(self.file_path.name + ".tmp")

        try:
            with open(temp_path, "w", encoding="utf-8") as f:
                # noinspection PyTypeChecker
                json.dump(self.data, f, indent=4)

            # The temp file is in the target's directory, hence on the same
            # filesystem, so this overwrites the target in a single step.
            temp_path.replace(self.file_path)
        except BaseException:
            # Do not leave a partially written temp file behind.
            if temp_path.exists():
                temp_path.unlink()
            raise

        self.logger.debug(f"Saved {format_file(self.file_path)}")

    @abstractmethod
    def upgrade(self) -> bool:
        """
        Upgrade data.
        :return: True if data upgraded, False otherwise.
        """
        raise NotImplementedError

    @abstractmethod
    def validate(self) -> bool:
        """
        Validate data.
        :return: True if data is valid, False otherwise.
        """
        raise NotImplementedError

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shredEngineer/Archive-Agent'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.