"""Embedding indexer module for background embedding generation."""
import threading
from enum import Enum
from pathlib import Path
from typing import Any, Callable
import frontmatter
from frontmatter_mcp.semantic.cache import EmbeddingCache
from frontmatter_mcp.semantic.model import EmbeddingModel
class IndexerState(Enum):
"""State of the embedding indexer."""
IDLE = "idle" # Not started yet
INDEXING = "indexing" # Indexing in progress
READY = "ready" # Indexing completed at least once


class EmbeddingIndexer:
    """Background indexer for document embeddings."""

def __init__(
self,
cache: EmbeddingCache,
model: EmbeddingModel,
get_files: Callable[[], list[Path]],
base_dir: Path,
) -> None:
"""Initialize the indexer.
Args:
cache: Embedding cache instance.
model: Embedding model instance.
get_files: Callable that returns list of files to index.
base_dir: Base directory for relative path calculation.
"""
self._cache = cache
self._model = model
self._get_files = get_files
self._base_dir = base_dir
self._state = IndexerState.IDLE
self._lock = threading.Lock()
self._thread: threading.Thread | None = None

    @property
def state(self) -> IndexerState:
"""Get current indexer state."""
with self._lock:
return self._state

    def start(self) -> dict[str, Any]:
        """Start background indexing.

        Returns:
            Status dict with state and message; target_count is included only
            when a new indexing run is started.
        """
with self._lock:
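            # Only one background pass at a time: a concurrent start() call
            # reports the in-progress run instead of spawning a second thread.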
if self._state == IndexerState.INDEXING:
return {
"state": self._state.value,
"message": "Indexing already in progress",
}
files = self._get_files()
target_count = len(files)
self._state = IndexerState.INDEXING
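            # Daemon thread, so an unfinished pass never blocks interpreter exit.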
self._thread = threading.Thread(
target=self._run_indexing,
args=(files,),
daemon=True,
)
self._thread.start()
return {
"state": self._state.value,
"message": "Indexing started",
"target_count": target_count,
}

    def _run_indexing(self, files: list[Path]) -> None:
        """Run the indexing process.

        Args:
            files: List of files to index.
        """
        try:
            self._index_files(files)
        finally:
            self._cache.close()
            # Flip to READY even if indexing raised, so callers polling
            # state or blocking in wait() are never stuck on INDEXING.
            with self._lock:
                self._state = IndexerState.READY

    def _index_files(self, files: list[Path]) -> None:
        """Index the given files.

        Args:
            files: List of files to index.
        """
# Build current file map with mtime
current_files: dict[str, float] = {}
for file_path in files:
try:
rel_path = str(file_path.relative_to(self._base_dir))
mtime = file_path.stat().st_mtime
current_files[rel_path] = mtime
except (ValueError, OSError):
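                # relative_to() raises ValueError for paths outside base_dir;
                # stat() raises OSError if the file disappeared. Skip either.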
continue
# Find stale and deleted paths
stale_paths = self._cache.get_stale_paths(current_files)
deleted_paths = self._cache.get_deleted_paths(current_files)
# Remove deleted entries
for path in deleted_paths:
self._cache.delete(path)
# Index stale files
for rel_path in stale_paths:
abs_path = self._base_dir / rel_path
try:
content = self._get_content(abs_path)
if content:
vector = self._model.encode(content)
mtime = current_files[rel_path]
self._cache.set(rel_path, mtime, vector)
except Exception:
# Skip files that can't be processed
continue

    def _get_content(self, file_path: Path) -> str | None:
        """Get content from a file for embedding.

        Args:
            file_path: Path to the file.

        Returns:
            File content (body text after the frontmatter), or None if empty.
        """
try:
post = frontmatter.load(file_path)
content = post.content.strip()
return content if content else None
except Exception:
return None

    def wait(self, timeout: float | None = None) -> bool:
        """Wait for indexing to complete.

        Args:
            timeout: Maximum time to wait in seconds; None waits indefinitely.

        Returns:
            True if indexing completed, False if timed out.
        """
if self._thread is not None:
self._thread.join(timeout=timeout)
return not self._thread.is_alive()
return True
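

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): how the pieces wire together. The
# EmbeddingCache and EmbeddingModel constructor calls below are assumptions;
# check frontmatter_mcp.semantic.cache / .model for the real signatures.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    base = Path("notes")

    def markdown_files() -> list[Path]:
        # start() re-invokes this callable, so files added later are picked
        # up on the next indexing pass.
        return sorted(base.rglob("*.md"))

    indexer = EmbeddingIndexer(
        cache=EmbeddingCache(base / ".embeddings.db"),  # assumed constructor
        model=EmbeddingModel(),  # assumed constructor
        get_files=markdown_files,
        base_dir=base,
    )
    print(indexer.start())  # {'state': 'indexing', 'message': 'Indexing started', ...}
    if indexer.wait(timeout=120.0):
        print(indexer.state)  # IndexerState.READY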