watchers.py
"""Lightweight filesystem watchers orchestration.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Dict, Optional import logging import os LOGGER = logging.getLogger(__name__) @dataclass class WatchersConfig: enabled: bool = False backend: str = "auto" debounce_ms: int = 200 max_batch: int = 512 index_path: Path = Path.home() / ".mcpdt" / "recent_index" rebuild_on_start: bool = False @dataclass class WatcherStatus: enabled: bool backend: str watchers_count: int queued_events: int last_event_ts: Optional[str] @dataclass class WatcherWorkspace: workspace_id: str watchers_count: int = 0 queued_events: int = 0 last_event_ts: Optional[datetime] = None def bump_event(self) -> None: self.queued_events += 1 self.last_event_ts = datetime.utcnow() @dataclass class ReindexResult: reindexed: bool invalidated: list[str] = field(default_factory=list) class WatchersEngine: """Simplified watcher engine used for introspection and CLI wiring.""" def __init__(self, config: Optional[WatchersConfig] = None): self._config = config or load_watchers_config() self._workspaces: Dict[str, WatcherWorkspace] = {} @property def config(self) -> WatchersConfig: return self._config def ensure_workspace(self, workspace_id: str) -> WatcherWorkspace: workspace = self._workspaces.get(workspace_id) if not workspace: workspace = WatcherWorkspace(workspace_id=workspace_id) if self._config.enabled: workspace.watchers_count = 1 self._workspaces[workspace_id] = workspace return workspace def status(self, workspace_id: str) -> WatcherStatus: workspace = self.ensure_workspace(workspace_id) ts = workspace.last_event_ts.isoformat() if workspace.last_event_ts else None return WatcherStatus( enabled=self._config.enabled, backend=self._config.backend, watchers_count=workspace.watchers_count, queued_events=workspace.queued_events, last_event_ts=ts, ) def mark_event(self, workspace_id: str) -> None: workspace = self.ensure_workspace(workspace_id) workspace.bump_event() def reindex(self, workspace_id: str, rel_path: Optional[str] = None) -> ReindexResult: workspace = self.ensure_workspace(workspace_id) workspace.queued_events = 0 workspace.last_event_ts = datetime.utcnow() invalidated = [rel_path] if rel_path else [] return ReindexResult(reindexed=True, invalidated=invalidated) def load_watchers_config() -> WatchersConfig: enabled = os.environ.get("MCPDT_WATCHERS_ENABLED") == "1" backend = os.environ.get("MCPDT_WATCHERS_BACKEND", "auto") debounce = int(os.environ.get("MCPDT_WATCHERS_DEBOUNCE_MS", "200")) max_batch = int(os.environ.get("MCPDT_WATCHERS_MAX_BATCH", "512")) index_path = Path(os.environ.get("MCPDT_RECENT_INDEX_PATH", Path.home() / ".mcpdt" / "recent_index")) rebuild = os.environ.get("MCPDT_WATCHERS_REBUILD_ON_START", "0") == "1" return WatchersConfig( enabled=enabled, backend=backend, debounce_ms=debounce, max_batch=max_batch, index_path=index_path, rebuild_on_start=rebuild, ) ENGINE = WatchersEngine() def get_engine() -> WatchersEngine: return ENGINE

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/andrey-zhuravl/mcp-desktop-tools'
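The same endpoint can be queried from Python; this is a sketch using only the standard library, and it makes no assumption about the response schema beyond it being JSON, so the body is simply pretty-printed.

import json
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/andrey-zhuravl/mcp-desktop-tools"

# Fetch the server record and print the raw JSON payload.
with urllib.request.urlopen(URL) as response:
    payload = json.loads(response.read().decode("utf-8"))

print(json.dumps(payload, indent=2))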

If you have feedback or need assistance with the MCP directory API, please join our Discord server.