storage.py (6.23 kB)
"""Key-value storage utilities for persistent data management.""" from __future__ import annotations import json from pathlib import Path from typing import Any, Protocol import pydantic_core from fastmcp.utilities.logging import get_logger logger = get_logger(__name__) class KVStorage(Protocol): """Protocol for key-value storage of JSON data.""" async def get(self, key: str) -> dict[str, Any] | None: """Get a JSON dict by key.""" ... async def set(self, key: str, value: dict[str, Any]) -> None: """Store a JSON dict by key.""" ... async def delete(self, key: str) -> None: """Delete a value by key.""" ... class JSONFileStorage: """File-based key-value storage for JSON data with automatic metadata tracking. Each key-value pair is stored as a separate JSON file on disk. Keys are sanitized to be filesystem-safe. The storage automatically wraps all data with metadata: - timestamp: Timestamp when the entry was last written Args: cache_dir: Directory for storing JSON files """ def __init__(self, cache_dir: Path): """Initialize JSON file storage.""" self.cache_dir = cache_dir self.cache_dir.mkdir(exist_ok=True, parents=True) def _get_safe_key(self, key: str) -> str: """Convert key to filesystem-safe string.""" safe_key = key # Replace problematic characters with underscores for char in [".", "/", "\\", ":", "*", "?", '"', "<", ">", "|", " "]: safe_key = safe_key.replace(char, "_") # Compress multiple underscores into one while "__" in safe_key: safe_key = safe_key.replace("__", "_") # Strip leading and trailing underscores safe_key = safe_key.strip("_") return safe_key def _get_file_path(self, key: str) -> Path: """Get the file path for a given key.""" safe_key = self._get_safe_key(key) return self.cache_dir / f"{safe_key}.json" async def get(self, key: str) -> dict[str, Any] | None: """Get a JSON dict from storage by key. Args: key: The key to retrieve Returns: The stored dict or None if not found """ path = self._get_file_path(key) try: wrapper = json.loads(path.read_text()) # Expect wrapped format with metadata if not isinstance(wrapper, dict) or "data" not in wrapper: logger.warning(f"Invalid storage format for key '{key}'") return None logger.debug(f"Loaded data for key '{key}'") return wrapper["data"] except FileNotFoundError: logger.debug(f"No data found for key '{key}'") return None except json.JSONDecodeError as e: logger.warning(f"Failed to load data for key '{key}': {e}") return None async def set(self, key: str, value: dict[str, Any]) -> None: """Store a JSON dict with metadata. Args: key: The key to store under value: The dict to store """ import time path = self._get_file_path(key) current_time = time.time() # Create wrapper with metadata wrapper = { "data": value, "timestamp": current_time, } # Use pydantic_core for consistent JSON serialization json_data = pydantic_core.to_json(wrapper, fallback=str) path.write_bytes(json_data) logger.debug(f"Saved data for key '{key}'") async def delete(self, key: str) -> None: """Delete a value from storage. Args: key: The key to delete """ path = self._get_file_path(key) if path.exists(): path.unlink() logger.debug(f"Deleted data for key '{key}'") async def cleanup_old_entries( self, max_age_seconds: int = 30 * 24 * 60 * 60, # 30 days default ) -> int: """Remove entries older than the specified age. Uses the timestamp field to determine age. 
Args: max_age_seconds: Maximum age in seconds (default 30 days) Returns: Number of entries removed """ import time current_time = time.time() removed_count = 0 for json_file in self.cache_dir.glob("*.json"): try: # Read the file and check timestamp wrapper = json.loads(json_file.read_text()) # Check wrapped format if not isinstance(wrapper, dict) or "data" not in wrapper: continue # Invalid format, skip if "timestamp" not in wrapper: continue # No timestamp field, skip entry_age = current_time - wrapper["timestamp"] if entry_age > max_age_seconds: json_file.unlink() removed_count += 1 logger.debug( f"Removed old entry '{json_file.stem}' (age: {entry_age:.0f}s)" ) except (json.JSONDecodeError, KeyError) as e: logger.debug(f"Error reading {json_file.name}: {e}") continue if removed_count > 0: logger.info(f"Cleaned up {removed_count} old entries from storage") return removed_count class InMemoryStorage: """In-memory key-value storage for JSON data. Simple dict-based storage that doesn't persist across restarts. Useful for testing or environments where file storage isn't available. """ def __init__(self): """Initialize in-memory storage.""" self._data: dict[str, dict[str, Any]] = {} async def get(self, key: str) -> dict[str, Any] | None: """Get a JSON dict from memory by key.""" return self._data.get(key) async def set(self, key: str, value: dict[str, Any]) -> None: """Store a JSON dict in memory.""" self._data[key] = value async def delete(self, key: str) -> None: """Delete a value from memory.""" self._data.pop(key, None)
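A minimal usage sketch of the storage classes above, not part of storage.py itself. It assumes the module is importable as `storage` and that its dependencies (pydantic_core, fastmcp) are installed; the keys, payload, and temporary cache directory are illustrative only.

    import asyncio
    import tempfile
    from pathlib import Path

    # Assumed import path; adjust to wherever storage.py lives in your project.
    from storage import InMemoryStorage, JSONFileStorage, KVStorage


    async def demo(storage: KVStorage) -> None:
        # Any KVStorage implementation can be used interchangeably.
        await storage.set("user/profile:alice", {"name": "Alice", "theme": "dark"})
        print(await storage.get("user/profile:alice"))  # {'name': 'Alice', 'theme': 'dark'}
        await storage.delete("user/profile:alice")
        print(await storage.get("user/profile:alice"))  # None


    if __name__ == "__main__":
        # File-backed storage: the key is sanitized, so "user/profile:alice"
        # is written to <cache_dir>/user_profile_alice.json
        with tempfile.TemporaryDirectory() as tmp:
            asyncio.run(demo(JSONFileStorage(Path(tmp))))

        # In-memory storage: same interface, nothing written to disk
        asyncio.run(demo(InMemoryStorage()))

Because both backends satisfy the KVStorage protocol, the file-backed store can be swapped for InMemoryStorage in tests or in environments where file storage isn't available, without changing calling code.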

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ingeno/mcp-openapi-lambda'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.