index.py
"""Global report index management and title resolution. This module provides the global index that tracks all reports in the system, enabling fast lookup by ID or title, and maintaining consistency across the file system storage. """ from __future__ import annotations import contextlib import json import logging import os import shutil from pathlib import Path from pydantic import ValidationError from .models import IndexEntry, Outline class IndexError(RuntimeError): """Base exception for index-related errors.""" class IndexCorruptionError(IndexError): """Raised when index file is corrupted.""" class ReportIndex: """Global index of all reports in the system. Maintains a JSONL file with metadata for all reports, providing fast lookup and title resolution. """ def __init__(self, index_path: Path) -> None: """Initialize report index. Args: index_path: Path to the index.jsonl file """ self.index_path = index_path self._entries: dict[str, IndexEntry] = {} self._title_to_id: dict[str, str] = {} self._load_index() def _load_index(self) -> None: """Load index from disk.""" if not self.index_path.exists(): return entries = {} title_to_id = {} corrupted_lines = [] try: with self.index_path.open("r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: data = json.loads(line) entry = IndexEntry(**data) entries[entry.report_id] = entry # Track title mapping (last entry wins for duplicates) title_to_id[entry.current_title.lower()] = entry.report_id except Exception as e: # Log corrupted line but continue loading valid entries corrupted_lines.append((line_num, line, str(e))) continue # If we found corrupted entries, try to rebuild from filesystem if corrupted_lines: import logging logger = logging.getLogger(__name__) logger.warning( f"Found {len(corrupted_lines)} corrupted index entries. " f"Attempting to rebuild index from filesystem..." ) # Backup corrupted index backup_path = self.index_path.with_suffix(".jsonl.corrupted") try: self.index_path.rename(backup_path) logger.info(f"Backed up corrupted index to {backup_path}") except Exception: pass # Rebuild from filesystem self.rebuild_from_filesystem() return except (OSError, json.JSONDecodeError) as e: raise IndexCorruptionError(f"Failed to load index: {e}") from e self._entries = entries self._title_to_id = title_to_id def rebuild_from_filesystem(self) -> None: """Rebuild index by scanning report directories atomically. Builds a fresh index from by_id/ into a temp file and atomically swaps it into place. On failure, the existing index (and in-memory entries) are preserved and the backup is restored. 
""" reports_root = self.index_path.parent by_id_dir = reports_root / "by_id" if not by_id_dir.exists(): self._entries = {} self._title_to_id = {} return previous_entries = self._entries previous_titles = self._title_to_id backup_path = self.index_path.with_suffix(".jsonl.bak") new_entries: dict[str, IndexEntry] = {} title_to_id: dict[str, str] = {} for report_dir in by_id_dir.iterdir(): if not report_dir.is_dir(): continue outline_path = report_dir / "outline.json" if not outline_path.exists(): # Skip directories without an outline; may be incomplete reports continue try: raw = outline_path.read_text(encoding="utf-8") data = json.loads(raw) outline = Outline(**data) except ValidationError as e: # Log validation errors but continue indexing other reports logger = logging.getLogger(__name__) logger.warning( f"Skipping report {report_dir.name} due to validation error: {e}", extra={ "report_id": report_dir.name, "error_type": type(e).__name__, "error_message": str(e), }, ) continue except Exception as e: # Log unexpected errors logger = logging.getLogger(__name__) logger.warning( f"Skipping report {report_dir.name} due to unexpected error: {e}", extra={ "report_id": report_dir.name, "error_type": type(e).__name__, }, ) continue report_id = report_dir.name entry = IndexEntry( report_id=outline.report_id, current_title=outline.title, created_at=outline.created_at, updated_at=outline.updated_at, tags=outline.metadata.get("tags", []), status=outline.metadata.get("status", "active"), path=f"by_id/{report_id}", ) new_entries[entry.report_id] = entry title_to_id[entry.current_title.lower()] = entry.report_id # Backup current index before swapping with contextlib.suppress(Exception): if self.index_path.exists(): shutil.copy2(self.index_path, backup_path) try: self._save_index_internal(entries=new_entries, title_map=title_to_id) with contextlib.suppress(Exception): if backup_path.exists(): backup_path.unlink() except Exception: # Restore previous state and backup file self._entries = previous_entries self._title_to_id = previous_titles with contextlib.suppress(Exception): if backup_path.exists(): backup_path.replace(self.index_path) raise def _save_index(self) -> None: """Save current index to disk.""" self._save_index_internal(entries=None, title_map=None) def _save_index_internal(self, entries=None, title_map=None) -> None: """Save provided entries to disk atomically and update in-memory cache.""" temp_path = self.index_path.with_suffix(".tmp") entries = self._entries if entries is None else entries title_map = self._title_to_id if title_map is None else title_map try: with temp_path.open("w", encoding="utf-8") as f: for entry in entries.values(): data = entry.model_dump() line = json.dumps(data, ensure_ascii=False) + "\n" f.write(line) f.flush() os.fsync(f.fileno()) temp_path.replace(self.index_path) self._entries = entries self._title_to_id = title_map # Best-effort directory sync for index durability try: dir_fd = os.open(str(self.index_path.parent), os.O_RDONLY) try: os.fsync(dir_fd) finally: os.close(dir_fd) except (OSError, AttributeError): pass except Exception as e: with contextlib.suppress(Exception): temp_path.unlink(missing_ok=True) raise IndexError(f"Failed to save index: {e}") from e def add_entry(self, entry: IndexEntry) -> None: """Add or update an index entry. 
Args: entry: Index entry to add/update """ self._entries[entry.report_id] = entry self._title_to_id[entry.current_title.lower()] = entry.report_id self._save_index() def remove_entry(self, report_id: str) -> None: """Remove an index entry. Args: report_id: Report ID to remove """ if report_id in self._entries: entry = self._entries[report_id] del self._entries[report_id] # Remove title mapping if it points to this report if self._title_to_id.get(entry.current_title.lower()) == report_id: del self._title_to_id[entry.current_title.lower()] self._save_index() def get_entry(self, report_id: str) -> IndexEntry | None: """Get index entry by report ID. Args: report_id: Report identifier Returns: Index entry or None if not found """ return self._entries.get(report_id) def resolve_title(self, title: str, allow_partial: bool = True) -> str | None: """Resolve a title to a report ID. Args: title: Title to resolve (case-insensitive) allow_partial: If True, allow partial matches Returns: Report ID or None if not found """ title_lower = title.lower() # Exact match first if title_lower in self._title_to_id: return self._title_to_id[title_lower] if not allow_partial: return None # Partial match on active reports candidates = [] for entry in self._entries.values(): if entry.status != "active": continue if title_lower in entry.current_title.lower(): candidates.append((entry.current_title, entry.report_id)) if not candidates: return None if len(candidates) == 1: return candidates[0][1] # Multiple matches - return None to force explicit selection # In a real implementation, this might raise an error or return # a list of candidates for user selection return None def list_entries( self, status: str | None = None, tags: list[str] | None = None, sort_by: str = "updated_at", reverse: bool = True, ) -> list[IndexEntry]: """List index entries with optional filtering. Args: status: Filter by status (active/archived) tags: Filter by tags (reports must have all specified tags) sort_by: Sort field (created_at, updated_at, current_title) reverse: Reverse sort order Returns: List of matching index entries """ entries = list(self._entries.values()) # Apply filters if status: entries = [e for e in entries if e.status == status] if tags: tag_set = set(tags) entries = [e for e in entries if tag_set.issubset(set(e.tags))] # Sort if sort_by == "created_at": def key_func(e): return e.created_at elif sort_by == "current_title": def key_func(e): return e.current_title.lower() else: # updated_at (default) def key_func(e): return e.updated_at entries.sort(key=key_func, reverse=reverse) return entries def validate_consistency(self) -> list[str]: """Validate index consistency with filesystem. Returns: List of validation error messages """ errors = [] reports_root = self.index_path.parent by_id_dir = reports_root / "by_id" # Check that all indexed reports exist on disk for report_id, _entry in self._entries.items(): report_dir = by_id_dir / report_id if not report_dir.exists(): errors.append(f"Indexed report {report_id} not found on disk") # Check for unindexed reports on disk if by_id_dir.exists(): for report_dir in by_id_dir.iterdir(): if not report_dir.is_dir(): continue report_id = report_dir.name if report_id not in self._entries: errors.append(f"Unindexed report directory: {report_id}") return errors __all__ = [ "IndexCorruptionError", "IndexError", "ReportIndex", ]
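
# --- Example on-disk record (illustrative sketch, not part of the module) ---
# Each line of index.jsonl is one IndexEntry serialized via model_dump() and
# json.dumps(). Given the fields referenced in rebuild_from_filesystem()
# above (report_id, current_title, created_at, updated_at, tags, status,
# path), a record plausibly looks like the following; the exact field set
# depends on the IndexEntry model in .models, which is not shown here:
#
# {"report_id": "r-001", "current_title": "Quarterly Metrics",
#  "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-02T00:00:00Z",
#  "tags": ["finance"], "status": "active", "path": "by_id/r-001"}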
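
# --- Usage sketch (illustrative; field values are assumptions) ---
# A minimal sketch of the public API, assuming IndexEntry accepts the keyword
# fields used in rebuild_from_filesystem() above, that its timestamps accept
# ISO-8601 strings, and that the reports/ directory already exists (saving
# writes reports/index.tmp before swapping it into place). Adapt to the
# actual model definitions in .models.
if __name__ == "__main__":
    index = ReportIndex(Path("reports/index.jsonl"))
    index.add_entry(
        IndexEntry(
            report_id="r-001",  # hypothetical values throughout
            current_title="Quarterly Metrics",
            created_at="2024-01-01T00:00:00Z",
            updated_at="2024-01-02T00:00:00Z",
            tags=["finance"],
            status="active",
            path="by_id/r-001",
        )
    )
    # Title resolution is case-insensitive; partial matches apply only to
    # active reports and must be unambiguous.
    print(index.resolve_title("quarterly metrics"))  # -> "r-001" (exact)
    print(index.resolve_title("Quarterly"))          # -> "r-001" (partial)
    # List active entries, most recently updated first (the defaults).
    for e in index.list_entries(status="active"):
        print(e.report_id, e.current_title)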
