Open Census MCP Server

by brockwebb
vector_db.py.backup (19.1 kB)
""" Vector Database and Knowledge Base for Census MCP Server Handles RAG (Retrieval-Augmented Generation) functionality using: - Sentence transformers for high-quality local embeddings (NO API key required) - ChromaDB for vector storage and similarity search - R documentation corpus processing and indexing - Context enrichment for Census queries """ import asyncio import json import logging import os import re from pathlib import Path from typing import Dict, List, Any, Optional, Tuple import hashlib # Vector DB and embedding imports try: import chromadb from chromadb.config import Settings CHROMADB_AVAILABLE = True except ImportError: CHROMADB_AVAILABLE = False logging.warning("ChromaDB not available. Install with: pip install chromadb") try: from sentence_transformers import SentenceTransformer SENTENCE_TRANSFORMERS_AVAILABLE = True except ImportError: SENTENCE_TRANSFORMERS_AVAILABLE = False logging.warning("SentenceTransformers not available. Install with: pip install sentence-transformers") from utils.config import get_config logger = logging.getLogger(__name__) class KnowledgeBase: """ Vector database and knowledge management for Census expertise. Provides: - Document indexing and similarity search using sentence transformers (local, no API key) - Variable context lookup and enrichment - Location parsing and geographic knowledge - R documentation corpus integration """ def __init__(self, corpus_path: Optional[Path] = None, vector_db_path: Optional[Path] = None): """ Initialize knowledge base. Args: corpus_path: Path to R documentation corpus vector_db_path: Path to vector database storage """ self.config = get_config() self.corpus_path = corpus_path or self.config.r_docs_corpus_path self.vector_db_path = vector_db_path or self.config.vector_db_path # Initialize components self._init_embedding_model() self._init_vector_db() self._init_variable_contexts() self._init_location_knowledge() # Load or build corpus index self._ensure_corpus_indexed() logger.info("Knowledge Base initialized successfully") def _init_embedding_model(self): """Initialize sentence transformer for local embeddings.""" if not SENTENCE_TRANSFORMERS_AVAILABLE: logger.error("SentenceTransformers not available. RAG functionality disabled.") self.embedding_model = None self.model_name = None return try: # Use the model specified in config, with fallback model_name = getattr(self.config, 'embedding_model', 'all-mpnet-base-v2') # Handle old OpenAI model names in config if 'text-embedding' in model_name: logger.warning(f"OpenAI model '{model_name}' detected in config, switching to sentence transformers") model_name = 'all-mpnet-base-v2' logger.info(f"Loading sentence transformer model: {model_name}") # Load sentence transformer model (downloads on first use) self.embedding_model = SentenceTransformer(model_name) self.model_name = model_name self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension() logger.info(f"✅ Sentence transformer model loaded successfully") logger.info(f" Model: {model_name}") logger.info(f" Dimensions: {self.embedding_dimension}") logger.info(f" Max sequence length: {self.embedding_model.max_seq_length}") except Exception as e: logger.error(f"Failed to load sentence transformer model: {str(e)}") self.embedding_model = None self.model_name = None def _init_vector_db(self): """Initialize ChromaDB for vector storage.""" if not CHROMADB_AVAILABLE: logger.error("ChromaDB not available. 
Vector search disabled.") self.vector_db = None self.collection = None return try: # Create vector DB directory self.vector_db_path.mkdir(parents=True, exist_ok=True) # Initialize ChromaDB client self.vector_db = chromadb.PersistentClient( path=str(self.vector_db_path), settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) # Get or create collection collection_name = "census_knowledge" try: self.collection = self.vector_db.get_collection(collection_name) logger.info(f"Loaded existing collection: {collection_name}") # Check if collection metadata matches current model metadata = self.collection.metadata or {} stored_model = metadata.get('embedding_model', 'unknown') if self.model_name and stored_model != self.model_name: logger.warning(f"Vector DB was built with '{stored_model}', current model is '{self.model_name}'") logger.warning("Consider rebuilding vector DB for optimal performance") except: self.collection = self.vector_db.create_collection( name=collection_name, metadata={ "description": "Census R documentation and knowledge corpus", "embedding_model": self.model_name or "unknown", "embedding_dimension": getattr(self, 'embedding_dimension', 0) } ) logger.info(f"Created new collection: {collection_name}") except Exception as e: logger.error(f"Failed to initialize vector database: {str(e)}") self.vector_db = None self.collection = None def _init_variable_contexts(self): """Initialize variable context knowledge.""" # Enhanced variable contexts with definitions and usage notes self.variable_contexts = { 'population': { 'label': 'Total Population', 'definition': 'Total count of people residing in the area', 'variable_code': 'B01003_001', 'table': 'B01003', 'universe': 'Total population', 'notes': 'Includes all residents regardless of citizenship status' }, 'median_income': { 'label': 'Median Household Income', 'definition': 'Median income of households in the past 12 months (in inflation-adjusted dollars)', 'variable_code': 'B19013_001', 'table': 'B19013', 'universe': 'Households', 'notes': 'Inflation-adjusted to survey year dollars. Excludes group quarters population.' }, 'poverty_rate': { 'label': 'Population Below Poverty Level', 'definition': 'Number of people with income below the federal poverty threshold', 'variable_code': 'B17001_002', 'table': 'B17001', 'universe': 'Population for whom poverty status is determined', 'notes': 'Excludes people in institutions, military group quarters, and college dormitories' }, 'median_home_value': { 'label': 'Median Home Value', 'definition': 'Median value of owner-occupied housing units', 'variable_code': 'B25077_001', 'table': 'B25077', 'universe': 'Owner-occupied housing units', 'notes': 'Self-reported by homeowners. May not reflect current market values.' 
}, 'unemployment_rate': { 'label': 'Unemployment Rate', 'definition': 'Percentage of labor force that is unemployed', 'variable_code': 'B23025_005', 'table': 'B23025', 'universe': 'Population 16 years and over', 'notes': 'Calculated as unemployed / (employed + unemployed) * 100' } } def _init_location_knowledge(self): """Initialize geographic knowledge and patterns.""" # Common location patterns and disambiguation self.location_patterns = { 'major_cities': { 'new york': {'state': 'NY', 'type': 'place', 'full_name': 'New York city, New York'}, 'los angeles': {'state': 'CA', 'type': 'place', 'full_name': 'Los Angeles city, California'}, 'chicago': {'state': 'IL', 'type': 'place', 'full_name': 'Chicago city, Illinois'}, 'houston': {'state': 'TX', 'type': 'place', 'full_name': 'Houston city, Texas'}, 'philadelphia': {'state': 'PA', 'type': 'place', 'full_name': 'Philadelphia city, Pennsylvania'}, 'phoenix': {'state': 'AZ', 'type': 'place', 'full_name': 'Phoenix city, Arizona'}, 'san antonio': {'state': 'TX', 'type': 'place', 'full_name': 'San Antonio city, Texas'}, 'san diego': {'state': 'CA', 'type': 'place', 'full_name': 'San Diego city, California'}, 'dallas': {'state': 'TX', 'type': 'place', 'full_name': 'Dallas city, Texas'}, 'san jose': {'state': 'CA', 'type': 'place', 'full_name': 'San Jose city, California'}, 'baltimore': {'state': 'MD', 'type': 'place', 'full_name': 'Baltimore city, Maryland'}, }, 'ambiguous_names': { 'springfield': ['IL', 'MA', 'MO', 'OH'], # Multiple Springfields 'washington': ['DC', 'WA'], # Washington DC vs Washington State 'baltimore': ['city', 'county'], # Baltimore city vs Baltimore County } } def _ensure_corpus_indexed(self): """Check if corpus is already indexed in vector DB.""" if not self.embedding_model or not self.collection: logger.warning("Cannot check corpus: sentence transformer or vector DB not available") return try: # Check if corpus is already indexed count = self.collection.count() if count > 0: logger.info(f"Corpus already indexed: {count} documents") return else: logger.warning("No documents found in knowledge base. Run build-kb.py to create vector database.") except Exception as e: logger.error(f"Error checking corpus index: {str(e)}") async def get_variable_context(self, variables: List[str]) -> Dict[str, Dict[str, Any]]: """ Get enhanced context for variables using both static knowledge and RAG. 
Args: variables: List of variable names Returns: Dictionary mapping variables to context information """ context = {} for var in variables: var_lower = var.lower().strip() # Start with static context var_context = self.variable_contexts.get(var_lower, { 'label': var.title(), 'definition': f"Census variable: {var}", 'notes': 'Variable definition not available in local knowledge base' }) # Enhance with RAG if available if self.embedding_model and self.collection: try: rag_context = await self._search_variable_documentation(var) if rag_context: var_context.update(rag_context) except Exception as e: logger.warning(f"RAG search failed for variable {var}: {str(e)}") context[var] = var_context return context async def _search_variable_documentation(self, variable: str) -> Optional[Dict[str, Any]]: """Search documentation for variable-specific context.""" try: query = f"Census variable {variable} definition methodology" results = await self.search_documentation(query, context="variable_lookup") if results: # Extract relevant information from search results best_result = results[0] return { 'documentation': best_result.get('content', ''), 'source': best_result.get('source', ''), 'rag_enhanced': True } except Exception as e: logger.warning(f"Variable documentation search failed: {str(e)}") return None async def parse_location(self, location: str) -> Dict[str, Any]: """ Parse and enhance location information. Args: location: Location string Returns: Enhanced location information with geographic context """ location_lower = location.lower().strip() # Check major cities if location_lower in self.location_patterns['major_cities']: city_info = self.location_patterns['major_cities'][location_lower] return { 'original': location, 'normalized': city_info['full_name'], 'state': city_info['state'], 'type': city_info['type'], 'confidence': 'high' } # Check for ambiguous names for ambiguous, states in self.location_patterns['ambiguous_names'].items(): if ambiguous in location_lower: return { 'original': location, 'ambiguous': True, 'possible_states': states, 'recommendation': f"Specify state for {ambiguous} (e.g., '{ambiguous}, {states[0]}')", 'confidence': 'low' } # Default parsing return { 'original': location, 'normalized': location, 'confidence': 'medium' } async def search_documentation(self, query: str, context: str = "", top_k: Optional[int] = None) -> List[Dict[str, Any]]: """ Search documentation using vector similarity with sentence transformers. 
Args: query: Search query context: Additional context for the search top_k: Number of results to return Returns: List of relevant documents with metadata """ if not self.embedding_model or not self.collection: logger.warning("Cannot search: sentence transformer or vector DB not available") return [] try: # Prepare search query search_text = f"{query} {context}".strip() top_k = top_k or self.config.vector_search_top_k # Generate query embedding using sentence transformers query_embedding = self.embedding_model.encode([search_text]) # Search vector database results = self.collection.query( query_embeddings=query_embedding.tolist(), n_results=top_k ) # Format results formatted_results = [] for i in range(len(results['documents'][0])): formatted_results.append({ 'content': results['documents'][0][i], 'metadata': results['metadatas'][0][i], 'score': 1 - results['distances'][0][i], # Convert distance to similarity 'source': results['metadatas'][0][i].get('source', 'Unknown'), 'title': results['metadatas'][0][i].get('title', 'Untitled') }) # Filter by threshold threshold = self.config.vector_search_threshold filtered_results = [r for r in formatted_results if r['score'] >= threshold] logger.info(f"Documentation search: {len(filtered_results)} relevant results from {len(results['documents'][0])} total") return filtered_results except Exception as e: logger.error(f"Documentation search failed: {str(e)}") return [] async def add_document(self, title: str, content: str, metadata: Optional[Dict] = None): """Add a document to the knowledge base using sentence transformers.""" if not self.embedding_model or not self.collection: logger.warning("Cannot add document: sentence transformer or vector DB not available") return try: # Generate unique ID doc_id = hashlib.md5(f"{title}_{content[:100]}".encode()).hexdigest() # Generate embedding using sentence transformers embedding = self.embedding_model.encode([content]) # Prepare metadata doc_metadata = metadata or {} doc_metadata.update({ 'title': title, 'source': 'user_added', 'timestamp': str(asyncio.get_event_loop().time()) }) # Add to collection self.collection.add( documents=[content], metadatas=[doc_metadata], embeddings=embedding.tolist(), ids=[doc_id] ) logger.info(f"Added document: {title}") except Exception as e: logger.error(f"Error adding document: {str(e)}") def get_stats(self) -> Dict[str, Any]: """Get knowledge base statistics.""" stats = { 'embedding_model': self.model_name or 'Not available', 'embedding_dimension': getattr(self, 'embedding_dimension', 0), 'vector_db_available': self.collection is not None, 'corpus_path': str(self.corpus_path), 'variable_contexts': len(self.variable_contexts), 'location_patterns': len(self.location_patterns['major_cities']), 'dependencies': 'sentence-transformers (local, no API key required)' } if self.collection: try: stats['document_count'] = self.collection.count() metadata = self.collection.metadata or {} stats['collection_metadata'] = metadata except: stats['document_count'] = 'Unknown' return stats
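
For orientation, a minimal usage sketch follows. It is not part of the original file: the corpus and database paths are hypothetical, and it assumes utils.config.get_config() resolves any paths not passed in explicitly and that this module is importable as vector_db.

# usage_sketch.py -- illustrative only; paths and module name are assumptions
import asyncio
from pathlib import Path

from vector_db import KnowledgeBase  # assumed module name for the file above

async def main():
    # Initialization is synchronous; search methods are coroutines
    kb = KnowledgeBase(
        corpus_path=Path("data/r_docs"),        # hypothetical location
        vector_db_path=Path("data/vector_db"),  # hypothetical location
    )

    # Vector-similarity search over the indexed documentation corpus
    results = await kb.search_documentation("median household income methodology")
    for r in results:
        print(f"{r['score']:.3f}  {r['title']}  ({r['source']})")

    # Summary of the model, collection, and corpus state
    print(kb.get_stats())

if __name__ == "__main__":
    asyncio.run(main())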
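
One design note on scoring: search_documentation converts ChromaDB distances to similarities as 1 - distance, which reads naturally for a cosine-distance space, but Chroma collections default to L2 distance unless configured otherwise. If cosine scores are what this code intends, the collection would need Chroma's collection-level distance setting; a sketch of that one change (an assumption about intent, not a confirmed fix from the repository):

# Assumes cosine similarity is the intended metric. "hnsw:space" is
# ChromaDB's collection-level distance setting (default "l2").
collection = vector_db.create_collection(
    name="census_knowledge",
    metadata={
        "description": "Census R documentation and knowledge corpus",
        "hnsw:space": "cosine",
    },
)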
