
Open Census MCP Server

by brockwebb
vector_db.py.broken (14.8 kB)
""" FAISS-Integrated Dual-Path Vector Database for Census MCP Server Fixes timeout issues by replacing ChromaDB variable loading with instant FAISS loading. This implementation: 1. Loads variables from FAISS index (instant startup) 2. Keeps methodology in ChromaDB (conceptual search) 3. Smart routing between the two databases 4. Backward compatible interface """ import asyncio import json import logging import os import numpy as np from pathlib import Path from typing import Dict, List, Any, Optional, Tuple # Vector DB imports try: import chromadb from chromadb.config import Settings CHROMADB_AVAILABLE = True except ImportError: CHROMADB_AVAILABLE = False logging.warning("ChromaDB not available for methodology search") try: import faiss FAISS_AVAILABLE = True except ImportError: FAISS_AVAILABLE = False logging.warning("FAISS not available for variable search") try: from sentence_transformers import SentenceTransformer SENTENCE_TRANSFORMERS_AVAILABLE = True except ImportError: SENTENCE_TRANSFORMERS_AVAILABLE = False logging.warning("SentenceTransformers not available") from utils.config import get_config logger = logging.getLogger(__name__) class DualPathKnowledgeBase: """ Dual-path knowledge base with FAISS for variables and ChromaDB for methodology. Fixes MCP timeout issues by: - Variables: FAISS index (instant loading, no lazy loading) - Methodology: ChromaDB (conceptual search quality) - Smart routing: Detects query type and routes to appropriate database """ def __init__(self, corpus_path: Optional[Path] = None, vector_db_path: Optional[Path] = None): """Initialize dual-path knowledge base.""" self.config = get_config() # Database paths self.variables_db_path = self.config.base_dir / "knowledge-base" / "variables-faiss" self.methodology_db_path = self.config.base_dir / "knowledge-base" / "methodology-db" # FAISS components (variables) self.variables_index = None self.variables_metadata = None self.embedding_model = None # ChromaDB components (methodology) self.methodology_client = None self.methodology_collection = None # Initialize components self._init_embedding_model() self._init_dual_vector_dbs() logger.info("Dual-path knowledge base initialized successfully") def _init_embedding_model(self): """Initialize sentence transformer for embeddings.""" if not SENTENCE_TRANSFORMERS_AVAILABLE: logger.error("SentenceTransformers not available. Search functionality disabled.") return try: # Use consistent model across the system model_name = "BAAI/bge-large-en-v1.5" # Match FAISS index logger.info(f"Loading embedding model: {model_name}") self.embedding_model = SentenceTransformer(model_name, trust_remote_code=True) self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension() logger.info(f"Embedding model loaded: {self.embedding_dimension}D vectors") except Exception as e: logger.error(f"Failed to load embedding model: {str(e)}") self.embedding_model = None def _init_dual_vector_dbs(self): """Initialize both FAISS (variables) and ChromaDB (methodology) databases.""" # 1. Initialize FAISS variables database self._init_variables_faiss() # 2. 
Initialize ChromaDB methodology database self._init_methodology_chromadb() # Log status var_count = len(self.variables_metadata) if self.variables_metadata else 0 method_count = self.methodology_collection.count() if self.methodology_collection else 0 logger.info(f"Variables database: {var_count:,} variables (FAISS)") logger.info(f"Methodology database: {method_count:,} documents (ChromaDB)") def _init_variables_faiss(self): """Initialize FAISS variables database for instant loading.""" if not FAISS_AVAILABLE: logger.warning("FAISS not available - falling back to ChromaDB for variables") return try: # Check for FAISS files faiss_index_path = self.variables_db_path / "variables.faiss" metadata_path = self.variables_db_path / "variables_metadata.json" if faiss_index_path.exists() and metadata_path.exists(): # Load FAISS index self.variables_index = faiss.read_index(str(faiss_index_path)) # Load metadata with open(metadata_path, 'r') as f: self.variables_metadata = json.load(f) logger.info(f"✅ Variables FAISS loaded: {len(self.variables_metadata):,} variables") else: logger.warning(f"FAISS files not found at {self.variables_db_path}") logger.warning("Expected: variables.faiss + variables_metadata.json") # Could fall back to ChromaDB here if needed except Exception as e: logger.error(f"Failed to load FAISS variables database: {str(e)}") self.variables_index = None self.variables_metadata = None def _init_methodology_chromadb(self): """Initialize ChromaDB methodology database.""" if not CHROMADB_AVAILABLE: logger.warning("ChromaDB not available - methodology search disabled") return try: # Create methodology DB directory self.methodology_db_path.mkdir(parents=True, exist_ok=True) # Initialize ChromaDB client self.methodology_client = chromadb.PersistentClient( path=str(self.methodology_db_path), settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) # Get methodology collection collection_name = "census_methodology" try: self.methodology_collection = self.methodology_client.get_collection(collection_name) logger.info(f"✅ Methodology ChromaDB loaded: {collection_name}") except Exception: logger.warning(f"Methodology collection not found: {collection_name}") # Collection will be created during build process except Exception as e: logger.error(f"Failed to initialize methodology database: {str(e)}") self.methodology_client = None self.methodology_collection = None def _detect_query_type(self, query: str) -> str: """ Detect whether query is about variables or methodology. 
Returns: 'variables': For variable lookup queries 'methodology': For conceptual/methodology queries 'both': For ambiguous queries """ query_lower = query.lower() # Variable indicators variable_indicators = [ 'variable', 'var_', 'b19013', 'b17001', 'estimate', 'margin of error', 'acs variable', 'census variable', 'table', 'dataset', 'field' ] # Methodology indicators methodology_indicators = [ 'methodology', 'how to', 'guide', 'documentation', 'calculate', 'definition', 'meaning', 'interpret', 'statistical', 'survey design', 'sampling', 'margin of error calculation', 'confidence interval' ] var_score = sum(1 for indicator in variable_indicators if indicator in query_lower) method_score = sum(1 for indicator in methodology_indicators if indicator in query_lower) if var_score > method_score: return 'variables' elif method_score > var_score: return 'methodology' else: return 'both' # Search both and merge results async def search_variables(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]: """Search variables database using FAISS.""" if not self.variables_index or not self.variables_metadata or not self.embedding_model: logger.warning("Variables search not available (FAISS not loaded)") return [] try: # Generate query embedding search_text = f"{query} {context}".strip() query_embedding = self.embedding_model.encode([search_text], normalize_embeddings=True) # Search FAISS index distances, indices = self.variables_index.search( query_embedding.astype('float32'), top_k ) # Format results using metadata results = [] for i, (distance, idx) in enumerate(zip(distances[0], indices[0])): if idx >= 0 and idx < len(self.variables_metadata): # Valid result metadata = self.variables_metadata[idx] results.append({ 'metadata': metadata, 'distance': float(distance), 'score': max(0.0, 1.0 - (float(distance) / 2.0)), # Convert to similarity 'type': 'variable', 'source': 'variables_faiss', 'content': f"Variable {metadata.get('variable_id', 'Unknown')}: {metadata.get('label', 'No description')}" }) logger.info(f"Variables search: {len(results)} results for '{query}'") return results except Exception as e: logger.error(f"Variables search failed: {str(e)}") return [] async def search_methodology(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]: """Search methodology database using ChromaDB.""" if not self.methodology_collection or not self.embedding_model: logger.warning("Methodology search not available (ChromaDB not loaded)") return [] try: # Prepare search query search_text = f"{query} {context}".strip() # Generate query embedding query_embedding = self.embedding_model.encode([search_text]) # Search ChromaDB results = self.methodology_collection.query( query_embeddings=query_embedding.tolist(), n_results=top_k ) # Format results formatted_results = [] for i in range(len(results['documents'][0])): formatted_results.append({ 'content': results['documents'][0][i], 'metadata': results['metadatas'][0][i], 'distance': results['distances'][0][i], 'score': max(0.0, 1.0 - results['distances'][0][i]), # Convert to similarity 'type': 'methodology', 'source': 'methodology_chromadb', 'title': results['metadatas'][0][i].get('title', 'Untitled') }) logger.info(f"Methodology search: {len(formatted_results)} results for '{query}'") return formatted_results except Exception as e: logger.error(f"Methodology search failed: {str(e)}") return [] async def search_documentation(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]: """ Smart search with 
automatic routing between variables and methodology. This is the main interface that maintains backward compatibility. """ try: # Detect query type query_type = self._detect_query_type(query) if query_type == 'variables': # Search only variables return await self.search_variables(query, context, top_k) elif query_type == 'methodology': # Search only methodology return await self.search_methodology(query, context, top_k) else: # Search both and merge results var_results = await self.search_variables(query, context, top_k // 2) method_results = await self.search_methodology(query, context, top_k // 2) # Merge and sort by score all_results = var_results + method_results all_results.sort(key=lambda x: x['score'], reverse=True) return all_results[:top_k] except Exception as e: logger.error(f"Documentation search failed: {str(e)}") return [] # Backward compatibility methods async def search(self, query: str, context: str = "", top_k: int = 10) -> List[Dict[str, Any]]: """Backward compatible search method.""" return await self.search_documentation(query, context, top_k) def add_document(self, title: str, content: str, metadata: Optional[Dict] = None): """Add document to methodology database (variables are read-only).""" if not self.methodology_collection: logger.warning("Cannot add document: methodology database not available") return try: if not self.embedding_model: logger.warning("Cannot add document: embedding model not available") return # Generate embedding embedding = self.embedding_model.encode([content]) # Add to methodology collection doc_id = f"doc_{hash(title + content) % 1000000}" doc_metadata = metadata or {} doc_metadata.update({'title': title, 'source': 'manual_addition'}) self.methodology_collection.add( embeddings=embedding.tolist(), documents=[content], metadatas=[doc_metadata], ids=[doc_id] ) logger.info(f"Added document to methodology database: {title}") except Exception as e: logger.error(f"Failed to add document: {str(e)}") # Backward compatibility alias KnowledgeBase = DualPathKnowledgeBase
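A minimal usage sketch for the class above. It assumes utils.config.get_config() resolves a base_dir that contains knowledge-base/variables-faiss and knowledge-base/methodology-db, that the FAISS index and metadata files have already been built, and that the file is importable as a module named vector_db (the import path is hypothetical, since the file shown is still named .broken).

import asyncio

from vector_db import KnowledgeBase  # hypothetical module name; alias for DualPathKnowledgeBase

async def main():
    kb = KnowledgeBase()

    # "b19013" is a variable indicator, so _detect_query_type() routes this to the FAISS path
    results = await kb.search("B19013 median household income", top_k=5)
    for r in results:
        print(f"{r['score']:.3f}  {r['type']:<11}  {r['content'][:80]}")

asyncio.run(main())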
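For context, _init_variables_faiss() expects a variables.faiss index alongside a variables_metadata.json list indexed by FAISS position. The real build pipeline is not part of this file; the sketch below is only an illustration of artifacts that would satisfy the loader. The variable IDs, labels, embedding text, and IndexFlatL2 choice are assumptions; only the file names and the BAAI/bge-large-en-v1.5 model come from the loader itself.

import json
import faiss
from sentence_transformers import SentenceTransformer

# Illustrative metadata only; the real corpus covers the full ACS variable set.
variables = [
    {"variable_id": "B19013_001E", "label": "Median household income in the past 12 months"},
    {"variable_id": "B17001_002E", "label": "Income in the past 12 months below poverty level"},
]

model = SentenceTransformer("BAAI/bge-large-en-v1.5")
texts = [f"{v['variable_id']} {v['label']}" for v in variables]
embeddings = model.encode(texts, normalize_embeddings=True).astype("float32")

# L2 index over normalized vectors; with the loader's 1 - d/2 conversion this
# recovers cosine similarity, but the actual index type is an assumption.
index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(embeddings)

faiss.write_index(index, "variables.faiss")
with open("variables_metadata.json", "w") as f:
    json.dump(variables, f)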

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.