#!/usr/bin/env python3
"""
Dual-Path Knowledge Base Builder - Variables vs Methodology Separation
Builds TWO separate vector databases optimized for different retrieval patterns:
1. VARIABLES DATABASE: 65K canonical variables → FAISS index (fast loading) OR ChromaDB
2. METHODOLOGY DATABASE: Documentation, guides, PDFs → ChromaDB (conceptual search)
Usage:
python build-kb.py --variables-only --output-dir variables-db --faiss
python build-kb.py --methodology-only --output-dir methodology-db
python build-kb.py --both --variables-dir variables-db --methodology-dir methodology-db --faiss
"""
import os
import sys
import json
import logging
import argparse
import hashlib
import numpy as np
from pathlib import Path
from typing import List, Dict, Any, Optional
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import re
# Document processing
import PyPDF2
from bs4 import BeautifulSoup
import markdown
import pandas as pd
# Vector DB and embeddings
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
# FAISS for variables database
try:
import faiss
FAISS_AVAILABLE = True
except ImportError:
FAISS_AVAILABLE = False
logging.warning("FAISS not available. Install with: pip install faiss-cpu")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - KB-BUILD - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def chunk_content(text, file_path, category, source_path, worker_id=None):
"""Smart chunking: structured data extraction for special files, recursive chunking for regular text"""
chunks = []
    # Clean text: collapse runs of spaces/tabs but keep newlines, since the
    # paragraph split below and chunk_structured_document both rely on
    # '\n'-based boundaries
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text).strip()
if len(text) < 100:
return chunks
# For structured data files (like Census variables), use structured extraction
is_structured = any(indicator in file_path.name.lower()
for indicator in ['variables', 'api', 'definitions', 'zcta', 'rel'])
if is_structured and len(text) > 5000: # Large structured files
return chunk_structured_document(text, file_path, category, source_path, worker_id)
# Standard recursive chunking for regular documents
chunk_size = 1000
overlap = 200 # 20% overlap
# Split by paragraphs first (natural boundaries)
paragraphs = text.split('\n\n')
current_chunk = ""
chunk_num = 0
for paragraph in paragraphs:
paragraph = paragraph.strip()
if not paragraph:
continue
# Test if adding this paragraph would exceed chunk size
test_chunk = current_chunk + " " + paragraph if current_chunk else paragraph
if len(test_chunk) > chunk_size and current_chunk:
# Save current chunk
if len(current_chunk.strip()) > 100:
chunks.append(create_chunk_metadata(
current_chunk.strip(), file_path, category, chunk_num, source_path, worker_id
))
chunk_num += 1
# Start new chunk with overlap
overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk
current_chunk = overlap_text + " " + paragraph if overlap_text else paragraph
else:
current_chunk = test_chunk
# Add final chunk
if current_chunk and len(current_chunk.strip()) > 100:
chunks.append(create_chunk_metadata(
current_chunk.strip(), file_path, category, chunk_num, source_path, worker_id
))
return chunks
def chunk_structured_document(text, file_path, category, source_path, worker_id=None):
"""Handle structured documents by splitting on natural entity boundaries"""
chunks = []
chunk_num = 0
# Try splitting on common structured boundaries
split_patterns = [
r'\n(?=[A-Z]\d{5}_\d{3})', # Census variable codes
r'\n(?=Table [A-Z]\d+)', # Table definitions
r'\n(?=\w+:)', # Key-value pairs
r'\n\n', # Paragraph breaks
r'\n' # Line breaks (last resort)
]
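    # Hypothetical example: a section starting "B01001_001 Estimate!!Total"
    # would match the Census variable code pattern above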
sections = [text] # Start with full text
# Try each split pattern until chunks are reasonable size
for pattern in split_patterns:
new_sections = []
for section in sections:
if len(section) <= 2000: # Reasonable size for structured content
new_sections.append(section)
else:
# Split this section
parts = re.split(pattern, section)
new_sections.extend(parts)
sections = new_sections
# Check if we're at reasonable size now
if all(len(s) <= 2000 for s in sections):
break
# Create chunks from sections
for section in sections:
section = section.strip()
if len(section) >= 100: # Minimum chunk size
chunks.append(create_chunk_metadata(
section, file_path, category, chunk_num, source_path, worker_id
))
chunk_num += 1
return chunks
def create_chunk_metadata(text, file_path, category, chunk_num, source_path, worker_id=None):
"""Create metadata for a text chunk"""
# Generate globally unique ID using full file path + content
content_hash = hashlib.md5(text.encode()).hexdigest()[:8]
file_hash = hashlib.md5(str(file_path).encode()).hexdigest()[:6]
worker_prefix = f"w{worker_id}_" if worker_id is not None else ""
chunk_id = f"{worker_prefix}{file_path.stem}_{file_hash}_{chunk_num}_{content_hash}"
return {
'id': chunk_id,
'text': text,
'metadata': {
'source_file': str(file_path.relative_to(source_path)),
'category': category,
'chunk_number': chunk_num,
'file_name': file_path.name,
'file_type': file_path.suffix,
'text_length': len(text)
}
}
def worker_process_files(files_chunk, worker_id, source_dir, temp_dir, model_name):
"""Process files in parallel worker with clean chunking"""
# Set environment for offline model loading
os.environ['HF_HUB_OFFLINE'] = '1'
os.environ['TRANSFORMERS_OFFLINE'] = '1'
os.environ['TOKENIZERS_PARALLELISM'] = 'false'
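    # Offline mode assumes the model is already present in ./model_cache
    # (the main process loads the same cached model in _init_model)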
# Load model from local cache
model = SentenceTransformer(model_name, cache_folder='./model_cache', device='cpu')
source_path = Path(source_dir)
all_chunks = []
files_processed = 0
errors = 0
# Progress tracking
total_files = len(files_chunk)
progress_interval = max(50, total_files // 10)
print(f"🔄 Worker {worker_id}: Starting {total_files} files...")
for idx, (file_path, category) in enumerate(files_chunk):
try:
# Progress updates
if idx > 0 and idx % progress_interval == 0:
percent = (idx / total_files) * 100
print(f"Worker {worker_id}: {percent:.0f}% ({idx}/{total_files})")
# Extract text based on file type
if file_path.suffix.lower() == '.pdf':
text = ""
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
                    for page in pdf_reader.pages:
                        page_text = page.extract_text()  # extract once, not twice
                        if page_text:
                            text += page_text
elif file_path.suffix.lower() in ['.html', '.htm']:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
soup = BeautifulSoup(f.read(), 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text()
                # Clean up whitespace (BeautifulSoup recipe: double spaces mark
                # phrase boundaries); join phrases with blank lines so the
                # paragraph-based chunker downstream has boundaries to split on
                lines = (line.strip() for line in text.splitlines())
                phrases = (phrase.strip() for line in lines for phrase in line.split("  "))
                text = '\n\n'.join(phrase for phrase in phrases if phrase)
elif file_path.suffix.lower() == '.md':
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
md_content = f.read()
# Convert markdown to HTML then extract text
html = markdown.markdown(md_content)
soup = BeautifulSoup(html, 'html.parser')
text = soup.get_text()
elif file_path.suffix.lower() == '.xlsx':
excel_file = pd.ExcelFile(file_path)
text_parts = []
for sheet_name in excel_file.sheet_names:
try:
df = pd.read_excel(file_path, sheet_name=sheet_name)
if not df.empty:
text_parts.append(f"Sheet: {sheet_name}")
text_parts.append(f"Columns: {', '.join(df.columns.astype(str))}")
for _, row in df.head(10).iterrows():
row_text = ' | '.join([f"{col}: {val}" for col, val in row.items() if pd.notna(val)])
if row_text:
text_parts.append(row_text)
                    except Exception:
                        continue
                text = '\n\n'.join(text_parts)  # blank lines give the chunker paragraph boundaries
elif file_path.suffix.lower() == '.json':
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
data = json.load(f)
text = json.dumps(data)[:8000]
elif file_path.suffix.lower() == '.csv':
try:
df = pd.read_csv(file_path, nrows=100)
text = f"CSV: {file_path.name}\nColumns: {', '.join(df.columns)}\n"
text += df.to_string()
                except Exception:
                    text = ""
else:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
if len(text.strip()) < 100:
continue
# Use clean recursive chunking with worker ID
chunks = chunk_content(text, file_path, category, source_path, worker_id)
# Generate embeddings
if chunks:
texts = [c['text'] for c in chunks]
embeddings = model.encode(texts, show_progress_bar=False)
for i, chunk in enumerate(chunks):
chunk['embedding'] = embeddings[i].tolist()
all_chunks.extend(chunks)
files_processed += 1
except Exception as e:
print(f"❌ Worker {worker_id}: ERROR in {file_path.name}: {str(e)}")
errors += 1
continue
# Save to temp file
temp_file = Path(temp_dir) / f"worker_{worker_id}.json"
with open(temp_file, 'w') as f:
json.dump({
'chunks': all_chunks,
'files_processed': files_processed,
'errors': errors
}, f)
print(f"✅ Worker {worker_id}: COMPLETE - {files_processed} files, {len(all_chunks)} chunks, {errors} errors")
return {'files_processed': files_processed, 'chunks_created': len(all_chunks), 'errors': errors}
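# Each worker persists its results as a JSON file shaped like:
#   {"chunks": [{"id": ..., "text": ..., "metadata": {...}, "embedding": [...]}, ...],
#    "files_processed": N, "errors": N}
# which _merge_methodology_temp_files later loads into ChromaDB.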
class DualPathKnowledgeBuilder:
"""
Builds separated knowledge bases optimized for different retrieval patterns:
- Variables DB: Entity lookup for 65K canonical variables
- Methodology DB: Conceptual search for documentation
"""
def __init__(self, source_dir: Path, build_mode: str,
variables_dir: Path = None, methodology_dir: Path = None,
test_mode: bool = False, model_name: str = "sentence-transformers/all-mpnet-base-v2",
workers: int = 5, use_faiss: bool = False):
self.source_dir = Path(source_dir)
self.build_mode = build_mode # 'variables', 'methodology', or 'both'
self.variables_dir = Path(variables_dir) if variables_dir else None
self.methodology_dir = Path(methodology_dir) if methodology_dir else None
self.test_mode = test_mode
self.model_name = model_name
self.workers = workers
# Stats tracking
self.variables_stats = {'canonical_variables': 0, 'files_processed': 0, 'chunks_created': 0, 'errors': 0}
self.methodology_stats = {'files_processed': 0, 'chunks_created': 0, 'errors': 0}
        self.use_faiss = use_faiss and (build_mode in ['variables', 'both'])
        # Validate FAISS availability before any expensive setup
        if self.use_faiss and not FAISS_AVAILABLE:
            logger.error("FAISS requested but not available. Install with: pip install faiss-cpu")
            raise ImportError("FAISS not available")
        # Initialize model
        self._init_model()
        # Initialize databases based on build mode (after self.use_faiss is
        # set, since FAISS variables builds skip ChromaDB setup)
        self._init_databases()
logger.info(f"🚀 Dual-Path Builder initialized:")
logger.info(f" Build mode: {build_mode}")
logger.info(f" Variables dir: {variables_dir}")
logger.info(f" Variables backend: {'FAISS' if self.use_faiss else 'ChromaDB'}")
logger.info(f" Methodology dir: {methodology_dir}")
logger.info(f" Test mode: {test_mode}")
def _init_model(self):
"""Initialize embedding model with local caching"""
        # Offline mode: the model must already exist in ./model_cache
        # (run once with these variables unset to download it)
        os.environ['HF_HUB_OFFLINE'] = '1'
        os.environ['TRANSFORMERS_OFFLINE'] = '1'
        os.environ['TOKENIZERS_PARALLELISM'] = 'false'
        logger.info(f"🔄 Loading model: {self.model_name}")
        self.embedding_model = SentenceTransformer(self.model_name, cache_folder='./model_cache')
        dim = self.embedding_model.get_sentence_embedding_dimension()
        logger.info(f"✅ Model loaded from ./model_cache ({dim} dimensions)")
def _init_databases(self):
"""Initialize ChromaDB collections based on build mode"""
self.variables_client = None
self.methodology_client = None
self.variables_collection = None
self.methodology_collection = None
        if self.build_mode in ['variables', 'both']:
            self.variables_dir.mkdir(parents=True, exist_ok=True)
            if self.use_faiss:
                # FAISS builds write flat files directly; no ChromaDB client needed
                logger.info(f"✅ Variables directory ready for FAISS: {self.variables_dir}")
            else:
                self.variables_client = chromadb.PersistentClient(
                    path=str(self.variables_dir),
                    settings=Settings(anonymized_telemetry=False, allow_reset=True)
                )
                self.variables_collection = self.variables_client.get_or_create_collection(
                    "census_variables",
                    metadata={"description": "Census canonical variables for entity lookup"}
                )
                logger.info(f"✅ Variables database initialized: {self.variables_dir}")
if self.build_mode in ['methodology', 'both']:
self.methodology_dir.mkdir(parents=True, exist_ok=True)
self.methodology_client = chromadb.PersistentClient(
path=str(self.methodology_dir),
settings=Settings(anonymized_telemetry=False, allow_reset=True)
)
            self.methodology_collection = self.methodology_client.get_or_create_collection(
                "census_methodology",
                metadata={"description": "Census methodology and documentation for conceptual search"}
            )
logger.info(f"✅ Methodology database initialized: {self.methodology_dir}")
def build_knowledge_bases(self, rebuild: bool = False):
"""Build the knowledge bases according to the specified mode"""
logger.info(f"🚀 Building knowledge bases - Mode: {self.build_mode}")
start_time = time.time()
if rebuild:
self._rebuild_collections()
if self.build_mode in ['variables', 'both']:
logger.info("🎯 Building VARIABLES database...")
self._build_variables_database()
if self.build_mode in ['methodology', 'both']:
logger.info("📚 Building METHODOLOGY database...")
self._build_methodology_database()
build_time = time.time() - start_time
self._display_final_stats(build_time)
def _rebuild_collections(self):
"""Rebuild collections if they exist"""
if self.build_mode in ['variables', 'both']:
if self.use_faiss:
# Remove existing FAISS files
faiss_files = ['variables.faiss', 'variables_metadata.json']
for fname in faiss_files:
fpath = self.variables_dir / fname
if fpath.exists():
fpath.unlink()
logger.info(f"🔄 Removed existing FAISS file: {fname}")
            else:
                # Rebuild ChromaDB collection
                if self.variables_client:
                    try:
                        self.variables_client.delete_collection("census_variables")
                    except Exception:
                        pass  # Collection may not exist yet
                    self.variables_collection = self.variables_client.create_collection(
                        "census_variables",
                        metadata={"description": "Census canonical variables for entity lookup"}
                    )
                    logger.info("🔄 Variables collection rebuilt")
        if self.methodology_client and self.build_mode in ['methodology', 'both']:
            try:
                self.methodology_client.delete_collection("census_methodology")
            except Exception:
                pass  # Collection may not exist yet
            self.methodology_collection = self.methodology_client.create_collection(
                "census_methodology",
                metadata={"description": "Census methodology and documentation for conceptual search"}
            )
            logger.info("🔄 Methodology collection rebuilt")
def _build_variables_database(self):
"""Build variables database using FAISS or ChromaDB"""
canonical_path = self.source_dir / "canonical_variables.json"
if not canonical_path.exists():
logger.error("❌ No canonical_variables.json found - cannot build variables database")
return
if self.use_faiss:
self._build_variables_faiss(canonical_path)
else:
self._build_variables_chromadb(canonical_path)
def _build_variables_faiss(self, canonical_path: Path):
"""Build variables database using FAISS index"""
logger.info("🎯 Processing canonical variables for FAISS database...")
with open(canonical_path) as f:
data = json.load(f)
variables = data.get('variables', data)
logger.info(f"📊 Found {len(variables)} canonical variables")
# Process variables in batches for memory efficiency
var_items = list(variables.items())
if self.test_mode:
var_items = var_items[:1000]
logger.info(f"🧪 Test mode: Limited to {len(var_items)} variables")
all_texts = []
all_metadata = []
all_embeddings = []
batch_size = 1000
total_batches = (len(var_items) + batch_size - 1) // batch_size
for batch_num, i in enumerate(range(0, len(var_items), batch_size)):
batch = var_items[i:i + batch_size]
logger.info(f"🔄 Processing FAISS batch {batch_num + 1}/{total_batches}")
batch_texts = []
batch_metadata = []
for temporal_id, var_data in batch:
# Create optimized variable text for entity lookup
parts = [f"Variable {temporal_id}"]
label = var_data.get('label', 'Unknown')
concept = var_data.get('concept', 'Unknown')
if label != 'Unknown':
parts.append(f"Label: {label}")
if concept != 'Unknown':
parts.append(f"Concept: {concept}")
# Add context for better search
if var_data.get('survey_context'):
parts.append(f"Survey: {var_data['survey_context']}")
# Add top category weights for semantic understanding
weights = var_data.get('category_weights_linear', {})
if weights:
weight_strs = [f"{k}: {v:.2f}" for k, v in weights.items() if v > 0.1]
if weight_strs:
parts.append(f"Categories: {', '.join(weight_strs)}")
text = ". ".join(parts) + "."
metadata = {
'temporal_id': temporal_id,
'variable_id': var_data.get('variable_id', ''),
'concept': concept,
'label': label,
'source_file': 'canonical_variables.json',
'category': 'canonical_variables'
}
batch_texts.append(text)
batch_metadata.append(metadata)
# Generate embeddings for batch
logger.info(f"🧠 Generating embeddings for {len(batch_texts)} variables...")
embeddings = self.embedding_model.encode(batch_texts, show_progress_bar=False)
all_texts.extend(batch_texts)
all_metadata.extend(batch_metadata)
all_embeddings.extend(embeddings)
self.variables_stats['canonical_variables'] += len(batch_texts)
# Build FAISS index
logger.info(f"🔧 Building FAISS index for {len(all_embeddings)} variables...")
# Convert to numpy array
embeddings_array = np.array(all_embeddings).astype('float32')
        # Create an exact (brute-force) L2 index; for cosine similarity,
        # normalize the embeddings and use faiss.IndexFlatIP instead
        dimension = embeddings_array.shape[1]
        index = faiss.IndexFlatL2(dimension)
# Add embeddings to index
index.add(embeddings_array)
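        # IndexFlatL2 assigns sequential row ids in insertion order, so row i
        # of the index lines up with all_metadata[i] saved below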
# Save FAISS index
faiss_path = self.variables_dir / "variables.faiss"
faiss.write_index(index, str(faiss_path))
logger.info(f"💾 FAISS index saved: {faiss_path}")
# Save metadata separately
metadata_path = self.variables_dir / "variables_metadata.json"
with open(metadata_path, 'w') as f:
json.dump(all_metadata, f, indent=2)
logger.info(f"💾 Metadata saved: {metadata_path}")
# Save build info
build_info = {
'model_name': self.model_name,
'embedding_dimension': dimension,
'variable_count': len(all_embeddings),
'build_timestamp': time.time(),
'index_type': 'faiss_flat_l2'
}
build_info_path = self.variables_dir / "build_info.json"
with open(build_info_path, 'w') as f:
json.dump(build_info, f, indent=2)
logger.info(f"💾 Build info saved: {build_info_path}")
logger.info(f"✅ FAISS variables database complete: {len(all_embeddings)} variables")
def _build_variables_chromadb(self, canonical_path: Path):
"""Build variables database using ChromaDB (original method)"""
logger.info("🎯 Processing canonical variables for ChromaDB database...")
with open(canonical_path) as f:
data = json.load(f)
variables = data.get('variables', data)
logger.info(f"📊 Found {len(variables)} canonical variables")
# Process in batches
        var_items = list(variables.items())
        if self.test_mode:
            var_items = var_items[:1000]
            logger.info(f"🧪 Test mode: Limited to {len(var_items)} variables")
        batch_size = 200 if self.test_mode else 1000
        total_batches = (len(var_items) + batch_size - 1) // batch_size
for batch_num, i in enumerate(range(0, len(var_items), batch_size)):
batch = var_items[i:i + batch_size]
logger.info(f"🔄 Processing variables batch {batch_num + 1}/{total_batches}")
chunks = []
for temporal_id, var_data in batch:
# Create optimized variable text for entity lookup
parts = [f"Variable {temporal_id}"]
label = var_data.get('label', 'Unknown')
concept = var_data.get('concept', 'Unknown')
if label != 'Unknown':
parts.append(f"Label: {label}")
if concept != 'Unknown':
parts.append(f"Concept: {concept}")
# Add context for better search
if var_data.get('survey_context'):
parts.append(f"Survey: {var_data['survey_context']}")
# Add top category weights for semantic understanding
weights = var_data.get('category_weights_linear', {})
if weights:
weight_strs = [f"{k}: {v:.2f}" for k, v in weights.items() if v > 0.1]
if weight_strs:
parts.append(f"Categories: {', '.join(weight_strs)}")
text = ". ".join(parts) + "."
chunk_id = f"var_{temporal_id}_{hashlib.md5(text.encode()).hexdigest()[:8]}"
chunks.append({
'id': chunk_id,
'text': text,
'metadata': {
'source_file': 'canonical_variables.json',
'category': 'canonical_variables',
'temporal_id': temporal_id,
'variable_id': var_data.get('variable_id', ''),
'file_type': 'canonical_variable',
'concept': concept,
'label': label
}
})
# Generate embeddings and store
if chunks:
logger.info(f"🧠 Generating embeddings for {len(chunks)} variables...")
texts = [c['text'] for c in chunks]
embeddings = self.embedding_model.encode(texts, show_progress_bar=False)
# Store in batches of 500
for j in range(0, len(chunks), 500):
batch_chunks = chunks[j:j + 500]
batch_texts = [c['text'] for c in batch_chunks]
batch_ids = [c['id'] for c in batch_chunks]
batch_meta = [c['metadata'] for c in batch_chunks]
batch_embeddings = embeddings[j:j + 500].tolist()
self.variables_collection.add(
documents=batch_texts,
embeddings=batch_embeddings,
metadatas=batch_meta,
ids=batch_ids
)
logger.info(f"💾 Variables: Stored batch {j//500 + 1}: {len(batch_chunks)} variables")
self.variables_stats['canonical_variables'] += len(chunks)
logger.info(f"✅ Variables database complete: {self.variables_stats['canonical_variables']} variables")
def _build_methodology_database(self):
"""Build methodology-only database from documentation files"""
# Create temp directory for parallel processing
temp_dir = self.methodology_dir / "temp"
temp_dir.mkdir(parents=True, exist_ok=True)
# Collect methodology files (exclude canonical_variables.json)
logger.info("📁 Collecting methodology files...")
all_files = []
patterns = ['*.pdf', '*.html', '*.htm', '*.md', '*.txt', '*.Rmd', '*.xlsx']
# Exclude raw data files and focus on documentation
exclude_patterns = [
'canonical_variables.json',
'acs1_raw.json',
'acs5_raw.json',
'raw_data',
'data_dumps'
]
for pattern in patterns:
pattern_files = list(self.source_dir.rglob(pattern))
logger.info(f" Found {len(pattern_files)} {pattern} files")
for file_path in pattern_files:
# Skip hidden files and excluded patterns
if (any(part.startswith('.') for part in file_path.parts) or
any(exclude in str(file_path).lower() for exclude in exclude_patterns)):
continue
if self.test_mode and file_path.stat().st_size > 10 * 1024 * 1024:
continue
                # Determine category from the first directory under source_dir
                # (file_path.parts[1] would break for absolute source paths)
                rel_parts = file_path.relative_to(self.source_dir).parts
                category = rel_parts[0] if len(rel_parts) > 1 else 'general'
all_files.append((file_path, category))
if self.test_mode:
all_files = all_files[:100]
logger.info(f"🧪 Test mode: Limited to {len(all_files)} files")
logger.info(f"🚀 Processing {len(all_files)} methodology files with {self.workers} workers")
# Split files among workers
chunk_size = max(1, len(all_files) // self.workers)
file_chunks = [all_files[i:i + chunk_size] for i in range(0, len(all_files), chunk_size)]
# Process in parallel
logger.info("🔄 Starting parallel workers for methodology...")
with ProcessPoolExecutor(max_workers=self.workers) as executor:
futures = [
executor.submit(worker_process_files, chunk, i, str(self.source_dir),
str(temp_dir), self.model_name)
for i, chunk in enumerate(file_chunks) if chunk
]
completed = 0
for future in futures:
result = future.result()
completed += 1
logger.info(f"✅ Methodology Worker {completed} completed: {result}")
self.methodology_stats['files_processed'] += result['files_processed']
self.methodology_stats['chunks_created'] += result['chunks_created']
self.methodology_stats['errors'] += result['errors']
# Merge temp files into methodology database
self._merge_methodology_temp_files(temp_dir)
# Cleanup
self._cleanup_temp_files(temp_dir)
def _merge_methodology_temp_files(self, temp_dir):
"""Merge temporary files into methodology ChromaDB"""
logger.info("🔄 Merging methodology temp files into ChromaDB...")
temp_files = list(temp_dir.glob("worker_*.json"))
logger.info(f"📁 Found {len(temp_files)} temp files to merge")
total_merged = 0
for i, temp_file in enumerate(temp_files):
logger.info(f"📥 Merging {temp_file.name} ({i+1}/{len(temp_files)})...")
with open(temp_file) as f:
data = json.load(f)
chunks = data.get('chunks', [])
if not chunks:
logger.info(f" ⚠️ No chunks in {temp_file.name}")
continue
# Store in batches
batches = (len(chunks) + 499) // 500
for j in range(0, len(chunks), 500):
batch = chunks[j:j + 500]
batch_num = j // 500 + 1
logger.info(f" 💾 Methodology: Storing batch {batch_num}/{batches}: {len(batch)} chunks")
texts = [c['text'] for c in batch]
ids = [c['id'] for c in batch]
metadatas = [c['metadata'] for c in batch]
embeddings = [c['embedding'] for c in batch]
self.methodology_collection.add(
documents=texts,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
total_merged += len(chunks)
logger.info(f" ✅ Merged {len(chunks)} chunks from {temp_file.name}")
logger.info(f"🎉 Methodology merge complete: {total_merged} total chunks merged")
def _cleanup_temp_files(self, temp_dir):
"""Clean up temporary files"""
for temp_file in temp_dir.glob("worker_*.json"):
temp_file.unlink()
if temp_dir.exists() and not any(temp_dir.iterdir()):
temp_dir.rmdir()
def _display_final_stats(self, build_time):
"""Display comprehensive build statistics"""
logger.info("🎉 DUAL-PATH BUILD COMPLETE!")
logger.info("=" * 60)
logger.info(f"Build time: {build_time:.2f}s")
logger.info(f"Build mode: {self.build_mode}")
if self.build_mode in ['variables', 'both']:
backend_type = "FAISS" if self.use_faiss else "ChromaDB"
logger.info(f"\n🎯 VARIABLES DATABASE ({self.variables_dir}) - {backend_type}:")
logger.info(f" Canonical variables: {self.variables_stats['canonical_variables']:,}")
if self.use_faiss:
faiss_path = self.variables_dir / "variables.faiss"
metadata_path = self.variables_dir / "variables_metadata.json"
logger.info(f" FAISS index: {faiss_path}")
logger.info(f" Metadata file: {metadata_path}")
if faiss_path.exists():
size_mb = faiss_path.stat().st_size / 1024 / 1024
logger.info(f" Index size: {size_mb:.1f} MB")
else:
if self.variables_collection:
logger.info(f" Total documents: {self.variables_collection.count():,}")
if self.build_mode in ['methodology', 'both']:
logger.info(f"\n📚 METHODOLOGY DATABASE ({self.methodology_dir}) - ChromaDB:")
logger.info(f" Files processed: {self.methodology_stats['files_processed']:,}")
logger.info(f" Chunks created: {self.methodology_stats['chunks_created']:,}")
logger.info(f" Errors: {self.methodology_stats['errors']}")
if self.methodology_collection:
logger.info(f" Total documents: {self.methodology_collection.count():,}")
logger.info(f"\n💡 NEXT STEPS:")
if self.build_mode in ['variables', 'both']:
if self.use_faiss:
logger.info(f" Variables: FAISS index for lightning-fast entity lookup")
else:
logger.info(f" Variables: ChromaDB for entity lookup and GraphRAG potential")
if self.build_mode in ['methodology', 'both']:
logger.info(f" Methodology: ChromaDB optimized for conceptual search")
logger.info(f" Ready for MCP server integration with fast startup!")
def main():
parser = argparse.ArgumentParser(description='Dual-Path Knowledge Base Builder')
parser.add_argument('--variables-only', action='store_true', help='Build only variables database')
parser.add_argument('--methodology-only', action='store_true', help='Build only methodology database')
parser.add_argument('--both', action='store_true', help='Build both databases')
parser.add_argument('--faiss', action='store_true', help='Use FAISS for variables database (faster loading)')
parser.add_argument('--rebuild', action='store_true', help='Force rebuild existing databases')
parser.add_argument('--test-mode', action='store_true', help='Test with subset of data')
parser.add_argument('--source-dir', default='source-docs', help='Source directory')
parser.add_argument('--variables-dir', default='variables-db', help='Variables database directory')
parser.add_argument('--methodology-dir', default='methodology-db', help='Methodology database directory')
parser.add_argument('--model', default='sentence-transformers/all-mpnet-base-v2', help='Embedding model')
parser.add_argument('--workers', type=int, default=5, help='Number of parallel workers')
args = parser.parse_args()
# Determine build mode
if args.variables_only:
build_mode = 'variables'
elif args.methodology_only:
build_mode = 'methodology'
elif args.both:
build_mode = 'both'
else:
# Default to both if no mode specified
build_mode = 'both'
logger.info("No build mode specified, defaulting to --both")
# Validate FAISS usage
if args.faiss and build_mode == 'methodology':
logger.warning("FAISS flag ignored - only applies to variables database")
args.faiss = False
builder = DualPathKnowledgeBuilder(
source_dir=Path(args.source_dir),
build_mode=build_mode,
variables_dir=Path(args.variables_dir),
methodology_dir=Path(args.methodology_dir),
test_mode=args.test_mode,
model_name=args.model,
workers=args.workers,
use_faiss=args.faiss
)
builder.build_knowledge_bases(rebuild=args.rebuild)
logger.info("🚀 Dual-path knowledge base build completed!")
if __name__ == "__main__":
main()