Skip to main content
Glama
preview.py (27.2 kB)
"""Frontend preview server management.""" import asyncio import os import random import string import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, Optional from mcp.server.fastmcp.utilities.logging import get_logger from starlette.responses import FileResponse, JSONResponse, Response from .preview_usage import PreviewUsageTracker, PreviewUsage logger = get_logger(__name__) # Global usage tracker (shared across all PreviewManager instances) _global_usage_tracker: Optional[PreviewUsageTracker] = None def get_global_usage_tracker(storage_path: Optional[Path] = None) -> PreviewUsageTracker: """Get or create the global usage tracker. Args: storage_path: Path to storage file (default: user_data/.preview_usage.json) Returns: PreviewUsageTracker instance """ global _global_usage_tracker if _global_usage_tracker is None: if storage_path is None: # Default to user_data directory default_user_data_dir = Path(__file__).parent.parent.parent / "user_data" user_data_dir = Path(os.environ.get("MCP_WORKSPACES_DIR", str(default_user_data_dir))) storage_path = user_data_dir / ".preview_usage.json" _global_usage_tracker = PreviewUsageTracker(storage_path) return _global_usage_tracker class PreviewRoutingMiddleware: """ASGI middleware to serve previews in single-port mode based on Host subdomain.""" def __init__(self, app, config: Optional[Dict[str, Any]] = None): self.app = app cfg = config or {} preview_cfg = cfg.get("preview", {}) wildcard = preview_cfg.get("wildcard_domain") or "" wildcard = wildcard.replace("https://", "").replace("http://", "") if wildcard.startswith("*."): wildcard = wildcard[2:] self.base_domain = wildcard.lstrip(".") async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return host = "" for name, value in scope.get("headers", []): if name == b"host": host = value.decode() break if ":" in host: host = host.split(":", 1)[0] if self.base_domain and 
host.endswith(self.base_domain): subdomain = host[: -len(self.base_domain)].rstrip(".") if subdomain: preview_entry = PreviewManager.get_preview_entry(subdomain) if preview_entry: response = await self._serve_preview(scope, preview_entry, subdomain) await response(scope, receive, send) return await self.app(scope, receive, send) async def _serve_preview(self, scope, preview_entry: Dict[str, Any], subdomain: str) -> Response: path = scope.get("path", "/") rel_path = path.lstrip("/") serve_path: Path = preview_entry["serve_path"] entry = preview_entry.get("entry") or "index.html" target = (serve_path / rel_path).resolve() try: target.relative_to(serve_path.resolve()) except ValueError: return JSONResponse({"success": False, "error": "Invalid path"}, status_code=403) if target.is_dir(): target = target / entry if not target.exists(): target = (serve_path / entry).resolve() if not target.exists(): return JSONResponse({"success": False, "error": "File not found"}, status_code=404) try: tracker = get_global_usage_tracker() tracker.record_access(subdomain=subdomain) except Exception: pass return FileResponse(target) @dataclass class PreviewInfo: """Information about an active preview.""" port: Optional[int] directory: Path entry: str subdomain: Optional[str] = None # Subdomain for wildcard domain process: Optional[asyncio.subprocess.Process] = None # unused in single-port mode url: str = "" started_at: float = field(default_factory=time.time) def to_dict(self) -> Dict[str, Any]: return { "port": self.port, "directory": str(self.directory), "entry": self.entry, "url": self.url, "started_at": self.started_at, # In single-port mode, the main server process serves previews, so a # missing subprocess still means the preview is "running". 
"running": True if self.process is None else (self.process.returncode is None), } class PreviewManager: """Manages frontend preview servers for sessions.""" def __init__( self, workspace_path: Path, workspace_name: Optional[str] = None, host: str = "0.0.0.0", external_host: Optional[str] = None, config: Optional[Dict[str, Any]] = None, usage_tracker: Optional[PreviewUsageTracker] = None, ): """Initialize preview manager. Args: workspace_path: The workspace directory. workspace_name: Workspace name for generating subdomain (e.g., "user123_chat456"). host: Host to bind preview servers to. external_host: External hostname for URLs (e.g., for Docker). config: Additional configuration. """ self.workspace_path = workspace_path self.workspace_name = workspace_name self.host = host self.external_host = external_host or "localhost" self.config = config or {} # URL generation settings (single-port + wildcard domain routing) self.wildcard_domain: Optional[str] = None # Wildcard domain like "*.proxy.your_domain.com" self.use_tls: bool = False # Whether to use HTTPS/TLS for preview URLs self.max_active_previews: Optional[int] = None # Default port for URL building (main MCP port) try: self.default_port = int(os.environ.get("FASTMCP_PORT", "18089")) except Exception: self.default_port = 18089 # Override from config preview_config = self.config.get("preview", {}) if preview_config: if "external_host" in preview_config: self.external_host = preview_config["external_host"] if "wildcard_domain" in preview_config: self.wildcard_domain = preview_config["wildcard_domain"] if "use_tls" in preview_config: self.use_tls = bool(preview_config["use_tls"]) if "max_active_previews" in preview_config: try: v = int(preview_config["max_active_previews"]) if v > 0: self.max_active_previews = v except Exception: self.max_active_previews = None # Active preview for this session (only one at a time) self.active_preview: Optional[PreviewInfo] = None # Track used subdomains globally (class-level) to 
ensure uniqueness PreviewManager._used_subdomains: set = getattr(PreviewManager, "_used_subdomains", set()) # Global mapping: subdomain -> PreviewManager instance (for LRU cleanup) PreviewManager._active_previews: Dict[str, "PreviewManager"] = getattr( PreviewManager, "_active_previews", {} ) # Host routing map for single-port mode PreviewManager._preview_entries: Dict[str, Dict[str, Any]] = getattr( PreviewManager, "_preview_entries", {} ) # Usage tracker for LRU management self.usage_tracker = usage_tracker or get_global_usage_tracker() # Store workspace info for usage tracking self.workspace_name = workspace_name def _generate_random_subdomain(self) -> str: """Generate a short random subdomain. We no longer encode ports in the subdomain: previews are served by the main server (single-port), and routing is done via Host header. """ subdomain = "".join(random.choices(string.ascii_lowercase + string.digits, k=10)) max_attempts = 10 attempts = 0 while subdomain in PreviewManager._used_subdomains and attempts < max_attempts: subdomain = "".join(random.choices(string.ascii_lowercase + string.digits, k=10)) attempts += 1 return subdomain def _normalize_usage_directory(self, raw_directory: Optional[str], entry: Optional[str] = None) -> str: """Normalize stored directory into a virtual directory (e.g. '/', '/dist'). Backward compatible: - If stored as absolute path under workspace_path, convert to relative. - If missing/invalid, fall back to '/'. """ if not raw_directory: return "/" # Already virtual-style if raw_directory in ("/", ".", "/.", ""): return "/" raw_directory = raw_directory.replace("\\", "/") # Absolute path stored previously: try to relativize to workspace if raw_directory.startswith("/"): try: rel = Path(raw_directory).resolve().relative_to(self.workspace_path.resolve()) # If the stored "directory" accidentally includes a file path, # convert it to its parent directory. 
if rel.name and rel.suffix: rel = rel.parent if entry and rel.name == entry: rel = rel.parent rel_str = str(rel).replace("\\", "/").strip("/").strip(".") return f"/{rel_str}" if rel_str else "/" except Exception: return "/" # Treat as relative directory under workspace rel_path = Path(raw_directory) if rel_path.suffix: rel_path = rel_path.parent if entry and rel_path.name == entry: rel_path = rel_path.parent rel_str = str(rel_path).replace("\\", "/").strip("/").strip(".") return f"/{rel_str}" if rel_str else "/" def _build_url(self, port: Optional[int], entry: str, subdomain: Optional[str] = None) -> str: """Build the preview URL. Preferred pattern (production): `https://{subdomain}.{wildcard_domain}/{entry}`. Fallback: `http(s)://{external_host}:{FASTMCP_PORT}/{entry}`. """ if self.wildcard_domain and subdomain: domain = self.wildcard_domain.replace("*", subdomain) if self.wildcard_domain.startswith("https://"): protocol = "https" elif self.wildcard_domain.startswith("http://"): protocol = "http" else: protocol = "https" if self.use_tls else "http" if not domain.startswith(("http://", "https://")): domain = f"{protocol}://{domain}" return f"{domain}/{entry.lstrip('/')}" protocol = "https" if self.use_tls else "http" return f"{protocol}://{self.external_host}:{port or self.default_port}/{entry}" @staticmethod def get_preview_entry(subdomain: str) -> Optional[Dict[str, Any]]: return getattr(PreviewManager, "_preview_entries", {}).get(subdomain) async def start_preview( self, directory: str = "/", entry: str = "index.html", subdomain: Optional[str] = None, ) -> Dict[str, Any]: """Start a preview (single-port mode only).""" # Reuse existing subdomain for the same session instead of stopping reuse_subdomain = None if self.active_preview and not subdomain: # Same session: reuse the existing subdomain to keep the same URL reuse_subdomain = self.active_preview.subdomain elif not subdomain and self.workspace_name: # If active_preview is None (e.g., after LRU cleanup), try 
to find existing subdomain from usage_tracker existing_usage = self.usage_tracker.get_preview_by_workspace_name(self.workspace_name) if existing_usage and existing_usage.subdomain: # Check if this subdomain is still available (not used by another workspace) existing_entry = PreviewManager._preview_entries.get(existing_usage.subdomain) if existing_entry and existing_entry.get("workspace_name") == self.workspace_name: reuse_subdomain = existing_usage.subdomain elif not existing_entry: # Subdomain exists in tracker but not in active entries, can reuse reuse_subdomain = existing_usage.subdomain directory_rel = (directory or "/").lstrip("/").replace("\\", "/").strip() if directory_rel in ("", "."): directory_rel = "" # Support entry paths like "dist/index.html" by folding them into directory. entry_path = (entry or "index.html").lstrip("/").replace("\\", "/") if "/" in entry_path: entry_dir, entry_name = entry_path.rsplit("/", 1) directory_rel = "/".join([p for p in [directory_rel.strip("/"), entry_dir.strip("/")] if p]).strip("/") entry = entry_name else: entry = entry_path serve_path = self.workspace_path / directory_rel if directory_rel else self.workspace_path if not serve_path.exists() or not serve_path.is_dir(): return {"success": False, "error": f"Directory does not exist: /{directory_rel}"} if not (serve_path / entry).exists(): for fallback in ["index.html", "index.htm", "default.html"]: if (serve_path / fallback).exists(): entry = fallback break if not subdomain: if reuse_subdomain: # Reuse existing subdomain for same session # Verify it's not being used by another workspace existing_entry = PreviewManager._preview_entries.get(reuse_subdomain) if existing_entry and existing_entry.get("workspace_name") != self.workspace_name: # Subdomain is being used by another workspace, generate new one logger.warning( f"Subdomain {reuse_subdomain} is used by workspace {existing_entry.get('workspace_name')}, " f"generating new subdomain for {self.workspace_name}" ) subdomain 
= self._generate_random_subdomain() else: subdomain = reuse_subdomain else: # Generate new subdomain for new preview subdomain = self._generate_random_subdomain() if not subdomain.strip(): return {"success": False, "error": "Invalid subdomain"} # Verify subdomain is not already used by another workspace existing_entry = PreviewManager._preview_entries.get(subdomain) if existing_entry and existing_entry.get("workspace_name") != self.workspace_name: # Conflict: subdomain is used by another workspace, generate new one logger.warning( f"Subdomain {subdomain} conflict detected, generating new subdomain for {self.workspace_name}" ) subdomain = self._generate_random_subdomain() # Only add to used_subdomains if it's a new subdomain if subdomain not in PreviewManager._used_subdomains: PreviewManager._used_subdomains.add(subdomain) user_id = chat_id = None if self.workspace_name and "_" in self.workspace_name: parts = self.workspace_name.rsplit("_", 1) if len(parts) == 2: user_id, chat_id = parts if self.max_active_previews is not None: active_total = len(getattr(PreviewManager, "_preview_entries", {})) while active_total >= self.max_active_previews: cleaned = await self._cleanup_lru_preview() if not cleaned: return {"success": False, "error": f"Maximum active previews reached ({self.max_active_previews})"} active_total = len(getattr(PreviewManager, "_preview_entries", {})) virtual_directory = f"/{directory_rel}" if directory_rel else "/" url = self._build_url(self.default_port, entry, subdomain) PreviewManager._preview_entries[subdomain] = { "serve_path": serve_path, "entry": entry, "workspace_name": self.workspace_name, } PreviewManager._active_previews[subdomain] = self self.active_preview = PreviewInfo( port=None, directory=serve_path, entry=entry, subdomain=subdomain, process=None, url=url, ) self.usage_tracker.register_preview( url=url, subdomain=subdomain, port=None, workspace_name=self.workspace_name, user_id=user_id, chat_id=chat_id, directory=virtual_directory, 
entry=entry, ) return { "success": True, "url": url, "directory": virtual_directory, "entry": entry, "subdomain": subdomain, "port": None, "message": f"Preview registered at {url}", } async def stop_preview(self) -> Dict[str, Any]: """Stop the active preview (single-port mode).""" if not self.active_preview: return {"success": True, "message": "No active preview to stop"} preview = self.active_preview subdomain = preview.subdomain if subdomain: PreviewManager._used_subdomains.discard(subdomain) PreviewManager._preview_entries.pop(subdomain, None) PreviewManager._active_previews.pop(subdomain, None) self.usage_tracker.unregister_preview(subdomain=subdomain, port=None) self.active_preview = None return {"success": True, "message": "Preview stopped"} def get_status(self) -> Dict[str, Any]: """Get the status of the preview server.""" if not self.active_preview: return {"active": False, "message": "No active preview"} return {"active": True, **self.active_preview.to_dict()} async def _cleanup_lru_preview(self) -> bool: """Clean up the least recently used preview mapping. Returns: True if a preview was cleaned up, False otherwise. """ active_keys = set(getattr(PreviewManager, "_preview_entries", {}).keys()) lru_result = self.usage_tracker.get_lru_preview_for_keys(active_keys) if not lru_result: if not active_keys: return False # If tracker is missing records, evict an arbitrary active preview to # ensure we can enforce the cap. 
lru_key = next(iter(active_keys)) logger.info(f"Cleaning up preview mapping without usage record: {lru_key}") PreviewManager._used_subdomains.discard(lru_key) PreviewManager._preview_entries.pop(lru_key, None) PreviewManager._active_previews.pop(lru_key, None) self.usage_tracker.unregister_preview(subdomain=lru_key, port=None) return True lru_key, lru_usage = lru_result preview_manager = PreviewManager._active_previews.get(lru_key) if preview_manager and preview_manager.active_preview and preview_manager.active_preview.subdomain == lru_key: logger.info(f"Cleaning up LRU preview: {lru_key}") await preview_manager.stop_preview() return True # Fallback: remove mapping directly (ensures active_total decreases) logger.info(f"Cleaning up LRU preview mapping directly: {lru_key}") PreviewManager._used_subdomains.discard(lru_key) PreviewManager._preview_entries.pop(lru_key, None) PreviewManager._active_previews.pop(lru_key, None) self.usage_tracker.unregister_preview(subdomain=lru_usage.subdomain, port=lru_usage.port) if preview_manager and preview_manager.active_preview and preview_manager.active_preview.subdomain == lru_key: preview_manager.active_preview = None return True async def restore_preview_from_usage( self, usage: "PreviewUsage", ) -> bool: """Restore a preview mapping (single-port).""" if usage.workspace_name != self.workspace_name: return False entry = usage.entry or "index.html" directory = self._normalize_usage_directory(usage.directory, entry=entry) # Backward compatibility: some older records stored "entry" as a path # (e.g. "dist/index.html") with directory="/". 
entry_path = entry.lstrip("/").replace("\\", "/") if "/" in entry_path: entry_dir, entry_name = entry_path.rsplit("/", 1) if directory.rstrip("/") in ("", "/"): directory = f"/{entry_dir}" else: directory = f"{directory.rstrip('/')}/{entry_dir}" entry = entry_name if directory.startswith("/"): directory_rel = directory[1:] else: directory_rel = directory serve_path = self.workspace_path / directory_rel if directory_rel and directory_rel != "." else self.workspace_path if not serve_path.exists() or not serve_path.is_dir(): return False if not (serve_path / entry).exists(): for fallback in ["index.html", "index.htm", "default.html"]: if (serve_path / fallback).exists(): entry = fallback break result = await self.start_preview( directory="/" + directory_rel if directory_rel else "/", entry=entry, subdomain=usage.subdomain, ) return bool(result.get("success")) @staticmethod async def restore_all_previews_on_startup( workspaces_dir: Path, config: Optional[Dict[str, Any]] = None, ) -> int: """Restore all preview services from JSON file on startup. This method should be called once when the server starts. Args: workspaces_dir: Directory containing workspace directories config: Configuration dictionary Returns: Number of previews successfully restored """ config = config or {} preview_config = config.get("preview", {}) # Get the usage tracker usage_tracker = get_global_usage_tracker() storage_path = usage_tracker.storage_path logger.info(f"Loading preview usage data from: {storage_path}") # Get all preview usage records all_previews = usage_tracker.get_all_previews() if not all_previews: logger.info("No preview services found in storage, nothing to restore") return 0 # Choose at most 1 preview per workspace (most recently accessed). # This matches the current PreviewManager behavior (1 active preview per workspace). 
by_workspace: Dict[str, PreviewUsage] = {} missing_workspace = 0 missing_subdomain = 0 def _recency(u: PreviewUsage) -> float: return float(u.last_accessed_at or u.created_at or 0.0) for p in all_previews: if not p.workspace_name: missing_workspace += 1 continue if not p.subdomain: missing_subdomain += 1 continue existing = by_workspace.get(p.workspace_name) if not existing or _recency(p) > _recency(existing): by_workspace[p.workspace_name] = p selected_previews = list(by_workspace.values()) selected_previews.sort(key=_recency, reverse=True) if missing_workspace: logger.warning(f"Found {missing_workspace} preview(s) without workspace_name, skipping") if missing_subdomain: logger.warning(f"Found {missing_subdomain} preview(s) without subdomain, skipping") # Apply global max_active_previews cap at restore time cap = preview_config.get("max_active_previews") try: cap = int(cap) if cap is not None else None if cap is not None and cap <= 0: cap = None except Exception: cap = None if cap is not None and len(selected_previews) > cap: skipped = len(selected_previews) - cap selected_previews = selected_previews[:cap] logger.warning(f"Restore cap reached ({cap}), skipping {skipped} older workspace previews") logger.info(f"Found {len(selected_previews)} preview service(s) to restore across {len(by_workspace)} workspace(s)") restored_count = 0 failed_count = 0 skipped_count = 0 for idx, preview in enumerate(selected_previews, 1): workspace_name = preview.workspace_name subdomain = preview.subdomain url = preview.url key_info = f"subdomain {subdomain}" if subdomain else "no-subdomain" logger.info(f"[{idx}/{len(selected_previews)}] Restoring preview: {workspace_name} {key_info} -> {url}") workspace_path = workspaces_dir / workspace_name if not workspace_path.exists(): logger.warning(f" ✗ Workspace directory does not exist: {workspace_path}, skipping") skipped_count += 1 continue preview_manager = PreviewManager( workspace_path=workspace_path, workspace_name=workspace_name, 
config=config, usage_tracker=usage_tracker, ) try: success = await preview_manager.restore_preview_from_usage(preview) if success: restored_count += 1 logger.info(f" ✓ Successfully restored preview on {key_info}") else: failed_count += 1 logger.warning( f" ✗ Failed to restore preview on {key_info} (directory={preview.directory}, entry={preview.entry})" ) except Exception as e: failed_count += 1 logger.error(f" ✗ Error restoring preview on {key_info}: {e}", exc_info=True) total_processed = restored_count + failed_count + skipped_count logger.info( f"Restoration summary: {restored_count} restored, {failed_count} failed, {skipped_count} skipped " f"(total: {total_processed}/{len(selected_previews)})" ) return restored_count async def cleanup(self): """Clean up resources (call on shutdown).""" await self.stop_preview()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/answerlink/MCP-Workspace-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.