"""
Multi-Source Hybrid Search Indexer for Local Search MCP Server
Supports Wikipedia (static, large) and Local Files (dynamic, personal).
Handles downloading, tokenizing, and indexing data with BM25 + Vector search.
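Typical usage (illustrative sketch; the import path is an assumption and the
queries/paths are placeholders):

    from indexer import WikiIndexer, LocalFileIndexer  # adjust to the actual module path

    wiki = WikiIndexer()
    wiki.load_or_build()  # load cached indices, or download the dataset and build them
    results = wiki.hybrid_search("history of espresso machines", top_k=3)

    local = LocalFileIndexer("/home/user/notes", extensions=[".md", ".txt"])
    local.build_index()   # incremental: only new/changed files are re-embedded
    results = local.hybrid_search("project meeting notes", strategy="hybrid")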
"""
import os
import pickle
import sys
import json
import hashlib
from typing import List, Dict, Literal
from collections import Counter
from datasets import load_dataset
from rank_bm25 import BM25Okapi
from tqdm import tqdm
import chromadb
from chromadb.utils import embedding_functions
from langchain_core.documents import Document
from src.chunking import ChunkingStrategy, get_config_for_file, get_smart_config
from src.document_analyzer import DocumentAnalyzer
from src.content_cleaner import ContentCleaner
from src.quality_metrics import QualityAnalyzer
from src.logger import logger
# Default paths for Wikipedia index
WIKI_INDEX_PATH = "data/wiki_index.pkl"
WIKI_CHROMA_PATH = "data/chroma_db"
LOCAL_CHROMA_PATH = "data/local_chroma_db"
STATE_FILE_DIR = "data/indexing_states" # Directory for per-path state files
# Number of documents to index for Wikipedia
DEFAULT_SUBSET_SIZE = 1_000_000
class BaseHybridIndexer:
"""
Base class for hybrid search (BM25 + Vector) indexing.
Provides common functionality for both Wikipedia and Local File indexers.
"""
def __init__(self, collection_name: str, chroma_path: str):
"""
Initialize the base indexer.
Args:
collection_name: Name for the ChromaDB collection
chroma_path: Path to the ChromaDB persistent storage
"""
self.bm25 = None
self.documents = [] # Stores document metadata
# Initialize ChromaDB for vector search
self.chroma_client = chromadb.PersistentClient(path=chroma_path)
        # Use a multilingual embedding model (intfloat/multilingual-e5-large)
        # for semantic search; heavier than MiniLM-class models, so CPU embedding is slower.
self.emb_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="intfloat/multilingual-e5-large"
)
self.collection_name = collection_name
self.collection = None
def search(self, query: str, top_k: int = 3) -> List[Dict]:
"""
Search the index using BM25 algorithm (keyword search).
Args:
query: Search query string
top_k: Number of top results to return
Returns:
List of document dictionaries
"""
if not self.bm25:
return []
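        # Tokenize the query the same way the corpus was tokenized at build time
        # (lowercase, whitespace split) so query terms can match indexed terms.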
tokenized_query = query.lower().split()
# Get top N documents based on BM25 scores
docs = self.bm25.get_top_n(tokenized_query, self.documents, n=top_k)
return docs if docs else []
def vector_search(self, query: str, top_k: int = 3) -> List[Dict]:
"""
Search using vector similarity (semantic search).
Args:
query: Search query string
top_k: Number of top results to return
Returns:
List of document dictionaries
"""
if not self.collection:
return []
try:
# Query ChromaDB for semantic similarity
results = self.collection.query(
query_texts=[query],
n_results=top_k
)
# Convert ChromaDB results to standard format
docs = []
if results['documents'] and results['documents'][0]:
for doc_text, metadata in zip(results['documents'][0], results['metadatas'][0]):
docs.append({
"title": metadata.get('title', 'Unknown'),
"url": metadata.get('url', ''),
"text": doc_text
})
return docs
except Exception as e:
print(f"Vector search error: {e}", file=sys.stderr)
return []
def hybrid_search(
self,
query: str,
top_k: int = 3,
strategy: Literal["keyword", "semantic", "hybrid"] = "hybrid"
) -> List[Dict]:
"""
Perform hybrid search combining BM25 and vector search using Reciprocal Rank Fusion (RRF).
Args:
query: Search query string
top_k: Number of top results to return
strategy: Search strategy - 'keyword' (BM25 only), 'semantic' (vector only), or 'hybrid' (both)
Returns:
List of document dictionaries with title, url, text, and source (bm25/vector/both)
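        Example return value (shape only, values illustrative):
            [{"title": "...", "url": "...", "text": "...", "source": "both"}, ...]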
"""
# RRF constant (typical value is 60)
RRF_K = 60
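        # Each result list contributes 1 / (RRF_K + rank + 1) for every document it returns;
        # documents retrieved by both BM25 and vector search accumulate both contributions.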
# Store RRF scores and sources for each document
rrf_scores: Dict[str, float] = {}
doc_data: Dict[str, Dict] = {}
doc_sources: Dict[str, List[str]] = {}
if strategy in ["keyword", "hybrid"]:
bm25_results = self.search(query, top_k=top_k)
for rank, doc in enumerate(bm25_results):
url = doc['url']
rrf_scores[url] = rrf_scores.get(url, 0) + 1 / (RRF_K + rank + 1)
                # Keep whichever copy has the longer text (for local files, BM25 stores the full document)
if url not in doc_data or len(doc.get('text', '')) > len(doc_data[url].get('text', '')):
doc_data[url] = doc
if url not in doc_sources:
doc_sources[url] = []
doc_sources[url].append("bm25")
if strategy in ["semantic", "hybrid"]:
vector_results = self.vector_search(query, top_k=top_k)
for rank, doc in enumerate(vector_results):
url = doc['url']
rrf_scores[url] = rrf_scores.get(url, 0) + 1 / (RRF_K + rank + 1)
                # Keep whichever copy has the longer text (vector results may be truncated chunks)
if url not in doc_data or len(doc.get('text', '')) > len(doc_data[url].get('text', '')):
doc_data[url] = doc
if url not in doc_sources:
doc_sources[url] = []
doc_sources[url].append("vector")
# Sort by RRF score (descending) and take top_k
sorted_urls = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)[:top_k]
# Build final results with source information
results = []
for url in sorted_urls:
doc = doc_data[url].copy()
sources = doc_sources[url]
doc['source'] = "both" if len(sources) > 1 else sources[0]
results.append(doc)
return results
class WikiIndexer(BaseHybridIndexer):
"""
Wikipedia indexer with persistent BM25 and vector indices.
Builds index once and loads from cache on subsequent runs.
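    Index artifacts are stored at WIKI_INDEX_PATH (BM25 pickle) and WIKI_CHROMA_PATH (ChromaDB).
    The number of articles indexed is controlled by the WIKI_SUBSET_SIZE environment variable
    (defaults to DEFAULT_SUBSET_SIZE; 0 or a negative value indexes the full split).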
"""
def __init__(self):
super().__init__(collection_name="wikipedia", chroma_path=WIKI_CHROMA_PATH)
self.index_path = WIKI_INDEX_PATH
def load_or_build(self):
"""Load existing index or build a new one if not found."""
if os.path.exists(self.index_path):
print(f"Loading BM25 index from {self.index_path}...", file=sys.stderr)
with open(self.index_path, "rb") as f:
data = pickle.load(f)
self.bm25 = data["bm25"]
self.documents = data["documents"]
print(f"BM25 index loaded. {len(self.documents)} documents available.", file=sys.stderr)
# Load or create ChromaDB collection
print(f"Loading vector index from {WIKI_CHROMA_PATH}...", file=sys.stderr)
try:
self.collection = self.chroma_client.get_collection(
name=self.collection_name,
embedding_function=self.emb_fn
)
print(f"Vector index loaded. {self.collection.count()} documents in vector store.", file=sys.stderr)
except Exception as e:
print(f"Vector index not found, will be built on next index build: {e}", file=sys.stderr)
self.collection = None
else:
self.build_index()
def build_index(self):
"""Build hybrid search index (BM25 + Vector) from Wikipedia dataset."""
# Get subset size from environment variable or use default
subset_size = int(os.environ.get("WIKI_SUBSET_SIZE", DEFAULT_SUBSET_SIZE))
print(f"Downloading dataset (English Wikipedia, {subset_size:,} articles)...", file=sys.stderr)
# Use subset for practical memory usage
if subset_size > 0:
split = f"train[:{subset_size}]"
else:
split = "train"
ds = load_dataset("wikimedia/wikipedia", "20231101.en", split=split)
print("Tokenizing corpus for BM25...", file=sys.stderr)
tokenized_corpus = []
for row in tqdm(ds, desc="Processing documents", file=sys.stderr):
text = row['text']
# Simple space-based tokenization
tokens = text.lower().split()
tokenized_corpus.append(tokens)
# Store metadata for search results
self.documents.append({
"title": row['title'],
"url": row['url'],
"text": text[:500] + "..." if len(text) > 500 else text
})
print("Building BM25 index...", file=sys.stderr)
self.bm25 = BM25Okapi(tokenized_corpus)
print(f"Saving BM25 index to {self.index_path}...", file=sys.stderr)
os.makedirs(os.path.dirname(self.index_path), exist_ok=True)
with open(self.index_path, "wb") as f:
pickle.dump({"bm25": self.bm25, "documents": self.documents}, f)
print(f"BM25 index build complete. {len(self.documents)} documents indexed.", file=sys.stderr)
# Build vector index with ChromaDB
print("Building vector index with ChromaDB...", file=sys.stderr)
print("(This may take a while for embedding generation)", file=sys.stderr)
# Create or recreate collection
try:
self.chroma_client.delete_collection(name=self.collection_name)
        except Exception:
            pass  # Collection did not exist yet; nothing to delete
self.collection = self.chroma_client.create_collection(
name=self.collection_name,
embedding_function=self.emb_fn
)
# Add documents to ChromaDB in batches
batch_size = 100
ids = [doc['url'] for doc in self.documents]
docs = [doc['text'] for doc in self.documents]
metadatas = [{"title": doc['title'], "url": doc['url']} for doc in self.documents]
for i in tqdm(range(0, len(docs), batch_size), desc="Embedding documents", file=sys.stderr):
batch_ids = ids[i:i+batch_size]
batch_docs = docs[i:i+batch_size]
batch_metadatas = metadatas[i:i+batch_size]
self.collection.add(
ids=batch_ids,
documents=batch_docs,
metadatas=batch_metadatas
)
print(f"Vector index build complete. {self.collection.count()} documents in vector store.", file=sys.stderr)
def build_vector_index(self):
"""
Build only the vector index from existing BM25 documents.
Use this when BM25 index exists but vector index is missing.
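        Typical recovery flow (illustrative):
            indexer = WikiIndexer()
            indexer.load_or_build()       # loads the BM25 pickle, reports the missing vector index
            indexer.build_vector_index()  # re-embeds the cached documents into ChromaDB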
"""
if not self.documents:
raise ValueError("No documents loaded. Call load_or_build() first to load BM25 index.")
print(f"Building vector index from {len(self.documents)} existing documents...", file=sys.stderr)
print("(This may take a while for embedding generation)", file=sys.stderr)
# Create or recreate collection
try:
self.chroma_client.delete_collection(name=self.collection_name)
        except Exception:
            pass  # Collection did not exist yet; nothing to delete
self.collection = self.chroma_client.create_collection(
name=self.collection_name,
embedding_function=self.emb_fn
)
# Add documents to ChromaDB in batches
batch_size = 100
ids = [doc['url'] for doc in self.documents]
docs = [doc['text'] for doc in self.documents]
metadatas = [{"title": doc['title'], "url": doc['url']} for doc in self.documents]
for i in tqdm(range(0, len(docs), batch_size), desc="Embedding documents", file=sys.stderr):
batch_ids = ids[i:i+batch_size]
batch_docs = docs[i:i+batch_size]
batch_metadatas = metadatas[i:i+batch_size]
self.collection.add(
ids=batch_ids,
documents=batch_docs,
metadatas=batch_metadatas
)
print(f"Vector index build complete. {self.collection.count()} documents in vector store.", file=sys.stderr)
class LocalFileIndexer(BaseHybridIndexer):
"""
Local file indexer that scans directories for Markdown/text files.
Rebuilds BM25 index on each startup (fast for local files).
    Uses a persistent vector index; stale chunks are deleted and changed files re-embedded on update.
Each directory path gets its own ChromaDB collection and state file.
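    Example (illustrative; the path is a placeholder):
        idx = LocalFileIndexer("/home/user/notes", extensions=[".md", ".txt"])
        idx.build_index()   # Stage 0 skips files whose mtime is unchanged
        hits = idx.hybrid_search("quarterly report", top_k=5)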
"""
def __init__(self, directory_path: str, extensions: List[str] = None):
"""
Initialize local file indexer.
Args:
directory_path: Directory to scan for files
extensions: File extensions to include (default: [".md", ".txt", ".py"])
"""
# Normalize path to absolute path for consistent hashing
self.directory_path = os.path.abspath(directory_path)
self.extensions = extensions if extensions else [".md", ".txt", ".py"]
# Generate path-specific collection name and state file
self.path_hash = self._generate_path_hash(self.directory_path)
collection_name = f"local_files_{self.path_hash}"
self.state_file_path = os.path.join(STATE_FILE_DIR, f"state_{self.path_hash}.json")
super().__init__(collection_name=collection_name, chroma_path=LOCAL_CHROMA_PATH)
        # Initialize the chunking strategy helper
self.chunker = ChunkingStrategy()
# Load indexing state for incremental updates
self.state = self._load_state()
logger.info(
f"LocalFileIndexer initialized",
directory=self.directory_path,
collection=collection_name,
state_file=self.state_file_path
)
@staticmethod
def _generate_path_hash(path: str) -> str:
"""
Generate a short hash from the directory path for collection naming.
Uses first 12 characters of MD5 hash for compactness.
Args:
path: Absolute directory path
Returns:
Short hash string (12 chars)
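        Example (equivalent construction, for a POSIX-style path):
            _generate_path_hash("/Home/User/Notes")
            == hashlib.md5("/home/user/notes".encode("utf-8")).hexdigest()[:12]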
"""
path_normalized = os.path.normpath(path).lower()
hash_obj = hashlib.md5(path_normalized.encode('utf-8'))
return hash_obj.hexdigest()[:12]
def _load_state(self) -> Dict:
"""Load indexing state from path-specific state file."""
if os.path.exists(self.state_file_path):
try:
with open(self.state_file_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load state file: {e}")
return {}
return {}
def _save_state(self):
"""Save indexing state to path-specific state file."""
try:
os.makedirs(os.path.dirname(self.state_file_path), exist_ok=True)
with open(self.state_file_path, "w", encoding="utf-8") as f:
json.dump(self.state, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Failed to save state file: {e}")
def _calculate_file_hash(self, filepath: str) -> str:
"""
Calculate MD5 hash of file content for strict change detection.
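        Note: build_index() currently keys change detection on mtime (Stage 0); this
        content hash is available where stricter, content-based detection is needed.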
Args:
filepath: Path to the file
Returns:
MD5 hash as hexadecimal string
"""
hash_md5 = hashlib.md5()
try:
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
except Exception as e:
logger.warning(f"Failed to calculate hash for {filepath}: {e}")
return ""
def _remove_from_index(self, file_paths: List[str]):
"""
Remove chunks associated with specified file paths from the index.
Args:
file_paths: List of file paths to remove
"""
all_chunk_ids_to_delete = []
for path in file_paths:
if path in self.state:
chunk_ids = self.state[path].get('chunk_ids', [])
all_chunk_ids_to_delete.extend(chunk_ids)
logger.debug(f"Marking {len(chunk_ids)} chunks for deletion from {path}")
del self.state[path]
if all_chunk_ids_to_delete:
try:
# Get or create collection
try:
collection = self.chroma_client.get_collection(
name=self.collection_name,
embedding_function=self.emb_fn
)
                except Exception:
# Collection doesn't exist yet, nothing to delete
logger.debug(f"Collection {self.collection_name} doesn't exist, skipping deletion")
return
# Delete chunks from ChromaDB
collection.delete(ids=all_chunk_ids_to_delete)
logger.info(f"Deleted {len(all_chunk_ids_to_delete)} chunks from {len(file_paths)} files")
except Exception as e:
logger.error(f"Error deleting chunks: {e}")
def build_index(self):
"""
Build/rebuild hybrid index from local files with Intelligent Preprocessing Pipeline.
Uses incremental indexing to only process changed files (based on mtime).
        Pipeline stages (matching the "Stage 0/5".."5/5" log labels below):
        0. Change Detection (new/updated/deleted/unchanged files)
        1. Document Analysis & Smart Chunking (quality scoring, language detection, adaptive sizing)
        2. Content Cleaning (deduplication, boilerplate removal)
        3. Quality Metrics (size distribution, uniqueness, diversity)
        4. Vector Indexing with structured logging
        5. BM25 rebuild over all documents (fast, in-memory)
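        Per-file indexing state is persisted to a path-specific JSON file under
        STATE_FILE_DIR, mapping each absolute file path to its mtime and chunk IDs,
        roughly like (illustrative values; 'url' comes from load_local_files):
            {"/abs/path/notes.md": {"mtime": 1700000000.0, "chunk_ids": ["<url>#chunk0", ...]}}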
"""
# Import here to avoid circular dependency
from src.loaders import load_local_files
logger.info(f"Scanning local files in {self.directory_path}")
raw_documents = load_local_files(self.directory_path, self.extensions)
if not raw_documents:
logger.warning(f"No documents found in {self.directory_path}")
self.bm25 = None
return
logger.info(f"Found {len(raw_documents)} local files", total_files=len(raw_documents))
# === Stage 0: Incremental Indexing - Change Detection ===
logger.info("Stage 0/5: Detecting file changes (incremental indexing)")
# Create a map of current files for quick lookup
current_files = {doc['path']: doc for doc in raw_documents}
# 1. Detect deleted files (in state but not in current files)
deleted_paths = [p for p in self.state.keys() if p not in current_files]
if deleted_paths:
logger.info(f"Removing {len(deleted_paths)} deleted files from index")
self._remove_from_index(deleted_paths)
# 2. Detect new/updated files vs unchanged files
to_process_docs = []
skipped_count = 0
for doc in raw_documents:
path = doc['path']
current_mtime = os.path.getmtime(path)
# Check if file is unchanged (same mtime)
if path in self.state:
stored_mtime = self.state[path].get('mtime')
if stored_mtime == current_mtime:
# File unchanged, skip processing
skipped_count += 1
continue
else:
# File updated, remove old chunks before re-indexing
logger.debug(f"File updated: {path} (mtime changed)")
self._remove_from_index([path])
# New or updated file
to_process_docs.append(doc)
logger.info(
f"Change detection complete",
total_files=len(raw_documents),
unchanged=skipped_count,
new_or_updated=len(to_process_docs),
deleted=len(deleted_paths)
)
# Early exit if no changes detected
if not to_process_docs and not deleted_paths:
logger.info("No changes detected. Index is up to date.")
# Load or reload documents for BM25 index
if not self.documents:
# First time initialization - load all documents
self.documents = load_local_files(self.directory_path, self.extensions)
logger.info(f"Initial load: {len(self.documents)} documents")
if self.documents:
tokenized_corpus = [doc['text'].lower().split() for doc in self.documents]
self.bm25 = BM25Okapi(tokenized_corpus)
return
logger.info(f"Processing {len(to_process_docs)} changed files")
# Use only the files that need processing
raw_documents = to_process_docs
# Initialize pipeline components
analyzer = DocumentAnalyzer()
cleaner = ContentCleaner()
quality_analyzer = QualityAnalyzer()
# Note: BM25 will be rebuilt with ALL documents at the end (fast operation)
# For now, only process the changed documents for vector indexing
        # Stage 1: Document Analysis & Smart Chunking
logger.info("Stage 1/5: Analyzing documents and applying smart chunking")
# Get or create ChromaDB collection (don't delete existing data)
try:
self.collection = self.chroma_client.get_collection(
name=self.collection_name,
embedding_function=self.emb_fn
)
logger.debug(f"Using existing collection: {self.collection_name}")
        except Exception:
self.collection = self.chroma_client.create_collection(
name=self.collection_name,
embedding_function=self.emb_fn
)
logger.debug(f"Created new collection: {self.collection_name}")
all_chunks = []
doc_analyses = []
language_counts = Counter()
# Track chunk IDs per file for state management
file_chunk_mapping = {} # {file_path: [chunk_ids]}
# Process each document with progress tracking
for idx, doc in enumerate(tqdm(raw_documents, desc="Analyzing & Chunking", file=sys.stderr)):
# Analyze document
analysis = analyzer.analyze(doc['text'], doc.get('path', ''))
doc_analyses.append(analysis)
language_counts[analysis.language] += 1
# Log progress
if idx % 10 == 0:
logger.log_progress(
stage="document_analysis",
current=idx + 1,
total=len(raw_documents),
metrics={
"avg_quality": f"{sum(a.quality_score for a in doc_analyses) / len(doc_analyses):.3f}",
"languages_detected": len(language_counts)
}
)
# Smart chunking with language-aware sizing
lc_doc = Document(
page_content=doc['text'],
metadata={
"title": doc['title'],
"url": doc['url'],
"path": doc.get('path', ''),
"quality_score": analysis.quality_score,
"language": analysis.language
}
)
# Get smart config (language-aware, content-aware)
config = get_smart_config(
filename=doc.get('path', 'unknown.txt'),
text_content=doc['text']
)
# Apply chunking
chunks = self.chunker.chunk_documents([lc_doc], config)
# Add chunk index to metadata and track chunk IDs for this file
file_path = doc.get('path', '')
file_chunk_ids = []
for i, chunk in enumerate(chunks):
chunk.metadata['chunk_index'] = i
chunk.metadata['chunking_method'] = config.method.value
chunk.metadata['detected_language'] = config.detected_language
chunk.metadata['language_multiplier'] = str(config.language_multiplier)
                # Record the chunk ID ('<url>#chunk<i>'); the same format is used when adding
                # to ChromaDB, so these IDs let stale chunks be deleted on later updates
chunk_id = f"{chunk.metadata.get('url', 'unknown')}#chunk{i}"
file_chunk_ids.append(chunk_id)
all_chunks.extend(chunks)
file_chunk_mapping[file_path] = file_chunk_ids
logger.info(
f"Chunking complete",
total_chunks_before_cleaning=len(all_chunks),
avg_chunks_per_doc=f"{len(all_chunks)/len(raw_documents):.1f}" if raw_documents else "0"
)
        # Stage 2: Content Cleaning
logger.info("Stage 2/5: Cleaning content (deduplication, boilerplate removal)")
cleaned_chunks, cleaning_stats = cleaner.clean_chunks(all_chunks, detect_boilerplate=True)
logger.info(
"Content cleaning complete",
exact_duplicates_removed=cleaning_stats.exact_duplicates_removed,
near_duplicates_removed=cleaning_stats.near_duplicates_removed,
boilerplate_removed=cleaning_stats.boilerplate_removed,
too_small_removed=cleaning_stats.too_small_removed,
uniqueness_ratio=f"{cleaning_stats.uniqueness_ratio:.3f}",
chunks_remaining=cleaning_stats.total_output
)
        # Stage 3: Quality Metrics
logger.info("Stage 3/5: Computing quality metrics")
metrics = quality_analyzer.analyze(cleaned_chunks)
logger.info("Quality metrics computed", **metrics.to_dict())
        # Stage 4: Vector Indexing with Progress
logger.info("Stage 4/5: Indexing chunks to vector database")
chunk_ids = []
chunk_texts = []
chunk_metadatas = []
for i, chunk in enumerate(cleaned_chunks):
chunk_id = f"{chunk.metadata.get('url', 'unknown')}#chunk{chunk.metadata.get('chunk_index', i)}"
chunk_ids.append(chunk_id)
chunk_texts.append(chunk.page_content)
# Clean metadata for ChromaDB (no None values)
meta = chunk.metadata.copy()
for k, v in meta.items():
if v is None:
meta[k] = ""
elif not isinstance(v, (str, int, float, bool)):
meta[k] = str(v)
chunk_metadatas.append(meta)
# Batch insert with progress logging
if chunk_texts:
batch_size = 50
total_batches = (len(chunk_texts) + batch_size - 1) // batch_size
for batch_idx, i in enumerate(range(0, len(chunk_texts), batch_size)):
end = min(i + batch_size, len(chunk_texts))
self.collection.add(
ids=chunk_ids[i:end],
documents=chunk_texts[i:end],
metadatas=chunk_metadatas[i:end]
)
# Log progress every 10 batches
if batch_idx % 10 == 0:
logger.log_progress(
stage="vector_indexing",
current=end,
total=len(chunk_texts),
metrics={"batch_size": batch_size}
)
# Update state for processed files
logger.info("Updating indexing state for processed files")
for file_path, chunk_id_list in file_chunk_mapping.items():
if file_path:
self.state[file_path] = {
"mtime": os.path.getmtime(file_path),
"chunk_ids": chunk_id_list
}
# Save state to disk
self._save_state()
logger.info("Indexing state saved")
        # Stage 5: Rebuild BM25 index with ALL documents (fast operation)
logger.info("Stage 5/5: Rebuilding BM25 index with all documents")
# Reload all documents for BM25 (text-based, very fast)
all_docs = load_local_files(self.directory_path, self.extensions)
self.documents = all_docs
tokenized_corpus = [doc['text'].lower().split() for doc in self.documents]
self.bm25 = BM25Okapi(tokenized_corpus)
logger.info(f"BM25 index rebuilt with {len(self.documents)} total documents")
# Final summary
avg_quality = sum(a.quality_score for a in doc_analyses) / len(doc_analyses) if doc_analyses else 0.0
logger.log_document_stats(
total_docs=len(all_docs),
total_chunks=len(chunk_texts),
avg_quality=avg_quality,
unique_ratio=cleaning_stats.uniqueness_ratio,
languages=dict(language_counts)
)
logger.info(
"Local file index build complete (incremental)",
total_files=len(all_docs),
processed_files=len(raw_documents),
total_chunks=len(chunk_texts),
avg_quality_score=f"{avg_quality:.3f}"
)