#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "llama-index",
# "llama-index-embeddings-huggingface",
# "llama-index-embeddings-openai",
# "sentence-transformers",
# "transformers",
# "mcp",
# ]
# ///
"""Expose docs/en LlamaIndex chunks via the MCP FastMCP helper."""
from __future__ import annotations
import argparse
from dataclasses import dataclass
from pathlib import Path
from threading import Lock
from typing import List, Optional
from llama_index.core import Settings, StorageContext, load_index_from_storage
from llama_index.core.schema import NodeWithScore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
try:
from llama_index.embeddings.openai import OpenAIEmbedding
except ImportError: # pragma: no cover
OpenAIEmbedding = None
try:
from mcp.server.fastmcp import FastMCP
except ModuleNotFoundError as exc: # pragma: no cover
raise SystemExit("The 'mcp' package is required. Install via 'pip install mcp'.") from exc
@dataclass(frozen=True)
class IndexConfig:
persist_dir: Path
embed_backend: str
embedding_model: str
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Serve docs/en index via MCP")
parser.add_argument(
"--persist-dir",
type=Path,
default=Path("storage/llamaindex"),
help="Directory containing the persisted index",
)
parser.add_argument(
"--embedding-model",
type=str,
default="BAAI/bge-base-zh-v1.5",
help="Embedding model identifier; interpreted per backend",
)
parser.add_argument(
"--embed-backend",
choices=("huggingface", "openai"),
default="huggingface",
help="Embedding backend to match the stored index",
)
parser.add_argument(
"--default-k",
type=int,
default=4,
help="Default number of chunks to retrieve when the caller omits 'k'",
)
return parser.parse_args()
def configure_embedding(embed_backend: str, embedding_model: str) -> None:
    """Set the global embed model; it must match the one the index was built with."""
if embed_backend == "huggingface":
Settings.embed_model = HuggingFaceEmbedding(model_name=embedding_model)
elif embed_backend == "openai":
if OpenAIEmbedding is None:
raise SystemExit(
"OpenAI backend requested but llama-index-embeddings-openai is not installed",
)
Settings.embed_model = OpenAIEmbedding(model=embedding_model)
else: # pragma: no cover
raise SystemExit(f"Unsupported embed backend: {embed_backend}")
def format_chunks(source_nodes: List[NodeWithScore]) -> str:
    """Render retrieved nodes as numbered blocks tagged with path and score."""
    parts: List[str] = []
    for idx, node in enumerate(source_nodes, start=1):
        meta = node.metadata or {}
        # node.score can be None for some retrievers; guard before formatting.
        score = node.score if node.score is not None else 0.0
        meta_str = f"path={meta.get('path', 'unknown')} score={score:.3f}"
        parts.append(f"[{idx}] ({meta_str})\n{node.text.strip()}\n")
    return "\n".join(parts)
mcp = FastMCP(
name="docs-retriever",
instructions="Provides chunks from docs/en for answering CLI questions.",
)
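# FastMCP surfaces the 'instructions' string to connecting MCP clients as
# server-level guidance during the initialize handshake.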
_INDEX = None
_DEFAULT_TOP_K = 4
_CONFIG: Optional[IndexConfig] = None
_LOAD_LOCK = Lock()
def _ensure_index_loaded() -> None:
"""Load the persisted index lazily so startup stays under CLI timeouts."""
global _INDEX
if _INDEX is not None:
return
if _CONFIG is None:
raise RuntimeError("Server not configured yet")
    with _LOAD_LOCK:
        # Double-checked locking: another thread may have finished loading
        # while we waited for the lock.
        if _INDEX is not None:
            return
configure_embedding(_CONFIG.embed_backend, _CONFIG.embedding_model)
storage_context = StorageContext.from_defaults(
persist_dir=str(_CONFIG.persist_dir),
)
_INDEX = load_index_from_storage(storage_context)
@mcp.tool()
def scripting_docs_query(question: str, k: Optional[int] = None) -> str:
"""Retrieve context chunks from docs/en for the given question."""
if not question or not question.strip():
raise ValueError("'question' must be a non-empty string")
_ensure_index_loaded()
if _INDEX is None:
raise RuntimeError("Index not loaded yet")
top_k = k if k and k > 0 else _DEFAULT_TOP_K
retriever = _INDEX.as_retriever(similarity_top_k=top_k)
source_nodes = retriever.retrieve(question)
return format_chunks(source_nodes)
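# A minimal client sketch for exercising the tool over stdio. The filename
# 'serve_docs_mcp.py' is an assumption; the session API is the stdio client
# from the official 'mcp' package:
#
#   import asyncio
#   from mcp import ClientSession, StdioServerParameters
#   from mcp.client.stdio import stdio_client
#
#   async def demo() -> None:
#       params = StdioServerParameters(command="python", args=["serve_docs_mcp.py"])
#       async with stdio_client(params) as (read, write):
#           async with ClientSession(read, write) as session:
#               await session.initialize()
#               result = await session.call_tool(
#                   "scripting_docs_query",
#                   {"question": "How do I configure the CLI?", "k": 2},
#               )
#               print(result.content)
#
#   asyncio.run(demo())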
def main() -> None:
global _DEFAULT_TOP_K, _CONFIG
args = parse_args()
persist_dir = args.persist_dir.resolve()
    if not persist_dir.is_dir():
        raise SystemExit(f"Persist dir {persist_dir} not found or not a directory")
_DEFAULT_TOP_K = max(1, args.default_k)
_CONFIG = IndexConfig(
persist_dir=persist_dir,
embed_backend=args.embed_backend,
embedding_model=args.embedding_model,
)
mcp.run("stdio")
if __name__ == "__main__":
main()