Skip to main content
Glama
preview_usage.py (10.5 kB)
"""Preview usage tracking with LRU cache management.""" import json import time from pathlib import Path from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, asdict, fields from mcp.server.fastmcp.utilities.logging import get_logger logger = get_logger(__name__) @dataclass class PreviewUsage: """Usage statistics for a preview.""" url: str subdomain: Optional[str] = None port: Optional[int] = None # optional for single-port mode directory: Optional[str] = None # virtual directory (e.g. "/", "/dist") for restore entry: Optional[str] = None workspace_name: Optional[str] = None user_id: Optional[str] = None chat_id: Optional[str] = None created_at: float = 0.0 last_accessed_at: float = 0.0 access_count: int = 0 def to_dict(self) -> Dict: return asdict(self) @classmethod def from_dict(cls, data: Dict) -> "PreviewUsage": # Backward compatibility: # - tolerate missing fields # - tolerate extra/unknown fields (older schema versions) normalized = dict(data or {}) normalized.setdefault("port", None) normalized.setdefault("directory", None) normalized.setdefault("entry", None) allowed = {f.name for f in fields(cls)} filtered = {k: v for k, v in normalized.items() if k in allowed} return cls(**filtered) class PreviewUsageTracker: """Tracks preview usage and manages LRU cache.""" def __init__(self, storage_path: Path): """Initialize usage tracker. 
Args: storage_path: Path to JSON file for persistent storage """ self.storage_path = storage_path self._usage_data: Dict[str, PreviewUsage] = {} # Key: subdomain or port (as string) self._load() def _load(self): """Load usage data from storage.""" if self.storage_path.exists(): try: with open(self.storage_path, 'r', encoding='utf-8') as f: data = json.load(f) self._usage_data = { key: PreviewUsage.from_dict(usage_dict) for key, usage_dict in data.items() } logger.info(f"Loaded {len(self._usage_data)} preview usage records") except Exception as e: logger.warning(f"Failed to load usage data: {e}") self._usage_data = {} else: self._usage_data = {} def _save(self): """Save usage data to storage.""" try: self.storage_path.parent.mkdir(parents=True, exist_ok=True) data = { key: usage.to_dict() for key, usage in self._usage_data.items() } with open(self.storage_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save usage data: {e}") def register_preview( self, url: str, subdomain: Optional[str], port: Optional[int], workspace_name: Optional[str] = None, user_id: Optional[str] = None, chat_id: Optional[str] = None, directory: Optional[str] = None, entry: Optional[str] = None, ): """Register a new preview or update existing one. If the preview already exists (by subdomain or port), preserve existing statistics (access_count, last_accessed_at, created_at). 
Args: url: Preview URL subdomain: Subdomain (if using wildcard domain) port: Port number workspace_name: Workspace name user_id: User ID chat_id: Chat ID """ key = subdomain if subdomain else (str(port) if port is not None else None) if not key: logger.warning("Cannot register preview without subdomain or port") return # Check if preview already exists existing_usage = self._usage_data.get(key) if existing_usage: # If workspace_name doesn't match, this is a conflict - don't update if existing_usage.workspace_name and workspace_name and existing_usage.workspace_name != workspace_name: logger.warning( f"Subdomain {key} already used by workspace {existing_usage.workspace_name}, " f"cannot assign to {workspace_name}. This should not happen if subdomain reuse logic is correct." ) # Don't update - let the caller handle this by generating a new subdomain return # Update existing preview, but preserve statistics existing_usage.url = url existing_usage.directory = directory existing_usage.entry = entry existing_usage.workspace_name = workspace_name existing_usage.user_id = user_id existing_usage.chat_id = chat_id # Preserve: created_at, last_accessed_at, access_count self._save() logger.debug(f"Updated existing preview: {key} -> {url} (preserved stats: count={existing_usage.access_count})") else: # Create new preview now = time.time() usage = PreviewUsage( url=url, subdomain=subdomain, port=port, directory=directory, entry=entry, workspace_name=workspace_name, user_id=user_id, chat_id=chat_id, created_at=now, last_accessed_at=now, access_count=0, ) self._usage_data[key] = usage self._save() logger.debug(f"Registered new preview: {key} -> {url}") def record_access(self, subdomain: Optional[str] = None, port: Optional[int] = None): """Record that a preview URL was accessed. 
Args: subdomain: Subdomain (if using wildcard domain) port: Port number (if not using subdomain) """ key = subdomain if subdomain else (str(port) if port is not None else None) if not key or key not in self._usage_data: logger.warning(f"Preview not found for access record: {key}") return usage = self._usage_data[key] usage.last_accessed_at = time.time() usage.access_count += 1 self._save() logger.debug(f"Recorded access for {key}: count={usage.access_count}") def unregister_preview(self, subdomain: Optional[str] = None, port: Optional[int] = None): """Unregister a preview (when it's stopped). Args: subdomain: Subdomain (if using wildcard domain) port: Port number (if not using subdomain) """ key = subdomain if subdomain else (str(port) if port is not None else None) if key and key in self._usage_data: del self._usage_data[key] self._save() logger.debug(f"Unregistered preview: {key}") def get_lru_preview(self) -> Optional[Tuple[str, PreviewUsage]]: """Get the least recently used preview. If you pass a set of keys, this only considers those keys (useful for finding the LRU among currently-active previews). Returns: Tuple of (key, PreviewUsage) or None if no previews exist """ if not self._usage_data: return None # Find preview with oldest last_accessed_at lru_key = min(self._usage_data.keys(), key=lambda k: self._usage_data[k].last_accessed_at) return (lru_key, self._usage_data[lru_key]) def get_lru_preview_for_keys(self, keys: set) -> Optional[Tuple[str, PreviewUsage]]: """Get the least recently used preview restricted to a key set.""" if not keys: return None candidates = [k for k in keys if k in self._usage_data] if not candidates: return None lru_key = min(candidates, key=lambda k: self._usage_data[k].last_accessed_at) return (lru_key, self._usage_data[lru_key]) def get_all_previews(self) -> List[PreviewUsage]: """Get all registered previews. 
Returns: List of PreviewUsage objects """ return list(self._usage_data.values()) def get_preview_by_key(self, key: str) -> Optional[PreviewUsage]: """Get preview by key (subdomain or port). Args: key: Subdomain or port as string Returns: PreviewUsage or None """ return self._usage_data.get(key) def get_preview_by_workspace_name(self, workspace_name: Optional[str]) -> Optional[PreviewUsage]: """Get the most recently accessed preview for a workspace. Args: workspace_name: Workspace name Returns: PreviewUsage or None """ if not workspace_name: return None # Find all previews for this workspace workspace_previews = [ usage for usage in self._usage_data.values() if usage.workspace_name == workspace_name and usage.subdomain ] if not workspace_previews: return None # Return the most recently accessed one return max(workspace_previews, key=lambda u: u.last_accessed_at or u.created_at or 0.0) def cleanup_stale_previews(self, active_ports: set, active_subdomains: set): """Remove previews that are no longer active. Args: active_ports: Set of active port numbers active_subdomains: Set of active subdomains """ keys_to_remove = [] for key, usage in self._usage_data.items(): if usage.subdomain: if usage.subdomain not in active_subdomains: keys_to_remove.append(key) else: if usage.port not in active_ports: keys_to_remove.append(key) for key in keys_to_remove: del self._usage_data[key] logger.debug(f"Cleaned up stale preview: {key}") if keys_to_remove: self._save()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/answerlink/MCP-Workspace-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.