"""
Indexer implementation for local RAG MCP server.
"""
import argparse
from typing import List, Optional, Callable
from pathlib import Path
from llama_index.core import (
Document,
VectorStoreIndex,
StorageContext,
Settings,
)
from llama_index.core.readers import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceWindowNodeParser, SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.storage.index_store import SimpleIndexStore
from llama_index.vector_stores.chroma import ChromaVectorStore
import chromadb
from indexer import (
    DEFAULT_LOCAL_EMBEDDING_MODEL_NAME,
    DEFAULT_INDEXER_CHUNK_SIZE,
    DEFAULT_INDEXER_OVERLAP,
    build_embedding_function,
    build_tokenizer,
)

class ChunkCounter:
    """Mutable counter tracking how many chunks were accepted or rejected."""
    accepted: int
    rejected: int

    def __init__(self, accepted: int, rejected: int):
        self.accepted = accepted
        self.rejected = rejected

def check_token_number(chunk: str, max_size: int, tokenizer) -> None:
    """Raise if a chunk exceeds the embedding model's maximum token count."""
    token_num = len(tokenizer.tokenize(chunk))
    if token_num > max_size:
        raise ValueError("Incorrect chunking: %d tokens > max %d" % (token_num, max_size))

def create_filter_tokens(max_size: int, tokenizer) -> Callable[[str], bool]:
    """Return a predicate that keeps only chunks within the token limit.

    Note: the original stub was empty; this completion infers the intent
    from check_token_number. The helper is currently unused.
    """
    def filter_tokens(chunk: str) -> bool:
        return len(tokenizer.tokenize(chunk)) <= max_size
    return filter_tokens
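
# Illustrative use of the helpers above (names and values are examples):
#   keep = create_filter_tokens(max_size=256, tokenizer=tokenizer)
#   safe_chunks = [c for c in chunks if keep(c)]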
def split_by_sentence_splitter(chunk_size, chunk_overlap, tokenizer, chunk_counter: ChunkCounter) -> Callable[[str], List[str]]:
splitter = SentenceSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
    def split(text: str) -> List[str]:
        # SentenceSplitter.split_text already returns sentence-aligned chunks,
        # so no additional processing is needed here.
        skipped = 0
        accepted = 0
        result = splitter.split_text(text)
        # Tokenize each chunk to check precisely whether it complies with the
        # embedding model's maximum token length.
        tok_results = tokenizer.tokenize_multiple(result)
actual_results = []
failed_nb_tokens = []
for res, toks in zip(result, tok_results):
if len(toks) > chunk_size:
                failed_nb_tokens.append(str(len(toks)))
skipped += 1
chunk_counter.rejected += 1
else:
accepted += 1
actual_results.append(res)
chunk_counter.accepted += 1
        if skipped:
            print("Chunks - accepted = %d, skipped = %d - skipped chunk token counts = [%s]"
                  % (accepted, skipped, ','.join(failed_nb_tokens)))
        else:
            print("Chunks - accepted = %d" % accepted)
return actual_results
return split
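
# Illustrative standalone use of the splitter factory (values are examples):
#   counter = ChunkCounter(0, 0)
#   split_fn = split_by_sentence_splitter(256, 32, tokenizer, counter)
#   chunks = split_fn(Path("doc.txt").read_text())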
class Indexer:
def __init__(
self,
input_path: str,
output_path: str,
embed_endpoint: Optional[str] = None,
tokenizer_endpoint: Optional[str] = None,
embedding_model_name: Optional[str] = DEFAULT_LOCAL_EMBEDDING_MODEL_NAME,
tokenizer_model_name: Optional[str] = None,
window_size: int = 3,
chunk_size: int = DEFAULT_INDEXER_CHUNK_SIZE,
chunk_overlap: int = DEFAULT_INDEXER_OVERLAP
):
"""
Initialize the indexer.
Args:
input_path: Path to the input directory containing documents
output_path: Path to store the ChromaDB index
embed_endpoint: Optional OpenAI embedding endpoint URL
tokenizer_endpoint: Optional OpenAI embedding endpoint URL
window_size: Number of sentences to include before and after the target sentence
"""
self.input_path = Path(input_path)
self.output_path = Path(output_path)
self.window_size = window_size
self.embedding_model_name = embedding_model_name
self.embed_model = build_embedding_function(embedding_model_name, embed_endpoint=embed_endpoint)
self.tokenizer = build_tokenizer(embedding_model_name,
token_endpoint=tokenizer_endpoint,
tokenizer_model_name=tokenizer_model_name)
# Create output directory if it doesn't exist
self.output_path.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def _load_documents(self) -> List[Document]:
"""Load documents from the input directory."""
reader = SimpleDirectoryReader(
input_dir=str(self.input_path),
recursive=True,
required_exts=[".txt", ".md", ".pdf", ".doc", ".docx"]
)
return reader.load_data()
def _create_node_parser(self, chunk_counter: ChunkCounter) -> SentenceWindowNodeParser:
"""Create a sentence window node parser."""
return SentenceWindowNodeParser.from_defaults(
window_size=self.window_size,
window_metadata_key="window",
original_text_metadata_key="original_text",
sentence_splitter=split_by_sentence_splitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
tokenizer=self.tokenizer,
chunk_counter=chunk_counter)
)
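    # SentenceWindowNodeParser stores each node's surrounding sentences under
    # metadata["window"] and the node's own sentence under
    # metadata["original_text"], matching the keys configured above; at query
    # time the window can be swapped in for the matched sentence (see the
    # note after load_index).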
def build_index(self) -> None:
"""Build the vector store index."""
# Load documents
documents = self._load_documents()
# Create node parser
        chunk_counter = ChunkCounter(0, 0)
node_parser = self._create_node_parser(chunk_counter)
# Configure settings
Settings.embed_model = self.embed_model
Settings.node_parser = node_parser
# Create new stores
docstore = SimpleDocumentStore()
index_store = SimpleIndexStore()
# Initialize ChromaDB
chroma_client = chromadb.PersistentClient(path=str(self.output_path))
        vector_store = ChromaVectorStore(
            chroma_collection=chroma_client.get_or_create_collection("documents")
        )
# Create storage context with new stores
storage_context = StorageContext.from_defaults(
docstore=docstore,
index_store=index_store,
vector_store=vector_store,
)
# Build and persist index
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
)
# Persist the index
index.storage_context.persist(persist_dir=str(self.output_path))
print("%d/%d chunks indexed - %d %%"
%(chunk_counter.accepted, chunk_counter.accepted+chunk_counter.rejected,
chunk_counter.accepted*100/(chunk_counter.accepted+chunk_counter.rejected)), )
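
    # Illustrative programmatic use (paths are examples):
    #   Indexer("./docs", "./index", chunk_size=256, chunk_overlap=32).build_index()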
    def load_index(self) -> VectorStoreIndex:
        """Load the vector store index from the persisted ChromaDB collection."""
        # Initialize ChromaDB
        chroma_client = chromadb.PersistentClient(path=str(self.output_path))
        vector_store = ChromaVectorStore(
            chroma_collection=chroma_client.get_or_create_collection("documents")
        )
        # from_vector_store builds its own storage context around the vector
        # store, so none needs to be passed explicitly; the embedding model
        # must match the one used at indexing time.
        return VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            embed_model=self.embed_model,
        )
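
    # Illustrative query-time use of the sentence-window index; the
    # MetadataReplacementPostProcessor swaps each matched sentence for its
    # stored window (top_k and the query are examples):
    #   from llama_index.core.postprocessor import MetadataReplacementPostProcessor
    #   index = Indexer("./docs", "./index").load_index()
    #   engine = index.as_query_engine(
    #       similarity_top_k=5,
    #       node_postprocessors=[
    #           MetadataReplacementPostProcessor(target_metadata_key="window")
    #       ],
    #   )
    #   print(engine.query("What does the corpus say about chunking?"))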
def main():
"""Command-line interface for the indexer."""
parser = argparse.ArgumentParser(
description="Index documents for local RAG MCP server"
)
parser.add_argument(
"input_path",
type=str,
help="Path to the input directory containing documents"
)
parser.add_argument(
"output_path",
type=str,
help="Path to store the ChromaDB index"
)
parser.add_argument(
"--embed-endpoint",
type=str,
help="OpenAI embedding endpoint URL (optional)",
default=None
)
parser.add_argument(
"--tokenizer-endpoint",
type=str,
help="OpenAI tokenizer endpoint URL (optional)",
default=None
)
parser.add_argument(
"--embed-model",
type=str,
help="OpenAI embedding model",
default=DEFAULT_LOCAL_EMBEDDING_MODEL_NAME
)
parser.add_argument(
"--tokenizer-model",
type=str,
help="OpenAI tokenizer model",
default=None
)
parser.add_argument(
"--window-size",
type=int,
help="Number of sentences to include before and after the target sentence",
default=3
)
parser.add_argument(
"--chunk-size",
type=int,
help="Chunk size in number of tokens( must comply with the embedding model spec)",
default=DEFAULT_INDEXER_CHUNK_SIZE
)
parser.add_argument(
"--chunk-overlap",
type=int,
help="Chunk overlap",
default=DEFAULT_INDEXER_OVERLAP
)
args = parser.parse_args()
# Initialize and run the indexer
indexer = Indexer(
input_path=args.input_path,
output_path=args.output_path,
embed_endpoint=args.embed_endpoint,
tokenizer_endpoint=args.tokenizer_endpoint,
embedding_model_name=args.embed_model,
tokenizer_model_name=args.tokenizer_model,
window_size=args.window_size,
chunk_size=args.chunk_size,
chunk_overlap=args.chunk_overlap
)
print(f"Building index from {args.input_path}...")
indexer.build_index()
print(f"Index built successfully and saved to {args.output_path}")
if __name__ == "__main__":
main()