"""
FAISS-Integrated Dual-Path Vector Database for Census MCP Server
Fixes timeout issues by replacing ChromaDB variable loading with instant FAISS loading.
This implementation:
1. Loads variables from FAISS index (instant startup)
2. Keeps methodology in ChromaDB (conceptual search)
3. Smart routing between the two databases
4. Backward compatible interface
"""
import asyncio
import hashlib
import json
import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

import numpy as np
# Vector DB imports
try:
import chromadb
from chromadb.config import Settings
CHROMADB_AVAILABLE = True
except ImportError:
CHROMADB_AVAILABLE = False
logging.warning("ChromaDB not available for methodology search")
try:
import faiss
FAISS_AVAILABLE = True
except ImportError:
FAISS_AVAILABLE = False
logging.warning("FAISS not available for variable search")
try:
from sentence_transformers import SentenceTransformer
SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
SENTENCE_TRANSFORMERS_AVAILABLE = False
logging.warning("SentenceTransformers not available")
from utils.config import get_config
logger = logging.getLogger(__name__)
class DualPathKnowledgeBase:
    """
    Dual-path knowledge base with FAISS for variables and ChromaDB for methodology.

    Fixes MCP timeout issues by:
    - Variables: FAISS index (instant loading, no lazy loading)
    - Methodology: ChromaDB (conceptual search quality)
    - Smart routing: Detects query type and routes to appropriate database

    Every backend is optional. When a dependency or its on-disk data is
    missing, the corresponding attributes stay ``None`` and the matching
    search method logs a warning and returns an empty list instead of raising.
    """

    # Substrings that suggest the caller is looking up a specific variable.
    _VARIABLE_INDICATORS = (
        'variable', 'var_', 'estimate', 'margin of error',
        'acs variable', 'census variable', 'table', 'dataset', 'field',
    )

    # Substrings that suggest a conceptual / methodology question.
    _METHODOLOGY_INDICATORS = (
        'methodology', 'how to', 'guide', 'documentation', 'calculate',
        'definition', 'meaning', 'interpret', 'statistical', 'survey design',
        'sampling', 'margin of error calculation', 'confidence interval',
    )

    # Table-ID-like token: "b" or "c" followed by exactly five digits
    # (e.g. "b19013", also as the prefix of "b19013_001e"). Applied to the
    # lower-cased query; must not be preceded by a letter or digit.
    _TABLE_ID_PATTERN = re.compile(r'(?<![a-z0-9])[bc]\d{5}(?!\d)')

    def __init__(self, corpus_path: Optional[Path] = None, vector_db_path: Optional[Path] = None):
        """Initialize dual-path knowledge base.

        Args:
            corpus_path: Not read by this implementation; presumably kept so
                existing call sites keep working.
            vector_db_path: Not read by this implementation; both database
                locations are derived from ``get_config().base_dir``.
        """
        self.config = get_config()

        # Database paths
        self.variables_db_path = self.config.base_dir / "knowledge-base" / "variables-faiss"
        self.methodology_db_path = self.config.base_dir / "knowledge-base" / "methodology-db"

        # FAISS components (variables)
        self.variables_index = None
        self.variables_metadata = None
        self.embedding_model = None
        # Defined up front so the attribute exists even if the model fails to load.
        self.embedding_dimension = None

        # ChromaDB components (methodology)
        self.methodology_client = None
        self.methodology_collection = None

        # Initialize components
        self._init_embedding_model()
        self._init_dual_vector_dbs()
        logger.info("Dual-path knowledge base initialized successfully")

    def _init_embedding_model(self):
        """Load the sentence-transformer used to embed queries and documents.

        On any failure the error is logged and ``self.embedding_model`` is left
        as ``None``, which disables both search paths.
        """
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            logger.error("SentenceTransformers not available. Search functionality disabled.")
            return

        try:
            # Use consistent model across the system
            model_name = "BAAI/bge-large-en-v1.5"  # Match FAISS index
            logger.info(f"Loading embedding model: {model_name}")
            # NOTE(review): trust_remote_code=True lets the model repository
            # run its own Python code at load time — confirm it is required.
            self.embedding_model = SentenceTransformer(model_name, trust_remote_code=True)
            self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension()
            logger.info(f"Embedding model loaded: {self.embedding_dimension}D vectors")
        except Exception as e:
            logger.error(f"Failed to load embedding model: {str(e)}")
            self.embedding_model = None
            self.embedding_dimension = None

    def _init_dual_vector_dbs(self):
        """Initialize both FAISS (variables) and ChromaDB (methodology) databases."""
        # 1. Initialize FAISS variables database
        self._init_variables_faiss()

        # 2. Initialize ChromaDB methodology database
        self._init_methodology_chromadb()

        # Log status
        var_count = len(self.variables_metadata) if self.variables_metadata else 0
        method_count = 0
        if self.methodology_collection is not None:
            try:
                method_count = self.methodology_collection.count()
            except Exception as e:
                # A failed status query must not abort construction; the other
                # initialisation steps are best-effort as well.
                logger.warning(f"Could not count methodology documents: {str(e)}")
        logger.info(f"Variables database: {var_count:,} variables (FAISS)")
        logger.info(f"Methodology database: {method_count:,} documents (ChromaDB)")

    def _init_variables_faiss(self):
        """Load the FAISS variables index and its JSON metadata from disk.

        Reads ``variables.faiss`` and ``variables_metadata.json`` from
        ``self.variables_db_path``. If either file is missing, or loading
        fails, both ``variables_index`` and ``variables_metadata`` end up
        ``None`` and variable search is unavailable.
        """
        if not FAISS_AVAILABLE:
            # No fallback is implemented, so say what actually happens.
            logger.warning("FAISS not available - variable search disabled")
            return

        try:
            # Check for FAISS files
            faiss_index_path = self.variables_db_path / "variables.faiss"
            metadata_path = self.variables_db_path / "variables_metadata.json"

            if faiss_index_path.exists() and metadata_path.exists():
                # Load FAISS index
                self.variables_index = faiss.read_index(str(faiss_index_path))

                # Load metadata (explicit encoding: do not depend on the locale)
                with open(metadata_path, 'r', encoding='utf-8') as f:
                    self.variables_metadata = json.load(f)

                logger.info(f"✅ Variables FAISS loaded: {len(self.variables_metadata):,} variables")

                # Search results are mapped to metadata by position, so a size
                # mismatch means some hits cannot be resolved.
                index_size = getattr(self.variables_index, 'ntotal', None)
                if index_size is not None and index_size != len(self.variables_metadata):
                    logger.warning(
                        f"FAISS index has {index_size:,} vectors but metadata has "
                        f"{len(self.variables_metadata):,} entries"
                    )
            else:
                logger.warning(f"FAISS files not found at {self.variables_db_path}")
                logger.warning("Expected: variables.faiss + variables_metadata.json")
        except Exception as e:
            logger.error(f"Failed to load FAISS variables database: {str(e)}")
            self.variables_index = None
            self.variables_metadata = None

    def _init_methodology_chromadb(self):
        """Open the persistent ChromaDB client and the methodology collection.

        Creates ``self.methodology_db_path`` if needed. A missing collection is
        only warned about (``methodology_collection`` stays ``None``); any
        other failure resets both client and collection to ``None``.
        """
        if not CHROMADB_AVAILABLE:
            logger.warning("ChromaDB not available - methodology search disabled")
            return

        try:
            # Create methodology DB directory
            self.methodology_db_path.mkdir(parents=True, exist_ok=True)

            # Initialize ChromaDB client
            self.methodology_client = chromadb.PersistentClient(
                path=str(self.methodology_db_path),
                settings=Settings(
                    anonymized_telemetry=False,
                    allow_reset=True
                )
            )

            # Get methodology collection
            collection_name = "census_methodology"
            try:
                self.methodology_collection = self.methodology_client.get_collection(collection_name)
                logger.info(f"✅ Methodology ChromaDB loaded: {collection_name}")
            except Exception:
                # Collection will be created during build process
                logger.warning(f"Methodology collection not found: {collection_name}")
        except Exception as e:
            logger.error(f"Failed to initialize methodology database: {str(e)}")
            self.methodology_client = None
            self.methodology_collection = None

    def _detect_query_type(self, query: str) -> str:
        """
        Detect whether query is about variables or methodology.

        Each indicator substring found in the lower-cased query adds one point
        to its side; every distinct table-ID-like token (``b``/``c`` plus five
        digits) adds one point to the variables side.

        Returns:
            'variables': For variable lookup queries
            'methodology': For conceptual/methodology queries
            'both': For ambiguous queries (scores tie, including 0-0)
        """
        query_lower = query.lower()

        var_score = sum(1 for indicator in self._VARIABLE_INDICATORS if indicator in query_lower)
        # Count each distinct table ID once, however often it is repeated.
        var_score += len({m.group(0) for m in self._TABLE_ID_PATTERN.finditer(query_lower)})

        method_score = sum(1 for indicator in self._METHODOLOGY_INDICATORS if indicator in query_lower)

        if var_score > method_score:
            return 'variables'
        if method_score > var_score:
            return 'methodology'
        return 'both'  # Search both and merge results

    async def search_variables(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]:
        """Search variables database using FAISS.

        Args:
            query: Search text.
            context: Optional extra text appended to the query before embedding.
            top_k: Maximum number of hits; values <= 0 yield an empty list.

        Returns:
            A list of result dicts with keys ``metadata``, ``distance``,
            ``score``, ``type``, ``source`` and ``content``. Empty when the
            index, metadata or embedding model is unavailable, or on error.
        """
        if self.variables_index is None or not self.variables_metadata or self.embedding_model is None:
            logger.warning("Variables search not available (FAISS not loaded)")
            return []
        if top_k <= 0:
            return []

        try:
            # Generate query embedding
            search_text = f"{query} {context}".strip()
            query_embedding = self.embedding_model.encode([search_text], normalize_embeddings=True)

            # Search FAISS index
            distances, indices = self.variables_index.search(
                query_embedding.astype('float32'), top_k
            )

            # Format results using metadata
            # NOTE(review): assumes the metadata is a list positionally aligned
            # with the index vectors — confirm against the index build script.
            metadata_count = len(self.variables_metadata)
            results = []
            for distance, idx in zip(distances[0], indices[0]):
                position = int(idx)
                # FAISS reports missing neighbours as -1; also skip anything
                # the metadata cannot resolve.
                if not 0 <= position < metadata_count:
                    continue
                metadata = self.variables_metadata[position]
                results.append({
                    'metadata': metadata,
                    'distance': float(distance),
                    # NOTE(review): presumably the index returns squared L2
                    # distance between unit vectors (0..4), for which 1 - d/2
                    # is the cosine similarity — confirm the index metric.
                    'score': max(0.0, 1.0 - (float(distance) / 2.0)),
                    'type': 'variable',
                    'source': 'variables_faiss',
                    'content': f"Variable {metadata.get('variable_id', 'Unknown')}: {metadata.get('label', 'No description')}"
                })

            logger.info(f"Variables search: {len(results)} results for '{query}'")
            return results
        except Exception as e:
            logger.error(f"Variables search failed: {str(e)}")
            return []

    async def search_methodology(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]:
        """Search methodology database using ChromaDB.

        Args:
            query: Search text.
            context: Optional extra text appended to the query before embedding.
            top_k: Maximum number of hits; values <= 0 yield an empty list.

        Returns:
            A list of result dicts with keys ``content``, ``metadata``,
            ``distance``, ``score``, ``type``, ``source`` and ``title``. Empty
            when the collection or embedding model is unavailable, or on error.
        """
        if self.methodology_collection is None or self.embedding_model is None:
            logger.warning("Methodology search not available (ChromaDB not loaded)")
            return []
        if top_k <= 0:
            return []

        try:
            # Prepare search query
            search_text = f"{query} {context}".strip()

            # Generate query embedding
            query_embedding = self.embedding_model.encode([search_text])

            # Search ChromaDB
            results = self.methodology_collection.query(
                query_embeddings=query_embedding.tolist(),
                n_results=top_k
            )

            # One query was sent, so only the first row of each field is used.
            documents = results['documents'][0]
            metadatas = results['metadatas'][0]
            distances = results['distances'][0]

            # Format results
            formatted_results = []
            for document, metadata, distance in zip(documents, metadatas, distances):
                # Guard against hits whose metadata is None so one such hit
                # does not discard the whole result set.
                metadata = metadata or {}
                formatted_results.append({
                    'content': document,
                    'metadata': metadata,
                    'distance': distance,
                    'score': max(0.0, 1.0 - distance),  # Convert to similarity
                    'type': 'methodology',
                    'source': 'methodology_chromadb',
                    'title': metadata.get('title', 'Untitled')
                })

            logger.info(f"Methodology search: {len(formatted_results)} results for '{query}'")
            return formatted_results
        except Exception as e:
            logger.error(f"Methodology search failed: {str(e)}")
            return []

    async def search_documentation(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]:
        """
        Smart search with automatic routing between variables and methodology.

        This is the main interface that maintains backward compatibility.

        Args:
            query: Search text; also used to pick the route.
            context: Optional extra text forwarded to the backend search.
            top_k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            Results from the variables path, the methodology path, or — for
            ambiguous queries — both paths merged and sorted by descending
            ``score``, truncated to ``top_k``. Empty list on error.
        """
        if top_k <= 0:
            return []

        try:
            # Detect query type
            query_type = self._detect_query_type(query)

            if query_type == 'variables':
                # Search only variables
                return await self.search_variables(query, context, top_k)
            if query_type == 'methodology':
                # Search only methodology
                return await self.search_methodology(query, context, top_k)

            # Search both and merge results. Round the per-source share up so
            # top_k=1 still queries each backend and odd top_k is not one short.
            per_source_k = (top_k + 1) // 2
            var_results = await self.search_variables(query, context, per_source_k)
            method_results = await self.search_methodology(query, context, per_source_k)

            # Merge and sort by score
            all_results = var_results + method_results
            all_results.sort(key=lambda x: x['score'], reverse=True)
            return all_results[:top_k]
        except Exception as e:
            logger.error(f"Documentation search failed: {str(e)}")
            return []

    # Backward compatibility methods
    async def search(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]:
        """Backward compatible search method; delegates to search_documentation."""
        return await self.search_documentation(query, context, top_k)

    @staticmethod
    def _make_doc_id(title: str, content: str) -> str:
        """Return a deterministic document ID derived from title and content.

        Uses SHA-256 rather than the built-in ``hash()``, whose value for
        strings changes between interpreter runs. The NUL separator keeps
        ("ab", "c") and ("a", "bc") distinct.
        """
        digest = hashlib.sha256(f"{title}\x00{content}".encode("utf-8")).hexdigest()
        return f"doc_{digest[:16]}"

    def add_document(self, title: str, content: str, metadata: Optional[Dict] = None):
        """Add document to methodology database (variables are read-only).

        Args:
            title: Document title; stored in the metadata under ``title``.
            content: Document text; embedded and stored.
            metadata: Optional extra metadata. It is copied, never modified.

        Returns:
            None. Failures are logged, not raised.
        """
        if self.methodology_collection is None:
            logger.warning("Cannot add document: methodology database not available")
            return
        if self.embedding_model is None:
            logger.warning("Cannot add document: embedding model not available")
            return

        try:
            # Generate embedding
            embedding = self.embedding_model.encode([content])

            # Add to methodology collection
            doc_id = self._make_doc_id(title, content)
            # Build a new dict so the caller's metadata is left untouched.
            doc_metadata = {**(metadata or {}), 'title': title, 'source': 'manual_addition'}

            self.methodology_collection.add(
                embeddings=embedding.tolist(),
                documents=[content],
                metadatas=[doc_metadata],
                ids=[doc_id]
            )
            logger.info(f"Added document to methodology database: {title}")
        except Exception as e:
            logger.error(f"Failed to add document: {str(e)}")
# Backward compatibility alias: code that refers to `KnowledgeBase` gets the
# dual-path implementation (same class object, not a subclass).
KnowledgeBase = DualPathKnowledgeBase