Skip to main content
Glama
storage.py15.8 kB
"""Storage layer for living reports with atomic operations and locking. This module provides the filesystem operations for living reports, ensuring atomic writes, crash recovery, and cross-platform file locking. ATOMICITY GUARANTEE: All write operations follow the temp-file + atomic rename pattern: 1. Write to {path}.tmp 2. fsync the temp file 3. Atomic rename {path}.tmp -> {path} 4. Best-effort directory fsync for durability This guarantees that: - Writes are atomic (all-or-nothing) - No partial state is visible to readers - Crash recovery finds either old or new state, never partial CONCURRENCY CONTROL: - Per-report file locks via portalocker (or file existence fallback) - Operations acquire lock via storage.lock() context manager - Lock timeout: 30 seconds (configurable) - Deadlock prevention: Always acquire single report lock only RECOVERY PATTERN: If index.jsonl becomes inconsistent: 1. Call ReportIndex.rebuild_from_filesystem() 2. Scans by_id/* directories 3. Reconstructs index from outline.json files 4. Audit logs remain intact for full history """ from __future__ import annotations import contextlib import datetime import json import os import uuid from collections.abc import Generator from contextlib import contextmanager, suppress from pathlib import Path from typing import Any try: import portalocker HAS_PORTALOCKER = True except ImportError: HAS_PORTALOCKER = False import logging from .models import AuditEvent, Outline logger = logging.getLogger(__name__) class StorageError(RuntimeError): """Base exception for storage-related errors.""" class LockTimeoutError(StorageError): """Raised when unable to acquire a file lock within timeout.""" class ReportLock: """File-based locking for report operations. Uses portalocker for cross-platform advisory file locking. Falls back to basic file existence checks if portalocker unavailable. """ def __init__(self, lock_path: Path, timeout_seconds: float = 30.0) -> None: """Initialize lock. Args: lock_path: Path to lock file timeout_seconds: Maximum time to wait for lock acquisition """ self.lock_path = lock_path self.timeout_seconds = timeout_seconds self._lock_file: Any | None = None def acquire(self) -> None: """Acquire the lock. Raises: LockTimeoutError: If lock cannot be acquired within timeout RuntimeError: If portalocker not available and lock file exists """ if not HAS_PORTALOCKER: # Fallback: check file existence if self.lock_path.exists(): raise RuntimeError(f"Lock file exists: {self.lock_path}") try: self.lock_path.write_text("") self._lock_file = self.lock_path except Exception as e: raise RuntimeError(f"Failed to create lock file: {e}") from e return try: self._lock_file = open(self.lock_path, "w") # noqa: SIM115 - Need to keep file open for locking portalocker.lock( self._lock_file, portalocker.LOCK_EX | portalocker.LOCK_NB, timeout=self.timeout_seconds, ) except portalocker.LockException as e: raise LockTimeoutError(f"Failed to acquire lock within {self.timeout_seconds}s: {e}") from e except Exception as e: if self._lock_file: self._lock_file.close() self._lock_file = None raise RuntimeError(f"Lock acquisition failed: {e}") from e def release(self) -> None: """Release the lock.""" if not HAS_PORTALOCKER: if self._lock_file and self._lock_file.exists(): with contextlib.suppress(Exception): # Best effort cleanup self._lock_file.unlink() self._lock_file = None return if self._lock_file: try: portalocker.unlock(self._lock_file) self._lock_file.close() except Exception: pass # Best effort cleanup finally: self._lock_file = None def __enter__(self) -> ReportLock: """Context manager entry.""" self.acquire() return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Context manager exit.""" self.release() class ReportStorage: """Storage operations for a single report. Handles atomic writes, backups, and audit logging for report operations. """ def __init__(self, report_dir: Path) -> None: """Initialize storage for a report. Args: report_dir: Directory containing the report files """ self.report_dir = report_dir self.outline_path = report_dir / "outline.json" self.audit_path = report_dir / "audit.jsonl" self.backups_dir = report_dir / "backups" self.lock_path = report_dir / ".lock" # Create directories if they don't exist # Ensure report_dir exists first, then create backups subdirectory self.report_dir.mkdir(parents=True, exist_ok=True) self.backups_dir.mkdir(parents=True, exist_ok=True) @contextmanager def lock(self, timeout_seconds: float = 30.0) -> Generator[ReportLock, None, None]: """Context manager for report locking. Args: timeout_seconds: Lock acquisition timeout Yields: ReportLock instance """ lock = ReportLock(self.lock_path, timeout_seconds) try: with lock: yield lock except Exception: raise def load_outline(self) -> Outline: """Load outline from disk. Returns: Parsed Outline object Raises: FileNotFoundError: If outline.json doesn't exist ValueError: If outline is invalid """ if not self.outline_path.exists(): raise FileNotFoundError(f"Outline not found: {self.outline_path}") try: raw = self.outline_path.read_text(encoding="utf-8") data = json.loads(raw) return Outline(**data) except (json.JSONDecodeError, ValueError) as e: raise ValueError(f"Invalid outline at {self.outline_path}: {e}") from e def _save_outline_atomic(self, outline: Outline) -> str | None: """Atomically save outline to disk with backup. Args: outline: Outline to save Returns: Backup filename if created, None otherwise Raises: StorageError: If save operation fails """ # Create backup and track filename backup_filename = None if self.outline_path.exists(): backup_filename = self._create_backup() # Write to temporary file first temp_path = self.outline_path.with_suffix(".tmp") try: # Serialize and write to temp file data = outline.model_dump(by_alias=True) raw = json.dumps(data, indent=2, ensure_ascii=False) with temp_path.open("w", encoding="utf-8") as f: f.write(raw) f.flush() with contextlib.suppress(OSError): # Best-effort fsync; some filesystems may not support it os.fsync(f.fileno()) # Atomic rename temp_path.replace(self.outline_path) # Best-effort directory sync for durability try: dir_fd = os.open(str(self.report_dir), os.O_RDONLY) try: os.fsync(dir_fd) finally: os.close(dir_fd) except (OSError, AttributeError): # os.fsync on directories may not be supported on all platforms pass return backup_filename except Exception as e: # Clean up temp file on failure with suppress(Exception): temp_path.unlink(missing_ok=True) raise StorageError(f"Failed to save outline: {e}") from e finally: # Ensure temp file is cleaned up with suppress(Exception): temp_path.unlink(missing_ok=True) def save_outline(self, outline: Outline) -> None: """Atomically save outline and record a backup audit event. This method is intended for low-level callers; higher-level services that manage their own audit events should use _save_outline_atomic. """ self._save_outline_atomic(outline) # If an audit log already exists, append a backup event if self.audit_path.exists(): try: event = AuditEvent( action_id=str(uuid.uuid4()), report_id=outline.report_id, ts=datetime.datetime.now(datetime.UTC).isoformat(), actor="cli", action_type="backup", payload={"reason": "outline_save"}, ) self.append_audit_event(event) except Exception: # Best-effort; do not fail saves on audit issues pass def _create_backup(self) -> str | None: """Create timestamped backup of current outline. Returns: Backup filename (e.g., 'outline.json.20231023T143022_123456.bak') or None if no backup created """ if not self.outline_path.exists(): return None import datetime now = datetime.datetime.now(datetime.UTC) # Include microseconds to avoid filename collisions within the same second timestamp_clean = now.strftime("%Y%m%dT%H%M%S") + f"_{now.microsecond:06d}" backup_filename = f"outline.json.{timestamp_clean}.bak" backup_path = self.backups_dir / backup_filename try: import shutil shutil.copy2(self.outline_path, backup_path) return backup_filename except Exception: # Best-effort backup - don't fail writes if backup fails return None def append_audit_event(self, event: AuditEvent) -> None: """Append audit event to audit log. Args: event: Audit event to append Raises: StorageError: If append operation fails """ temp_path = self.audit_path.with_suffix(".tmp") try: # Serialize event data = event.model_dump() line = json.dumps(data, ensure_ascii=False) + "\n" # Write to temp file with temp_path.open("a", encoding="utf-8") as f: f.write(line) f.flush() os.fsync(f.fileno()) # Atomic rename if self.audit_path.exists(): # Append to existing file with temp_path.open("rb") as src, self.audit_path.open("ab") as dst: src.seek(0) dst.write(src.read()) dst.flush() os.fsync(dst.fileno()) temp_path.unlink() else: temp_path.replace(self.audit_path) # Best-effort directory sync; may not be available on all platforms try: dir_fd = os.open(str(self.report_dir), os.O_RDONLY) try: os.fsync(dir_fd) finally: os.close(dir_fd) except (OSError, AttributeError): pass except Exception as e: with suppress(Exception): temp_path.unlink(missing_ok=True) raise StorageError(f"Failed to append audit event: {e}") from e def load_audit_events(self) -> list[AuditEvent]: """Load all audit events for this report. Returns: List of audit events in chronological order """ if not self.audit_path.exists(): return [] events = [] try: with self.audit_path.open("r", encoding="utf-8") as f: for _line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: data = json.loads(line) event = AuditEvent(**data) events.append(event) except Exception as e: # Skip malformed lines but continue logger.debug(f"Skipping malformed audit log line: {e}") continue except Exception: # Return what we could parse pass return events def detect_manual_edits(self) -> bool: """Check if outline.json appears to have been manually edited. Returns: True if manual edits are detected """ if not self.outline_path.exists(): return False # Simple heuristic: check if file is valid JSON but invalid Outline try: raw = self.outline_path.read_text(encoding="utf-8") data = json.loads(raw) Outline(**data) return False except (json.JSONDecodeError, ValueError): return True class GlobalStorage: """Global storage operations for the reports system.""" def __init__(self, reports_root: Path) -> None: """Initialize global storage. Args: reports_root: Root directory for all reports """ self.reports_root = reports_root self.index_path = reports_root / "index.jsonl" self.index_path.parent.mkdir(parents=True, exist_ok=True) def get_report_storage(self, report_id: str) -> ReportStorage: """Get storage instance for a specific report. Args: report_id: Report identifier Returns: ReportStorage instance """ report_dir = self.reports_root / "by_id" / report_id return ReportStorage(report_dir) def save_index_entry(self, entry: dict[str, Any]) -> None: """Atomically append index entry. Args: entry: Index entry data """ temp_path = self.index_path.with_suffix(".tmp") try: line = json.dumps(entry, ensure_ascii=False) + "\n" with temp_path.open("a", encoding="utf-8") as f: f.write(line) f.flush() os.fsync(f.fileno()) if self.index_path.exists(): with temp_path.open("rb") as src, self.index_path.open("ab") as dst: src.seek(0) dst.write(src.read()) dst.flush() os.fsync(dst.fileno()) temp_path.unlink() else: temp_path.replace(self.index_path) # Best-effort directory sync for index durability try: dir_fd = os.open(str(self.index_path.parent), os.O_RDONLY) try: os.fsync(dir_fd) finally: os.close(dir_fd) except (OSError, AttributeError): pass except Exception as e: with suppress(Exception): temp_path.unlink(missing_ok=True) raise StorageError(f"Failed to save index entry: {e}") from e __all__ = [ "GlobalStorage", "LockTimeoutError", "ReportLock", "ReportStorage", "StorageError", ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server