structured_file.py
""" Structured file management with caching, locking, and derived data. Provides base classes for thread-safe and process-safe file operations with intelligent caching and automatic computation of derived data (token counts, section counts, etc.). """ import fcntl import json import logging import os import threading from pathlib import Path from typing import Any, Callable, Optional logger = logging.getLogger(__name__) class StructuredFileCache: """ Base class for file caching with automatic derived data computation. Features: - mtime-based cache validation (no redundant reads) - Write updates cache from memory (no invalidation/re-read) - Thread-safe and process-safe with fcntl locking - Automatic computation of derived data (tokens, sections, lines) - Tag wrapping for system prompt injection """ def __init__( self, file_path: Path, name: str, tag: Optional[str] = None, section_marker: Optional[str] = None ): """ Initialize structured file cache. Args: file_path: Path to the file name: Human-readable name for logging tag: Optional tag for wrapping content (e.g., "---FLOATING_WINDOWS") section_marker: Optional marker for counting sections (e.g., "---WIDGET_") """ self.file_path = Path(file_path) self.name = name self.tag = tag self.section_marker = section_marker # Cache state self.content: str = "" self._last_mtime: Optional[float] = None # Derived data cache self._cached_token_count: int = 0 self._cached_section_count: int = 0 self._cached_line_count: int = 0 # Thread safety (in-process coordination) self._lock = threading.Lock() def load(self) -> str: """ Load content with mtime-based cache validation. Returns cached content if file hasn't changed, otherwise re-reads. Wraps content with tags if configured. Returns: Content wrapped with tags (if tag is set) """ with self._lock: if not self.file_path.exists(): return self._wrap_with_tags("") # Check if file changed current_mtime = self.file_path.stat().st_mtime if current_mtime != self._last_mtime: # File changed externally → re-read and recalculate with open(self.file_path, 'r', encoding='utf-8') as f: fcntl.flock(f.fileno(), fcntl.LOCK_SH) # Shared lock for read try: self.content = f.read() finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) self._last_mtime = current_mtime self._update_derived_data() logger.debug(f"[{self.name}] Loaded from disk (mtime changed)") else: logger.debug(f"[{self.name}] Using cached content") return self._wrap_with_tags(self.content) def write(self, content: str) -> None: """ Write content and update cache from memory (no re-read!). This is the key optimization: after writing, we keep the content in memory and update derived data from it, rather than invalidating the cache and forcing a re-read. Args: content: Content to write """ with self._lock: # Ensure directory exists self.file_path.parent.mkdir(parents=True, exist_ok=True) # Atomic write with exclusive lock with open(self.file_path, 'w', encoding='utf-8') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) # Exclusive lock try: f.write(content) f.flush() os.fsync(f.fileno()) finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Update cache from memory (content we just wrote) self.content = content self._last_mtime = self.file_path.stat().st_mtime self._update_derived_data() # Recalculate from in-memory content logger.debug(f"[{self.name}] Written to disk and cache updated") def atomic_update(self, modifier_fn: Callable[[str], str]) -> None: """ Atomic read-modify-write transaction. Holds exclusive lock for the entire transaction to prevent races. 
Args: modifier_fn: Function that takes current content and returns new content """ with self._lock: self.file_path.parent.mkdir(parents=True, exist_ok=True) # Create file if doesn't exist if not self.file_path.exists(): self.file_path.write_text("") # Hold exclusive lock for entire read-modify-write transaction with open(self.file_path, 'r+', encoding='utf-8') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) try: # Read current state f.seek(0) current_content = f.read() # Modify new_content = modifier_fn(current_content) # Write back f.seek(0) f.truncate() f.write(new_content) f.flush() os.fsync(f.fileno()) # Update cache from memory self.content = new_content self._last_mtime = self.file_path.stat().st_mtime self._update_derived_data() finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) logger.debug(f"[{self.name}] Atomic update completed") def _update_derived_data(self) -> None: """Recalculate all derived data from current content.""" self._cached_token_count = self._estimate_tokens(self.content) self._cached_line_count = self.content.count('\n') if self.section_marker: self._cached_section_count = int(self.content.count(self.section_marker) / 2) def _estimate_tokens(self, content: str) -> int: """ Estimate token count using simple heuristic. Override in subclasses for more accurate estimation. """ return len(content) // 4 def _wrap_with_tags(self, content: str) -> str: """Wrap content with tags for system prompt injection.""" if not self.tag: return content return f"---{self.tag}\n{content}\n---{self.tag}_END" # Properties for derived data (cached, no recomputation) @property def token_count(self) -> int: """Get cached token count.""" return self._cached_token_count @property def section_count(self) -> int: """Get cached section count.""" return self._cached_section_count @property def line_count(self) -> int: """Get cached line count.""" return self._cached_line_count @property def is_empty(self) -> bool: """Check if content is empty.""" return len(self.content) == 0 class JsonStructuredFile(StructuredFileCache): """ Specialized structured file for JSON files. Provides JSON-specific operations with the same caching and locking guarantees as the base class. Use cases: - mcp_servers.json (registry) - State files (augments, file_windows, etc.) """ def __init__( self, file_path: Path, name: str, default_factory: Optional[Callable[[], dict]] = None ): """ Initialize JSON structured file. Args: file_path: Path to JSON file name: Human-readable name default_factory: Optional factory for default JSON structure """ super().__init__(file_path, name, tag=None, section_marker=None) self.default_factory = default_factory or dict self._cached_data: dict = {} def load_json(self) -> dict: """ Load and parse JSON content. Returns cached parsed data if file hasn't changed. 
Returns: Parsed JSON data """ with self._lock: if not self.file_path.exists(): return self.default_factory() # Check if file changed current_mtime = self.file_path.stat().st_mtime if current_mtime != self._last_mtime: # File changed → re-read and parse with open(self.file_path, 'r', encoding='utf-8') as f: fcntl.flock(f.fileno(), fcntl.LOCK_SH) try: self.content = f.read() try: self._cached_data = json.loads(self.content) except json.JSONDecodeError as e: logger.warning(f"[{self.name}] Invalid JSON, using default: {e}") self._cached_data = self.default_factory() finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) self._last_mtime = current_mtime self._update_derived_data() logger.debug(f"[{self.name}] Loaded JSON from disk") return self._cached_data def write_json(self, data: dict, indent: int = 2) -> None: """ Write JSON data and update cache from memory. Args: data: Dictionary to serialize as JSON indent: JSON indentation (default: 2) """ with self._lock: # Serialize to JSON content = json.dumps(data, indent=indent) # Write using base class method (handles locking) self.file_path.parent.mkdir(parents=True, exist_ok=True) with open(self.file_path, 'w', encoding='utf-8') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) try: f.write(content) f.flush() os.fsync(f.fileno()) finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) # Update cache from memory self.content = content self._cached_data = data self._last_mtime = self.file_path.stat().st_mtime self._update_derived_data() logger.debug(f"[{self.name}] Written JSON to disk and cache updated") def atomic_update_json(self, modifier_fn: Callable[[dict], dict]) -> None: """ Atomic read-modify-write transaction for JSON data. Args: modifier_fn: Function that takes current data dict and returns new data dict """ with self._lock: self.file_path.parent.mkdir(parents=True, exist_ok=True) # Create with default if doesn't exist if not self.file_path.exists(): self.write_json(self.default_factory()) # Hold exclusive lock for entire transaction with open(self.file_path, 'r+', encoding='utf-8') as f: fcntl.flock(f.fileno(), fcntl.LOCK_EX) try: # Read and parse f.seek(0) try: current_data = json.load(f) except json.JSONDecodeError: current_data = self.default_factory() # Modify new_data = modifier_fn(current_data) # Serialize and write content = json.dumps(new_data, indent=2) f.seek(0) f.truncate() f.write(content) f.flush() os.fsync(f.fileno()) # Update cache from memory self.content = content self._cached_data = new_data self._last_mtime = self.file_path.stat().st_mtime self._update_derived_data() finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) logger.debug(f"[{self.name}] Atomic JSON update completed")
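
For orientation, a minimal usage sketch of StructuredFileCache. The path, name, tag, and content values are illustrative, and the import assumes the module is importable as structured_file; since the class relies on fcntl, it runs only on Unix-like systems.

from pathlib import Path

from structured_file import StructuredFileCache  # assumed import path

cache = StructuredFileCache(
    file_path=Path("/tmp/demo_windows.md"),  # hypothetical file
    name="demo-windows",
    tag="FLOATING_WINDOWS",        # wrapped as ---FLOATING_WINDOWS ... ---FLOATING_WINDOWS_END
    section_marker="---WIDGET_",   # markers appear in open/close pairs
)

# write() persists to disk and refreshes the cache from memory.
cache.write("---WIDGET_A\nwidget body\n---WIDGET_A\n")

# load() serves from cache until the file's mtime changes on disk.
wrapped = cache.load()
assert wrapped.startswith("---FLOATING_WINDOWS\n")

# Derived data was precomputed during write(); property access is free.
print(cache.token_count, cache.section_count, cache.line_count)  # 9 1 3

# atomic_update() runs read-modify-write under one exclusive fcntl lock,
# so a concurrent writer cannot slip in between the read and the write.
cache.atomic_update(lambda text: text + "---WIDGET_B\nsecond widget\n---WIDGET_B\n")
print(cache.section_count)  # 2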
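
The docstring on _estimate_tokens invites subclasses to replace the len(content) // 4 heuristic. A sketch of such an override, using a word-count heuristic that is purely illustrative and not the project's actual estimator:

from structured_file import StructuredFileCache  # assumed import path


class WordTokenFile(StructuredFileCache):
    """Variant that estimates one token per whitespace-separated word."""

    def _estimate_tokens(self, content: str) -> int:
        # Hypothetical heuristic; swap in a real tokenizer for accuracy.
        return len(content.split())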
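
The JsonStructuredFile docstring names mcp_servers.json as a use case; the sketch below mirrors that with a made-up server entry and an assumed /tmp location. atomic_update_json holds the exclusive lock across read, modify, and write, so concurrent registrations from other processes cannot be lost, and a missing or corrupt file falls back to default_factory() instead of raising.

from pathlib import Path

from structured_file import JsonStructuredFile  # assumed import path

registry = JsonStructuredFile(
    file_path=Path("/tmp/mcp_servers.json"),  # hypothetical location
    name="mcp-registry",
    default_factory=lambda: {"servers": {}},
)

def register_example(data: dict) -> dict:
    # The server entry is invented for the sketch.
    data.setdefault("servers", {})["example"] = {"url": "http://localhost:9000"}
    return data

registry.atomic_update_json(register_example)
print(registry.load_json()["servers"])  # {'example': {'url': 'http://localhost:9000'}}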
