Skip to main content
Glama
brockwebb

Open Census MCP Server

by brockwebb
build-kb-concept-based.pyโ€ข96.7 kB
#!/usr/bin/env python3 """ Dual-Path Knowledge Base Builder - Concept-Based Variables Architecture Builds TWO separate vector databases optimized for different retrieval patterns: 1. VARIABLES DATABASE: 36K concept-based variables โ†’ FAISS index (fast loading) OR ChromaDB 2. METHODOLOGY DATABASE: Documentation, guides, PDFs โ†’ ChromaDB (conceptual search) Key Update: Handles canonical_variables_refactored.json with concept-based structure - Eliminates duplicate variables (65K โ†’ 36K concepts) - Survey instance awareness (ACS1/5yr as instances, not separate variables) - Rich metadata preservation for intelligent search CRITICAL FIX: Array synchronization validation and proper ID mapping to fix search issues Usage: python build-kb-concept-based.py --variables-only --output-dir variables-db --faiss python build-kb-concept-based.py --methodology-only --output-dir methodology-db python build-kb-concept-based.py --both --variables-dir variables-db --methodology-dir methodology-db --faiss """ import os import sys import numpy as np import logging import argparse import hashlib import json from pathlib import Path from typing import List, Dict, Any, Optional, Union import time import multiprocessing from concurrent.futures import ProcessPoolExecutor import re # Add bulletproofing imports from tenacity import retry, stop_after_attempt, wait_exponential try: import tiktoken TIKTOKEN_AVAILABLE = True except ImportError: TIKTOKEN_AVAILABLE = False # Load environment variables from .env file try: from dotenv import load_dotenv # Look for .env in parent directory since we're in knowledge-base/ env_path = Path(__file__).parent.parent / '.env' load_dotenv(env_path) except ImportError: pass # dotenv not available, assume env vars are set # Document processing import PyPDF2 from bs4 import BeautifulSoup import markdown import pandas as pd # Vector DB and embeddings import chromadb from chromadb.config import Settings from sentence_transformers import SentenceTransformer # OpenAI for embeddings (optional) try: import openai from openai import OpenAI OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False logging.warning("OpenAI not available. Install with: pip install openai") # FAISS for variables database try: import faiss FAISS_AVAILABLE = True except ImportError: FAISS_AVAILABLE = False logging.warning("FAISS not available. Install with: pip install faiss-cpu") # Setup logging - only warnings and errors logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Suppress OpenAI HTTP request logging spam logging.getLogger("openai").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) # Create a separate logger for progress that bypasses the warning filter progress_logger = logging.getLogger('progress') progress_logger.setLevel(logging.INFO) progress_handler = logging.StreamHandler() progress_handler.setFormatter(logging.Formatter('%(message)s')) progress_logger.addHandler(progress_handler) progress_logger.propagate = False def get_text_hash(text: str) -> str: """Get stable hash for text caching""" return hashlib.sha1(text.encode('utf-8')).hexdigest() def count_tokens(text: str) -> int: """Count tokens using tiktoken if available""" if not TIKTOKEN_AVAILABLE: # Rough approximation: 4 chars per token return len(text) // 4 try: encoding = tiktoken.encoding_for_model("text-embedding-3-large") return len(encoding.encode(text)) except: return len(text) // 4 def truncate_text_to_tokens(text: str, max_tokens: int = 7500) -> str: """Truncate text to stay under token limit""" if not TIKTOKEN_AVAILABLE: # Rough approximation: 4 chars per token max_chars = max_tokens * 4 return text[:max_chars] if len(text) > max_chars else text try: encoding = tiktoken.encoding_for_model("text-embedding-3-large") tokens = encoding.encode(text) if len(tokens) <= max_tokens: return text # Truncate and decode back return encoding.decode(tokens[:max_tokens]) except: max_chars = max_tokens * 4 return text[:max_chars] if len(text) > max_chars else text @retry( stop=stop_after_attempt(8), wait=wait_exponential(multiplier=1, min=1, max=60) ) def create_embeddings_with_retry(client, texts: List[str]) -> List[List[float]]: """Create embeddings with bulletproof retry logic""" response = client.embeddings.create( input=texts, model="text-embedding-3-large" ) return [item.embedding for item in response.data] def show_progress(current, total, prefix="Progress", bar_length=50): """Show a clean progress bar""" percent = current / total filled_length = int(bar_length * percent) bar = 'โ–ˆ' * filled_length + 'โ–‘' * (bar_length - filled_length) print(f'\r{prefix}: [{bar}] {percent:.1%} ({current}/{total})', end='', flush=True) if current == total: print() # New line when complete def process_methodology_worker(args): """Worker function for parallel methodology processing""" worker_id, files_batch, temp_dir, use_openai = args try: # Initialize embedding model based on use_openai flag if use_openai: if not OPENAI_AVAILABLE: raise ImportError("OpenAI package required for --use-openai") api_key = os.getenv('OPENAI_API_KEY') if not api_key: raise ValueError("OPENAI_API_KEY environment variable required") openai_client = OpenAI(api_key=api_key) model = openai_client # Pass OpenAI client as "model" else: model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2", cache_folder='./model_cache') worker_results = [] files_processed = 0 chunks_created = 0 errors = 0 for file_path in files_batch: try: chunks = process_single_file(file_path, model, use_openai) worker_results.extend(chunks) files_processed += 1 chunks_created += len(chunks) except Exception as e: logger.error(f"Worker {worker_id}: Error processing {file_path}: {e}") errors += 1 # Save worker results to temporary file temp_file = temp_dir / f"worker_{worker_id}.json" with open(temp_file, 'w') as f: json.dump(worker_results, f) return {'files_processed': files_processed, 'chunks_created': chunks_created, 'errors': errors} except Exception as e: logger.error(f"Worker {worker_id} failed: {e}") return {'files_processed': 0, 'chunks_created': 0, 'errors': len(files_batch)} def process_single_file(file_path: Path, model, use_openai: bool = False) -> List[Dict]: """Process a single file and return chunks with embeddings""" try: content = extract_content(file_path) if not content.strip(): return [] chunks = create_chunks(content, file_path) # Generate embeddings for all chunks chunk_texts = [chunk['text'] for chunk in chunks] if use_openai: # Bulletproof OpenAI embeddings with caching and token limits cache_dir = Path("./embedding_cache") cache_dir.mkdir(exist_ok=True) embeddings = [] batch_size = 50 # Conservative batch size total_batches = (len(chunk_texts) + batch_size - 1) // batch_size for i in range(0, len(chunk_texts), batch_size): batch_texts = chunk_texts[i:i + batch_size] batch_embeddings = [] for text in batch_texts: # Token pre-flight check token_count = count_tokens(text) if token_count > 7500: logger.warning(f"Truncating text: {token_count} tokens -> 7500") text = truncate_text_to_tokens(text, 7500) # Check cache first text_hash = get_text_hash(text) cache_file = cache_dir / f"{text_hash}.npy" if cache_file.exists(): embedding = np.load(cache_file).tolist() batch_embeddings.append(embedding) else: # Need to embed this text try: response_embeddings = create_embeddings_with_retry(model, [text]) embedding = response_embeddings[0] batch_embeddings.append(embedding) # Cache the result np.save(cache_file, np.array(embedding)) except Exception as e: logger.error(f"Failed to embed text after retries: {e}") # Use zero embedding as fallback batch_embeddings.append([0.0] * 3072) embeddings.extend(batch_embeddings) # Show progress current_batch = i // batch_size + 1 if current_batch % 5 == 0 or current_batch == 1 or current_batch == total_batches: show_progress(current_batch, total_batches, "OpenAI Embedding") # Rate limiting time.sleep(0.5) embeddings = np.array(embeddings) else: # Generate sentence transformer embeddings embeddings = model.encode(chunk_texts, show_progress_bar=False) # Add embeddings to chunks for i, chunk in enumerate(chunks): chunk['embedding'] = embeddings[i].tolist() return chunks except Exception as e: logger.error(f"Error processing {file_path}: {e}") return [] def extract_content(file_path: Path) -> str: """Extract text content from various file types""" suffix = file_path.suffix.lower() if suffix == '.pdf': return extract_pdf_content(file_path) elif suffix in ['.md', '.txt']: return extract_text_content(file_path) elif suffix == '.html': return extract_html_content(file_path) elif suffix in ['.csv', '.xlsx']: return extract_tabular_content(file_path) else: return extract_text_content(file_path) def extract_pdf_content(file_path: Path) -> str: """Extract text from PDF files""" try: with open(file_path, 'rb') as file: reader = PyPDF2.PdfReader(file) text = "" for page in reader.pages: text += page.extract_text() + "\n" return text except Exception as e: logger.error(f"Error extracting PDF {file_path}: {e}") return "" def extract_text_content(file_path: Path) -> str: """Extract text from text files""" try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: return file.read() except Exception as e: logger.error(f"Error reading text file {file_path}: {e}") return "" def extract_html_content(file_path: Path) -> str: """Extract text from HTML files""" try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: soup = BeautifulSoup(file.read(), 'html.parser') return soup.get_text() except Exception as e: logger.error(f"Error extracting HTML {file_path}: {e}") return "" def extract_tabular_content(file_path: Path) -> str: """Extract summary from CSV/Excel files""" try: if file_path.suffix.lower() == '.csv': df = pd.read_csv(file_path, nrows=100) # Sample first 100 rows else: df = pd.read_excel(file_path, nrows=100) summary = f"Table: {file_path.name}\n" summary += f"Columns: {', '.join(df.columns)}\n" summary += f"Sample data:\n{df.head().to_string()}\n" return summary except Exception as e: logger.error(f"Error extracting tabular data {file_path}: {e}") return "" def extract_headings_and_sections(text: str, file_type: str) -> List[Dict]: """Extract sections based on document type""" sections = [] if file_type in ['.md', '.markdown']: # Markdown heading extraction lines = text.split('\n') current_section = {'heading': None, 'level': 0, 'content': []} for line in lines: line = line.strip() if line.startswith('#'): # Save previous section if current_section['content']: sections.append({ 'heading': current_section['heading'] or 'Introduction', 'level': current_section['level'], 'content': '\n'.join(current_section['content']).strip() }) # Start new section level = len(line) - len(line.lstrip('#')) heading = line.lstrip('# ').strip() current_section = {'heading': heading, 'level': level, 'content': []} else: if line: # Skip empty lines current_section['content'].append(line) # Add final section if current_section['content']: sections.append({ 'heading': current_section['heading'] or 'Introduction', 'level': current_section['level'], 'content': '\n'.join(current_section['content']).strip() }) elif file_type in ['.html', '.htm']: # HTML heading extraction (basic) import re # Split on h1-h6 tags parts = re.split(r'<h[1-6][^>]*>([^<]+)</h[1-6]>', text, flags=re.IGNORECASE) current_heading = None for i, part in enumerate(parts): if i % 2 == 1: # Heading content current_heading = part.strip() elif part.strip(): # Content sections.append({ 'heading': current_heading or 'Introduction', 'level': 1, 'content': part.strip() }) else: # Fallback: try to detect section patterns import re lines = text.split('\n') current_section = {'heading': None, 'content': []} for line in lines: line = line.strip() # Detect section headers (numbered, all caps, etc.) if (re.match(r'^\d+\.\s+[A-Z]', line) or # "1. SECTION" re.match(r'^[A-Z][A-Z\s]{10,} """Robust chunking with fallback for documents without paragraph breaks""" chunks = [] # Clean text content = re.sub(r'\s+', ' ', content).strip() if len(content) < 100: return chunks # Settings chunk_size = 1000 overlap_size = 200 max_safe_size = 5000 # Absolute max before we force split # First try: paragraph-based splitting (works for most documents) paragraphs = content.split('\n\n') # If we only got 1 paragraph and it's huge, we need fallback splitting if len(paragraphs) == 1 and len(paragraphs[0]) > max_safe_size: # Fallback: split by sentences first sentences = re.split(r'[.!?]+\s+', content) if len(sentences) > 1: paragraphs = sentences else: # Last resort: split by fixed character chunks paragraphs = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size-overlap_size)] current_chunk = "" chunk_num = 0 for paragraph in paragraphs: paragraph = paragraph.strip() if not paragraph: continue # If this single paragraph is still too large, force split it if len(paragraph) > max_safe_size: # Split oversized paragraph into smaller pieces for i in range(0, len(paragraph), max_safe_size-overlap_size): piece = paragraph[i:i+max_safe_size] if len(piece.strip()) > 100: chunks.append(create_chunk_metadata( piece.strip(), file_path, chunk_num )) chunk_num += 1 continue # Normal processing test_chunk = current_chunk + " " + paragraph if current_chunk else paragraph if len(test_chunk) > chunk_size and current_chunk: # Save current chunk if len(current_chunk.strip()) > 100: chunks.append(create_chunk_metadata( current_chunk.strip(), file_path, chunk_num )) chunk_num += 1 # Start new chunk with overlap overlap_text = current_chunk[-overlap_size:] if len(current_chunk) > overlap_size else current_chunk current_chunk = overlap_text + " " + paragraph if overlap_text else paragraph else: current_chunk = test_chunk # Add final chunk if current_chunk and len(current_chunk.strip()) > 100: # Final safety check if len(current_chunk) > max_safe_size: # Force split the final chunk for i in range(0, len(current_chunk), max_safe_size-overlap_size): piece = current_chunk[i:i+max_safe_size] if len(piece.strip()) > 100: chunks.append(create_chunk_metadata( piece.strip(), file_path, chunk_num )) chunk_num += 1 else: chunks.append(create_chunk_metadata( current_chunk.strip(), file_path, chunk_num )) return chunks def create_chunk_metadata(text: str, file_path: Path, chunk_num: int) -> Dict: """Create metadata for a text chunk""" content_hash = hashlib.md5(text.encode()).hexdigest()[:8] file_hash = hashlib.md5(str(file_path).encode()).hexdigest()[:6] chunk_id = f"{file_path.stem}_{file_hash}_{chunk_num}_{content_hash}" return { 'id': chunk_id, 'text': text, 'metadata': { 'source': str(file_path), 'filename': file_path.name, 'chunk_number': chunk_num, 'total_chunks': 1, # Will be updated later if needed 'file_type': file_path.suffix[1:] if file_path.suffix else 'unknown' } } class ConceptBasedKnowledgeBuilder: """ Builds separated knowledge bases optimized for different retrieval patterns: - Variables DB: Entity lookup for 36K concept-based variables (no duplicates) - Methodology DB: Conceptual search for documentation Key improvements: - Handles canonical_variables_refactored.json structure - Survey instance awareness (ACS1/5yr metadata) - Rich metadata preservation for intelligent search - CRITICAL FIX: Array synchronization validation and proper ID mapping """ def __init__(self, source_dir: Path, build_mode: str, variables_dir: Path = None, methodology_dir: Path = None, test_mode: bool = False, model_name: str = "sentence-transformers/all-mpnet-base-v2", workers: int = 5, use_faiss: bool = False, use_openai: bool = False): self.source_dir = Path(source_dir) self.build_mode = build_mode # 'variables', 'methodology', or 'both' self.variables_dir = Path(variables_dir) if variables_dir else None self.methodology_dir = Path(methodology_dir) if methodology_dir else None self.test_mode = test_mode self.model_name = model_name self.workers = workers self.use_openai = use_openai # Stats tracking self.variables_stats = { 'concepts_processed': 0, 'survey_instances_processed': 0, 'files_processed': 0, 'chunks_created': 0, 'errors': 0 } self.methodology_stats = {'files_processed': 0, 'chunks_created': 0, 'errors': 0} # Initialize model self._init_model() self._init_openai_client() # Initialize databases based on build mode self._init_databases() self.use_faiss = use_faiss and (build_mode in ['variables', 'both']) # Validate FAISS availability if self.use_faiss and not FAISS_AVAILABLE: logger.error("FAISS requested but not available. Install with: pip install faiss-cpu") raise ImportError("FAISS not available") logger.info(f"๐Ÿš€ Concept-Based Knowledge Builder initialized:") logger.info(f" Build mode: {build_mode}") logger.info(f" Variables dir: {variables_dir}") logger.info(f" Variables backend: {'FAISS' if self.use_faiss else 'ChromaDB'}") logger.info(f" Methodology dir: {methodology_dir}") logger.info(f" Test mode: {test_mode}") logger.info(f" Embedding model: {'OpenAI' if use_openai else 'SentenceTransformers'}") logger.info(f" ๐ŸŽฏ CONCEPT-BASED: Handles refactored canonical variables") def _init_model(self): """Initialize embedding model with local caching""" if not self.use_openai: os.environ['HF_HUB_OFFLINE'] = '1' os.environ['TRANSFORMERS_OFFLINE'] = '1' os.environ['TOKENIZERS_PARALLELISM'] = 'false' logger.info(f"๐Ÿ”„ Loading model: {self.model_name} (768 dimensions)") self.embedding_model = SentenceTransformer(self.model_name, cache_folder='./model_cache') logger.info(f"โœ… Model cached locally in ./model_cache") else: self.embedding_model = None logger.info(f"๐Ÿ”„ Will use OpenAI embeddings API") def _init_openai_client(self): """Initialize OpenAI client if using OpenAI embeddings""" if self.use_openai: if not OPENAI_AVAILABLE: raise ImportError("OpenAI package required for --use-openai. Install with: pip install openai") # Get API key from environment api_key = os.getenv('OPENAI_API_KEY') if not api_key: raise ValueError("OPENAI_API_KEY environment variable required for --use-openai") self.openai_client = OpenAI(api_key=api_key) logger.info(f"โœ… OpenAI client initialized") else: self.openai_client = None def _generate_embeddings(self, texts: List[str]) -> np.ndarray: """Generate embeddings using either SentenceTransformers or OpenAI""" if self.use_openai: return self._generate_openai_embeddings(texts) else: return self._generate_sentence_transformer_embeddings(texts) def _generate_openai_embeddings(self, texts: List[str]) -> np.ndarray: """Generate embeddings using OpenAI API""" embeddings = [] batch_size = 100 # OpenAI batch limit for i in range(0, len(texts), batch_size): batch_texts = texts[i:i + batch_size] # Only log every 5th batch to reduce noise if (i//batch_size + 1) % 5 == 0 or i == 0: logger.info(f"๐Ÿง  Generating OpenAI embeddings for batch {i//batch_size + 1}/{total_batches} ({len(batch_texts)} texts)...") response = self.openai_client.embeddings.create( input=batch_texts, model="text-embedding-3-large" # Best quality OpenAI model ) batch_embeddings = [item.embedding for item in response.data] embeddings.extend(batch_embeddings) # Rate limiting time.sleep(0.1) return np.array(embeddings, dtype=np.float32) def _generate_sentence_transformer_embeddings(self, texts: List[str]) -> np.ndarray: """Generate embeddings using SentenceTransformers""" logger.info(f"๐Ÿง  Generating SentenceTransformer embeddings for {len(texts)} texts...") return self.embedding_model.encode(texts, show_progress_bar=False, convert_to_numpy=True).astype(np.float32) def _init_databases(self): """Initialize ChromaDB collections based on build mode""" self.variables_client = None self.methodology_client = None self.variables_collection = None self.methodology_collection = None if self.build_mode in ['variables', 'both']: self.variables_dir.mkdir(parents=True, exist_ok=True) self.variables_client = chromadb.PersistentClient( path=str(self.variables_dir), settings=Settings(anonymized_telemetry=False, allow_reset=True) ) try: self.variables_collection = self.variables_client.get_collection("census_variables") except: self.variables_collection = self.variables_client.create_collection( "census_variables", metadata={"description": "Census concept-based variables for entity lookup"} ) logger.info(f"โœ… Variables ChromaDB initialized: {self.variables_dir}") if self.build_mode in ['methodology', 'both']: self.methodology_dir.mkdir(parents=True, exist_ok=True) self.methodology_client = chromadb.PersistentClient( path=str(self.methodology_dir), settings=Settings(anonymized_telemetry=False, allow_reset=True) ) try: self.methodology_collection = self.methodology_client.get_collection("census_methodology") except: self.methodology_collection = self.methodology_client.create_collection( "census_methodology", metadata={"description": "Census methodology and documentation"} ) logger.info(f"โœ… Methodology ChromaDB initialized: {self.methodology_dir}") def build_knowledge_bases(self, rebuild: bool = False): """Main orchestration method""" start_time = time.time() print(f"๐Ÿš€ Starting concept-based dual-path knowledge base build...") if self.build_mode in ['variables', 'both']: self._build_variables_database(rebuild) if self.build_mode in ['methodology', 'both']: self._build_methodology_database(rebuild) build_time = time.time() - start_time self._display_final_stats(build_time) def _build_variables_database(self, rebuild: bool = False): """Build the concept-based variables database""" canonical_path = self.source_dir / "canonical_variables_refactored.json" if not canonical_path.exists(): # Fallback to original canonical variables canonical_path = self.source_dir / "canonical_variables.json" if not canonical_path.exists(): raise FileNotFoundError(f"No canonical variables file found in {self.source_dir}") logger.info(f"๐Ÿ“ Using canonical variables: {canonical_path}") if self.use_faiss: self._build_variables_faiss(canonical_path) else: self._build_variables_chromadb(canonical_path) def _load_canonical_variables(self, canonical_path: Path) -> tuple[dict, bool]: """Load canonical variables and detect structure type""" with open(canonical_path) as f: data = json.load(f) is_refactored = self._is_refactored_structure(data) if is_refactored: logger.info("๐ŸŽฏ Detected CONCEPT-BASED structure (refactored)") # Try 'concepts' key first, fallback to root level if 'concepts' in data and data['concepts']: concepts = data['concepts'] logger.info(f"Using 'concepts' key: {len(concepts)} variables found") else: # Fallback to root level parsing concepts = {k: v for k, v in data.items() if k != 'metadata' and isinstance(v, dict)} logger.info(f"Using root-level parsing: {len(concepts)} variables found") if not concepts: raise ValueError("No concepts found in canonical data - check file structure") else: logger.info("๐Ÿ“ฆ Detected TEMPORAL structure (original)") concepts = data.get('variables', data) if not concepts: raise ValueError("No variables found in canonical data") logger.info(f"โœ… Loaded {len(concepts)} variables for processing") return concepts, is_refactored def _is_refactored_structure(self, data: dict) -> bool: """Detect if this is the new concept-based structure""" # Check for refactored structure indicators if 'metadata' in data and isinstance(data.get('metadata'), dict): metadata = data['metadata'] if metadata.get('structure_type') == 'concept_based': return True if 'consolidation_stats' in metadata: return True # Check for concept-based structure patterns sample_keys = list(data.keys())[:5] # Check first 5 keys for key in sample_keys: if key == 'metadata': continue if isinstance(data.get(key), dict): item = data[key] # Look for concept-based indicators if 'concept_name' in item or 'survey_instances' in item: return True return False def _create_concept_embedding_text(self, variable_id: str, concept_data: dict, is_refactored: bool) -> tuple[str, dict]: """Create optimized embedding text and metadata for concept""" if is_refactored: # Concept-based structure - use actual field names from source concept_name = concept_data.get('concept', variable_id) # Source uses 'concept' label = concept_data.get('label', '') # Source uses 'label' summary = concept_data.get('summary', '') # ADDED: rich summary text key_terms = concept_data.get('key_terms', []) # ADDED: key terms for search # Build rich embedding text from actual meaningful fields parts = [] # Add concept name (the main descriptive title) if concept_name and concept_name != variable_id: parts.append(concept_name) # Add label (detailed description) if label and label.lower() != concept_name.lower(): parts.append(label) # Add summary (rich explanatory text) if summary: # Use first sentence of summary to avoid too much text summary_first = summary.split('.')[0] + '.' if '.' in summary else summary parts.append(summary_first) # Add key terms for better searchability if key_terms and isinstance(key_terms, list): terms_text = ', '.join(key_terms[:5]) # Use first 5 terms parts.append(f"Key terms: {terms_text}") # Add survey instance information instances = concept_data.get('instances', []) if instances: # Just count instances, don't add all the text survey_types = set() for instance in instances: if isinstance(instance, dict) and 'survey_type' in instance: survey_types.add(instance['survey_type']) if survey_types: parts.append(f"Available in: {', '.join(sorted(survey_types))}") self.variables_stats['survey_instances_processed'] += len(instances) # Create metadata using corrected field names metadata = { 'variable_id': variable_id, 'concept_name': concept_name, 'description': label, # Use label as description 'summary': summary, # Keep full summary - DO NOT TRUNCATE 'key_terms': ', '.join(key_terms[:10]) if isinstance(key_terms, list) else str(key_terms), 'survey_instances_count': len(instances) if isinstance(instances, list) else 0, 'structure_type': 'concept_based' } else: # Original temporal structure label = concept_data.get('label', variable_id) description = concept_data.get('description', '') parts = [label] if description and description.lower() != label.lower(): parts.append(description) metadata = { 'variable_id': variable_id, 'label': label, 'description': description, 'structure_type': 'temporal_based' } # Add other available fields for field in ['universe', 'group', 'survey']: if field in concept_data: metadata[field] = concept_data[field] # Only join non-empty parts text_parts = [part for part in parts if part and part.strip()] text = " | ".join(text_parts) + "." if text_parts else f"{variable_id}." return text, metadata def _build_variables_faiss(self, canonical_path: Path): """Build concept-based variables database using FAISS index with proper synchronization""" logger.info("๐ŸŽฏ Processing canonical variables for FAISS database...") concepts, is_refactored = self._load_canonical_variables(canonical_path) logger.info(f"๐Ÿ“Š Found {len(concepts)} {'concepts' if is_refactored else 'temporal variables'}") # Process variables in batches for memory efficiency concept_items = list(concepts.items()) if self.test_mode: concept_items = concept_items[:1000] logger.info(f"๐Ÿงช Test mode: Limited to {len(concept_items)} variables") all_texts = [] all_metadata = [] all_embeddings = [] all_variable_ids = [] # Track variable IDs for mapping - CRITICAL ADDITION batch_size = 1000 total_batches = (len(concept_items) + batch_size - 1) // batch_size for batch_num, i in enumerate(range(0, len(concept_items), batch_size)): batch = concept_items[i:i + batch_size] logger.info(f"๐Ÿ”„ Processing FAISS batch {batch_num + 1}/{total_batches}") batch_texts = [] batch_metadata = [] batch_variable_ids = [] for variable_id, concept_data in batch: # Create optimized embedding text and metadata text, metadata = self._create_concept_embedding_text(variable_id, concept_data, is_refactored) batch_texts.append(text) batch_metadata.append(metadata) batch_variable_ids.append(variable_id) # Generate embeddings for batch logger.info(f"๐Ÿง  Generating embeddings for {len(batch_texts)} variables...") embeddings = self._generate_embeddings(batch_texts) # CRITICAL: Ensure arrays stay synchronized if not (len(batch_texts) == len(batch_metadata) == len(batch_variable_ids) == len(embeddings)): raise RuntimeError( f"Array synchronization error in batch {batch_num + 1}: " f"texts={len(batch_texts)}, metadata={len(batch_metadata)}, " f"ids={len(batch_variable_ids)}, embeddings={len(embeddings)}" ) all_texts.extend(batch_texts) all_metadata.extend(batch_metadata) all_embeddings.extend(embeddings) all_variable_ids.extend(batch_variable_ids) self.variables_stats['concepts_processed'] += len(batch_texts) logger.debug(f"Batch {batch_num + 1} processed: {len(batch_variable_ids)} variables") # Final synchronization validation if not (len(all_texts) == len(all_metadata) == len(all_embeddings) == len(all_variable_ids)): raise RuntimeError( f"Final array synchronization error: " f"texts={len(all_texts)}, metadata={len(all_metadata)}, " f"embeddings={len(all_embeddings)}, ids={len(all_variable_ids)}" ) logger.info(f"โœ… Array synchronization validated: {len(all_variable_ids)} variables") # Build FAISS index logger.info(f"๐Ÿ”ง Building FAISS index for {len(all_embeddings)} variables...") # Convert to numpy array embeddings_array = np.array(all_embeddings).astype('float32') # Create FAISS index (L2 distance, good for semantic similarity) dimension = embeddings_array.shape[1] index = faiss.IndexFlatL2(dimension) # Add embeddings to index index.add(embeddings_array) # Save FAISS index faiss_path = self.variables_dir / "variables.faiss" faiss.write_index(index, str(faiss_path)) logger.info(f"๐Ÿ’พ FAISS index saved: {faiss_path}") # Save metadata separately metadata_path = self.variables_dir / "variables_metadata.json" with open(metadata_path, 'w') as f: json.dump(all_metadata, f, indent=2) logger.info(f"๐Ÿ’พ Metadata saved: {metadata_path}") # Save variable ID mapping - THE CRITICAL MISSING PIECE ids_mapping = { 'variable_ids': all_variable_ids, 'total_variables': len(all_variable_ids), 'embedding_dimension': dimension, 'created_timestamp': time.time(), 'source_file': canonical_path.name, 'structure_type': 'concept_based' if is_refactored else 'temporal_based' } ids_path = self.variables_dir / "variables_ids.json" with open(ids_path, 'w') as f: json.dump(ids_mapping, f, indent=2) logger.info(f"๐Ÿ’พ Variable IDs mapping saved: {ids_path}") # Cross-validation: Verify saved files match in-memory data logger.info("๐Ÿ” Performing cross-validation...") # Reload and verify lengths with open(ids_path) as f: saved_ids_data = json.load(f) saved_ids = saved_ids_data['variable_ids'] with open(metadata_path) as f: saved_metadata = json.load(f) # Verify all arrays have same length if not (len(saved_ids) == len(saved_metadata) == index.ntotal): raise RuntimeError( f"Cross-validation failed: " f"saved_ids={len(saved_ids)}, saved_metadata={len(saved_metadata)}, " f"faiss_index={index.ntotal}" ) # Spot check: verify first and last entries match if saved_ids[0] != all_variable_ids[0]: raise RuntimeError(f"ID order mismatch: first entry {saved_ids[0]} != {all_variable_ids[0]}") if saved_ids[-1] != all_variable_ids[-1]: raise RuntimeError(f"ID order mismatch: last entry {saved_ids[-1]} != {all_variable_ids[-1]}") if saved_metadata[0]['variable_id'] != all_variable_ids[0]: raise RuntimeError(f"Metadata mismatch: first entry {saved_metadata[0]['variable_id']} != {all_variable_ids[0]}") logger.info("โœ… Cross-validation passed") # Save build info build_info = { 'model_name': self.model_name, 'embedding_dimension': dimension, 'variable_count': len(all_embeddings), 'structure_type': 'concept_based' if is_refactored else 'temporal_based', 'source_file': canonical_path.name, 'build_timestamp': time.time(), 'index_type': 'faiss_flat_l2', 'survey_instances_processed': self.variables_stats['survey_instances_processed'], 'has_id_mapping': True, 'arrays_synchronized': True, 'cross_validation_passed': True } build_info_path = self.variables_dir / "build_info.json" with open(build_info_path, 'w') as f: json.dump(build_info, f, indent=2) logger.info(f"๐Ÿ’พ Build info saved: {build_info_path}") structure_note = "concept-based variables" if is_refactored else "temporal variables" logger.info(f"โœ… FAISS variables database complete: {len(all_embeddings)} {structure_note}") logger.info(f"โœ… Synchronized arrays: IDs, metadata, embeddings all length {len(all_variable_ids)}") def _build_variables_chromadb(self, canonical_path: Path): """Build concept-based variables database using ChromaDB""" logger.info("๐ŸŽฏ Processing canonical variables for ChromaDB...") # Clear existing collection if rebuilding try: self.variables_client.delete_collection("census_variables") self.variables_collection = self.variables_client.create_collection( "census_variables", metadata={"description": "Census concept-based variables for entity lookup"} ) logger.info("๐Ÿ—‘๏ธ Cleared existing variables collection") except: pass concepts, is_refactored = self._load_canonical_variables(canonical_path) logger.info(f"๐Ÿ“Š Found {len(concepts)} {'concepts' if is_refactored else 'temporal variables'}") # Process in batches concept_items = list(concepts.items()) if self.test_mode: concept_items = concept_items[:1000] logger.info(f"๐Ÿงช Test mode: Limited to {len(concept_items)} variables") batch_size = 1000 total_batches = (len(concept_items) + batch_size - 1) // batch_size for batch_num, i in enumerate(range(0, len(concept_items), batch_size)): batch = concept_items[i:i + batch_size] logger.info(f"๐Ÿ”„ Processing ChromaDB batch {batch_num + 1}/{total_batches}") ids = [] texts = [] metadatas = [] for variable_id, concept_data in batch: text, metadata = self._create_concept_embedding_text(variable_id, concept_data, is_refactored) ids.append(variable_id) texts.append(text) metadatas.append(metadata) # Add to ChromaDB self.variables_collection.add( documents=texts, metadatas=metadatas, ids=ids ) self.variables_stats['concepts_processed'] += len(batch) structure_note = "concept-based variables" if is_refactored else "temporal variables" logger.info(f"โœ… ChromaDB variables database complete: {self.variables_stats['concepts_processed']} {structure_note}") def _build_methodology_database(self, rebuild: bool = False): """Build the methodology database from documentation files""" logger.info("๐Ÿ“š Building methodology database...") # Clear existing collection if rebuilding if rebuild: try: self.methodology_client.delete_collection("census_methodology") self.methodology_collection = self.methodology_client.create_collection( "census_methodology", metadata={"description": "Census methodology and documentation"} ) logger.info("๐Ÿ—‘๏ธ Cleared existing methodology collection") except: pass # Find documentation files doc_extensions = ['.pdf', '.md', '.txt', '.html', '.csv', '.xlsx'] doc_files = [] for ext in doc_extensions: pattern = f"**/*{ext}" files = list(self.source_dir.glob(pattern)) doc_files.extend(files) # Filter out build artifacts and focus on documentation exclude_patterns = [ 'canonical_variables.json', 'canonical_variables_refactored.json', 'failed_variables_retry.json', # ADDED: Exclude variable retry files 'acs1_raw.json', 'acs5_raw.json', 'raw_data', 'data_dumps', '__pycache__', '.git', 'node_modules', 'build', 'dist', '.DS_Store' ] doc_files = [] for f in self.source_dir.rglob('*'): if (f.is_file() and f.suffix.lower() in ['.pdf', '.md', '.txt', '.html', '.htm', '.csv', '.xlsx', '.Rmd'] and # REMOVED .json not any(exclude in str(f).lower() for exclude in exclude_patterns) and not any(part.startswith('.') for part in f.parts)): # Skip very large files that will hit OpenAI token limits if f.stat().st_size > 5 * 1024 * 1024: # Skip files > 5MB continue # Skip very large files in test mode if self.test_mode and f.stat().st_size > 1 * 1024 * 1024: continue doc_files.append(f) if not doc_files: logger.warning(f"No documentation files found in {self.source_dir}") return logger.info(f"๐Ÿ“„ Found {len(doc_files)} documentation files") # Process files in parallel if self.workers > 1: self._process_methodology_parallel(doc_files) else: self._process_methodology_sequential(doc_files) logger.info(f"โœ… Methodology database complete: {self.methodology_stats['chunks_created']} chunks from {self.methodology_stats['files_processed']} files") def _process_methodology_sequential(self, doc_files: List[Path]): """Process methodology files sequentially with clean progress""" print(f"Processing {len(doc_files)} methodology files...") for i, file_path in enumerate(doc_files): try: # Show file progress every 10 files if (i + 1) % 10 == 0 or i == 0 or i == len(doc_files) - 1: show_progress(i + 1, len(doc_files), "File Processing") chunks = process_single_file(file_path, self.embedding_model if not self.use_openai else self.openai_client, self.use_openai) if chunks: # Add to ChromaDB in batches batch_size = 500 for j in range(0, len(chunks), batch_size): batch = chunks[j:j + batch_size] texts = [c['text'] for c in batch] ids = [c['id'] for c in batch] metadatas = [c['metadata'] for c in batch] embeddings = [c['embedding'] for c in batch] self.methodology_collection.add( documents=texts, embeddings=embeddings, metadatas=metadatas, ids=ids ) self.methodology_stats['files_processed'] += 1 self.methodology_stats['chunks_created'] += len(chunks) except Exception as e: logger.error(f"Error processing {file_path}: {e}") self.methodology_stats['errors'] += 1 def _process_methodology_parallel(self, doc_files: List[Path]): """Process methodology files in parallel""" logger.info(f"๐Ÿ”„ Processing methodology files with {self.workers} workers...") # Create temporary directory for worker results temp_dir = Path("temp_methodology") temp_dir.mkdir(exist_ok=True) # Split files among workers files_per_worker = (len(doc_files) + self.workers - 1) // self.workers worker_args = [] for i in range(self.workers): start_idx = i * files_per_worker end_idx = min((i + 1) * files_per_worker, len(doc_files)) files_batch = doc_files[start_idx:end_idx] if files_batch: worker_args.append((i, files_batch, temp_dir, self.use_openai)) # Process in parallel with ProcessPoolExecutor(max_workers=self.workers) as executor: results = list(executor.map(process_methodology_worker, worker_args)) # Aggregate results for result in results: for key in self.methodology_stats: self.methodology_stats[key] += result[key] # Merge worker results into ChromaDB self._merge_methodology_results(temp_dir) # Cleanup self._cleanup_temp_files(temp_dir) def _merge_methodology_results(self, temp_dir: Path): """Merge worker results into ChromaDB""" logger.info("๐Ÿ”— Merging worker results into methodology database...") total_merged = 0 for temp_file in temp_dir.glob("worker_*.json"): with open(temp_file) as f: chunks = json.load(f) if not chunks: continue # Add to ChromaDB in batches batches = (len(chunks) + 499) // 500 for j in range(0, len(chunks), 500): batch = chunks[j:j + 500] batch_num = j // 500 + 1 texts = [c['text'] for c in batch] ids = [c['id'] for c in batch] metadatas = [c['metadata'] for c in batch] embeddings = [c['embedding'] for c in batch] self.methodology_collection.add( documents=texts, embeddings=embeddings, metadatas=metadatas, ids=ids ) total_merged += len(chunks) logger.info(f" โœ… Merged {len(chunks)} chunks from {temp_file.name}") logger.info(f"๐ŸŽ‰ Methodology merge complete: {total_merged} total chunks merged") def _cleanup_temp_files(self, temp_dir): """Clean up temporary files""" for temp_file in temp_dir.glob("worker_*.json"): temp_file.unlink() if temp_dir.exists() and not any(temp_dir.iterdir()): temp_dir.rmdir() def _display_final_stats(self, build_time): """Display simple completion message""" print(f"๐ŸŽ‰ Build complete in {build_time:.1f}s!") if self.build_mode in ['variables', 'both']: print(f" Variables: {self.variables_stats['concepts_processed']:,} concepts") if self.build_mode in ['methodology', 'both']: print(f" Methodology: {self.methodology_stats['chunks_created']:,} chunks from {self.methodology_stats['files_processed']:,} files") if self.methodology_stats['errors'] > 0: print(f" Errors: {self.methodology_stats['errors']}") def main(): parser = argparse.ArgumentParser(description='Concept-Based Dual-Path Knowledge Base Builder') parser.add_argument('--variables-only', action='store_true', help='Build only variables database') parser.add_argument('--methodology-only', action='store_true', help='Build only methodology database') parser.add_argument('--both', action='store_true', help='Build both databases') parser.add_argument('--faiss', action='store_true', help='Use FAISS for variables database (faster loading)') parser.add_argument('--rebuild', action='store_true', help='Force rebuild existing databases') parser.add_argument('--test-mode', action='store_true', help='Test with subset of data') parser.add_argument('--source-dir', default='source-docs', help='Source directory') parser.add_argument('--variables-dir', default='variables-db', help='Variables database directory') parser.add_argument('--methodology-dir', default='methodology-db', help='Methodology database directory') parser.add_argument('--model', default='sentence-transformers/all-mpnet-base-v2', help='Embedding model') parser.add_argument('--use-openai', action='store_true', help='Use OpenAI embeddings instead of SentenceTransformers') parser.add_argument('--workers', type=int, default=5, help='Number of parallel workers') args = parser.parse_args() # Determine build mode if args.variables_only: build_mode = 'variables' elif args.methodology_only: build_mode = 'methodology' elif args.both: build_mode = 'both' else: # Default to both if no mode specified build_mode = 'both' logger.info("No build mode specified, defaulting to --both") # Validate FAISS usage if args.faiss and build_mode == 'methodology': logger.warning("FAISS flag ignored - only applies to variables database") args.faiss = False builder = ConceptBasedKnowledgeBuilder( source_dir=Path(args.source_dir), build_mode=build_mode, variables_dir=Path(args.variables_dir), methodology_dir=Path(args.methodology_dir), test_mode=args.test_mode, model_name=args.model, workers=args.workers, use_faiss=args.faiss, use_openai=args.use_openai ) builder.build_knowledge_bases(rebuild=args.rebuild) logger.info("๐Ÿš€ Concept-based dual-path knowledge base build completed!") if __name__ == "__main__": main() , line) or # "ALL CAPS HEADING" re.match(r'^Chapter \d+', line, re.IGNORECASE)): # "Chapter 1" # Save previous section if current_section['content']: sections.append({ 'heading': current_section['heading'] or 'Introduction', 'level': 1, 'content': '\n'.join(current_section['content']).strip() }) # Start new section current_section = {'heading': line, 'content': []} else: if line: current_section['content'].append(line) # Add final section if current_section['content']: sections.append({ 'heading': current_section['heading'] or 'Introduction', 'level': 1, 'content': '\n'.join(current_section['content']).strip() }) # If no sections found, return entire text as one section if not sections: sections.append({ 'heading': 'Document Content', 'level': 1, 'content': text.strip() }) return sections """Robust chunking with fallback for documents without paragraph breaks""" chunks = [] # Clean text content = re.sub(r'\s+', ' ', content).strip() if len(content) < 100: return chunks # Settings chunk_size = 1000 overlap_size = 200 max_safe_size = 5000 # Absolute max before we force split # First try: paragraph-based splitting (works for most documents) paragraphs = content.split('\n\n') # If we only got 1 paragraph and it's huge, we need fallback splitting if len(paragraphs) == 1 and len(paragraphs[0]) > max_safe_size: # Fallback: split by sentences first sentences = re.split(r'[.!?]+\s+', content) if len(sentences) > 1: paragraphs = sentences else: # Last resort: split by fixed character chunks paragraphs = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size-overlap_size)] current_chunk = "" chunk_num = 0 for paragraph in paragraphs: paragraph = paragraph.strip() if not paragraph: continue # If this single paragraph is still too large, force split it if len(paragraph) > max_safe_size: # Split oversized paragraph into smaller pieces for i in range(0, len(paragraph), max_safe_size-overlap_size): piece = paragraph[i:i+max_safe_size] if len(piece.strip()) > 100: chunks.append(create_chunk_metadata( piece.strip(), file_path, chunk_num )) chunk_num += 1 continue # Normal processing test_chunk = current_chunk + " " + paragraph if current_chunk else paragraph if len(test_chunk) > chunk_size and current_chunk: # Save current chunk if len(current_chunk.strip()) > 100: chunks.append(create_chunk_metadata( current_chunk.strip(), file_path, chunk_num )) chunk_num += 1 # Start new chunk with overlap overlap_text = current_chunk[-overlap_size:] if len(current_chunk) > overlap_size else current_chunk current_chunk = overlap_text + " " + paragraph if overlap_text else paragraph else: current_chunk = test_chunk # Add final chunk if current_chunk and len(current_chunk.strip()) > 100: # Final safety check if len(current_chunk) > max_safe_size: # Force split the final chunk for i in range(0, len(current_chunk), max_safe_size-overlap_size): piece = current_chunk[i:i+max_safe_size] if len(piece.strip()) > 100: chunks.append(create_chunk_metadata( piece.strip(), file_path, chunk_num )) chunk_num += 1 else: chunks.append(create_chunk_metadata( current_chunk.strip(), file_path, chunk_num )) return chunks def create_chunk_metadata(text: str, file_path: Path, chunk_num: int) -> Dict: """Create metadata for a text chunk""" content_hash = hashlib.md5(text.encode()).hexdigest()[:8] file_hash = hashlib.md5(str(file_path).encode()).hexdigest()[:6] chunk_id = f"{file_path.stem}_{file_hash}_{chunk_num}_{content_hash}" return { 'id': chunk_id, 'text': text, 'metadata': { 'source': str(file_path), 'filename': file_path.name, 'chunk_number': chunk_num, 'total_chunks': 1, # Will be updated later if needed 'file_type': file_path.suffix[1:] if file_path.suffix else 'unknown' } } class ConceptBasedKnowledgeBuilder: """ Builds separated knowledge bases optimized for different retrieval patterns: - Variables DB: Entity lookup for 36K concept-based variables (no duplicates) - Methodology DB: Conceptual search for documentation Key improvements: - Handles canonical_variables_refactored.json structure - Survey instance awareness (ACS1/5yr metadata) - Rich metadata preservation for intelligent search - CRITICAL FIX: Array synchronization validation and proper ID mapping """ def __init__(self, source_dir: Path, build_mode: str, variables_dir: Path = None, methodology_dir: Path = None, test_mode: bool = False, model_name: str = "sentence-transformers/all-mpnet-base-v2", workers: int = 5, use_faiss: bool = False, use_openai: bool = False): self.source_dir = Path(source_dir) self.build_mode = build_mode # 'variables', 'methodology', or 'both' self.variables_dir = Path(variables_dir) if variables_dir else None self.methodology_dir = Path(methodology_dir) if methodology_dir else None self.test_mode = test_mode self.model_name = model_name self.workers = workers self.use_openai = use_openai # Stats tracking self.variables_stats = { 'concepts_processed': 0, 'survey_instances_processed': 0, 'files_processed': 0, 'chunks_created': 0, 'errors': 0 } self.methodology_stats = {'files_processed': 0, 'chunks_created': 0, 'errors': 0} # Initialize model self._init_model() self._init_openai_client() # Initialize databases based on build mode self._init_databases() self.use_faiss = use_faiss and (build_mode in ['variables', 'both']) # Validate FAISS availability if self.use_faiss and not FAISS_AVAILABLE: logger.error("FAISS requested but not available. Install with: pip install faiss-cpu") raise ImportError("FAISS not available") logger.info(f"๐Ÿš€ Concept-Based Knowledge Builder initialized:") logger.info(f" Build mode: {build_mode}") logger.info(f" Variables dir: {variables_dir}") logger.info(f" Variables backend: {'FAISS' if self.use_faiss else 'ChromaDB'}") logger.info(f" Methodology dir: {methodology_dir}") logger.info(f" Test mode: {test_mode}") logger.info(f" Embedding model: {'OpenAI' if use_openai else 'SentenceTransformers'}") logger.info(f" ๐ŸŽฏ CONCEPT-BASED: Handles refactored canonical variables") def _init_model(self): """Initialize embedding model with local caching""" if not self.use_openai: os.environ['HF_HUB_OFFLINE'] = '1' os.environ['TRANSFORMERS_OFFLINE'] = '1' os.environ['TOKENIZERS_PARALLELISM'] = 'false' logger.info(f"๐Ÿ”„ Loading model: {self.model_name} (768 dimensions)") self.embedding_model = SentenceTransformer(self.model_name, cache_folder='./model_cache') logger.info(f"โœ… Model cached locally in ./model_cache") else: self.embedding_model = None logger.info(f"๐Ÿ”„ Will use OpenAI embeddings API") def _init_openai_client(self): """Initialize OpenAI client if using OpenAI embeddings""" if self.use_openai: if not OPENAI_AVAILABLE: raise ImportError("OpenAI package required for --use-openai. Install with: pip install openai") # Get API key from environment api_key = os.getenv('OPENAI_API_KEY') if not api_key: raise ValueError("OPENAI_API_KEY environment variable required for --use-openai") self.openai_client = OpenAI(api_key=api_key) logger.info(f"โœ… OpenAI client initialized") else: self.openai_client = None def _generate_embeddings(self, texts: List[str]) -> np.ndarray: """Generate embeddings using either SentenceTransformers or OpenAI""" if self.use_openai: return self._generate_openai_embeddings(texts) else: return self._generate_sentence_transformer_embeddings(texts) def _generate_openai_embeddings(self, texts: List[str]) -> np.ndarray: """Generate embeddings using OpenAI API""" embeddings = [] batch_size = 100 # OpenAI batch limit for i in range(0, len(texts), batch_size): batch_texts = texts[i:i + batch_size] # Only log every 5th batch to reduce noise if (i//batch_size + 1) % 5 == 0 or i == 0: logger.info(f"๐Ÿง  Generating OpenAI embeddings for batch {i//batch_size + 1}/{total_batches} ({len(batch_texts)} texts)...") response = self.openai_client.embeddings.create( input=batch_texts, model="text-embedding-3-large" # Best quality OpenAI model ) batch_embeddings = [item.embedding for item in response.data] embeddings.extend(batch_embeddings) # Rate limiting time.sleep(0.1) return np.array(embeddings, dtype=np.float32) def _generate_sentence_transformer_embeddings(self, texts: List[str]) -> np.ndarray: """Generate embeddings using SentenceTransformers""" logger.info(f"๐Ÿง  Generating SentenceTransformer embeddings for {len(texts)} texts...") return self.embedding_model.encode(texts, show_progress_bar=False, convert_to_numpy=True).astype(np.float32) def _init_databases(self): """Initialize ChromaDB collections based on build mode""" self.variables_client = None self.methodology_client = None self.variables_collection = None self.methodology_collection = None if self.build_mode in ['variables', 'both']: self.variables_dir.mkdir(parents=True, exist_ok=True) self.variables_client = chromadb.PersistentClient( path=str(self.variables_dir), settings=Settings(anonymized_telemetry=False, allow_reset=True) ) try: self.variables_collection = self.variables_client.get_collection("census_variables") except: self.variables_collection = self.variables_client.create_collection( "census_variables", metadata={"description": "Census concept-based variables for entity lookup"} ) logger.info(f"โœ… Variables ChromaDB initialized: {self.variables_dir}") if self.build_mode in ['methodology', 'both']: self.methodology_dir.mkdir(parents=True, exist_ok=True) self.methodology_client = chromadb.PersistentClient( path=str(self.methodology_dir), settings=Settings(anonymized_telemetry=False, allow_reset=True) ) try: self.methodology_collection = self.methodology_client.get_collection("census_methodology") except: self.methodology_collection = self.methodology_client.create_collection( "census_methodology", metadata={"description": "Census methodology and documentation"} ) logger.info(f"โœ… Methodology ChromaDB initialized: {self.methodology_dir}") def build_knowledge_bases(self, rebuild: bool = False): """Main orchestration method""" start_time = time.time() print(f"๐Ÿš€ Starting concept-based dual-path knowledge base build...") if self.build_mode in ['variables', 'both']: self._build_variables_database(rebuild) if self.build_mode in ['methodology', 'both']: self._build_methodology_database(rebuild) build_time = time.time() - start_time self._display_final_stats(build_time) def _build_variables_database(self, rebuild: bool = False): """Build the concept-based variables database""" canonical_path = self.source_dir / "canonical_variables_refactored.json" if not canonical_path.exists(): # Fallback to original canonical variables canonical_path = self.source_dir / "canonical_variables.json" if not canonical_path.exists(): raise FileNotFoundError(f"No canonical variables file found in {self.source_dir}") logger.info(f"๐Ÿ“ Using canonical variables: {canonical_path}") if self.use_faiss: self._build_variables_faiss(canonical_path) else: self._build_variables_chromadb(canonical_path) def _load_canonical_variables(self, canonical_path: Path) -> tuple[dict, bool]: """Load canonical variables and detect structure type""" with open(canonical_path) as f: data = json.load(f) is_refactored = self._is_refactored_structure(data) if is_refactored: logger.info("๐ŸŽฏ Detected CONCEPT-BASED structure (refactored)") # Try 'concepts' key first, fallback to root level if 'concepts' in data and data['concepts']: concepts = data['concepts'] logger.info(f"Using 'concepts' key: {len(concepts)} variables found") else: # Fallback to root level parsing concepts = {k: v for k, v in data.items() if k != 'metadata' and isinstance(v, dict)} logger.info(f"Using root-level parsing: {len(concepts)} variables found") if not concepts: raise ValueError("No concepts found in canonical data - check file structure") else: logger.info("๐Ÿ“ฆ Detected TEMPORAL structure (original)") concepts = data.get('variables', data) if not concepts: raise ValueError("No variables found in canonical data") logger.info(f"โœ… Loaded {len(concepts)} variables for processing") return concepts, is_refactored def _is_refactored_structure(self, data: dict) -> bool: """Detect if this is the new concept-based structure""" # Check for refactored structure indicators if 'metadata' in data and isinstance(data.get('metadata'), dict): metadata = data['metadata'] if metadata.get('structure_type') == 'concept_based': return True if 'consolidation_stats' in metadata: return True # Check for concept-based structure patterns sample_keys = list(data.keys())[:5] # Check first 5 keys for key in sample_keys: if key == 'metadata': continue if isinstance(data.get(key), dict): item = data[key] # Look for concept-based indicators if 'concept_name' in item or 'survey_instances' in item: return True return False def _create_concept_embedding_text(self, variable_id: str, concept_data: dict, is_refactored: bool) -> tuple[str, dict]: """Create optimized embedding text and metadata for concept""" if is_refactored: # Concept-based structure - use actual field names from source concept_name = concept_data.get('concept', variable_id) # Source uses 'concept' label = concept_data.get('label', '') # Source uses 'label' summary = concept_data.get('summary', '') # ADDED: rich summary text key_terms = concept_data.get('key_terms', []) # ADDED: key terms for search # Build rich embedding text from actual meaningful fields parts = [] # Add concept name (the main descriptive title) if concept_name and concept_name != variable_id: parts.append(concept_name) # Add label (detailed description) if label and label.lower() != concept_name.lower(): parts.append(label) # Add summary (rich explanatory text) if summary: # Use first sentence of summary to avoid too much text summary_first = summary.split('.')[0] + '.' if '.' in summary else summary parts.append(summary_first) # Add key terms for better searchability if key_terms and isinstance(key_terms, list): terms_text = ', '.join(key_terms[:5]) # Use first 5 terms parts.append(f"Key terms: {terms_text}") # Add survey instance information instances = concept_data.get('instances', []) if instances: # Just count instances, don't add all the text survey_types = set() for instance in instances: if isinstance(instance, dict) and 'survey_type' in instance: survey_types.add(instance['survey_type']) if survey_types: parts.append(f"Available in: {', '.join(sorted(survey_types))}") self.variables_stats['survey_instances_processed'] += len(instances) # Create metadata using corrected field names metadata = { 'variable_id': variable_id, 'concept_name': concept_name, 'description': label, # Use label as description 'summary': summary, # Keep full summary - DO NOT TRUNCATE 'key_terms': ', '.join(key_terms[:10]) if isinstance(key_terms, list) else str(key_terms), 'survey_instances_count': len(instances) if isinstance(instances, list) else 0, 'structure_type': 'concept_based' } else: # Original temporal structure label = concept_data.get('label', variable_id) description = concept_data.get('description', '') parts = [label] if description and description.lower() != label.lower(): parts.append(description) metadata = { 'variable_id': variable_id, 'label': label, 'description': description, 'structure_type': 'temporal_based' } # Add other available fields for field in ['universe', 'group', 'survey']: if field in concept_data: metadata[field] = concept_data[field] # Only join non-empty parts text_parts = [part for part in parts if part and part.strip()] text = " | ".join(text_parts) + "." if text_parts else f"{variable_id}." return text, metadata def _build_variables_faiss(self, canonical_path: Path): """Build concept-based variables database using FAISS index with proper synchronization""" logger.info("๐ŸŽฏ Processing canonical variables for FAISS database...") concepts, is_refactored = self._load_canonical_variables(canonical_path) logger.info(f"๐Ÿ“Š Found {len(concepts)} {'concepts' if is_refactored else 'temporal variables'}") # Process variables in batches for memory efficiency concept_items = list(concepts.items()) if self.test_mode: concept_items = concept_items[:1000] logger.info(f"๐Ÿงช Test mode: Limited to {len(concept_items)} variables") all_texts = [] all_metadata = [] all_embeddings = [] all_variable_ids = [] # Track variable IDs for mapping - CRITICAL ADDITION batch_size = 1000 total_batches = (len(concept_items) + batch_size - 1) // batch_size for batch_num, i in enumerate(range(0, len(concept_items), batch_size)): batch = concept_items[i:i + batch_size] logger.info(f"๐Ÿ”„ Processing FAISS batch {batch_num + 1}/{total_batches}") batch_texts = [] batch_metadata = [] batch_variable_ids = [] for variable_id, concept_data in batch: # Create optimized embedding text and metadata text, metadata = self._create_concept_embedding_text(variable_id, concept_data, is_refactored) batch_texts.append(text) batch_metadata.append(metadata) batch_variable_ids.append(variable_id) # Generate embeddings for batch logger.info(f"๐Ÿง  Generating embeddings for {len(batch_texts)} variables...") embeddings = self._generate_embeddings(batch_texts) # CRITICAL: Ensure arrays stay synchronized if not (len(batch_texts) == len(batch_metadata) == len(batch_variable_ids) == len(embeddings)): raise RuntimeError( f"Array synchronization error in batch {batch_num + 1}: " f"texts={len(batch_texts)}, metadata={len(batch_metadata)}, " f"ids={len(batch_variable_ids)}, embeddings={len(embeddings)}" ) all_texts.extend(batch_texts) all_metadata.extend(batch_metadata) all_embeddings.extend(embeddings) all_variable_ids.extend(batch_variable_ids) self.variables_stats['concepts_processed'] += len(batch_texts) logger.debug(f"Batch {batch_num + 1} processed: {len(batch_variable_ids)} variables") # Final synchronization validation if not (len(all_texts) == len(all_metadata) == len(all_embeddings) == len(all_variable_ids)): raise RuntimeError( f"Final array synchronization error: " f"texts={len(all_texts)}, metadata={len(all_metadata)}, " f"embeddings={len(all_embeddings)}, ids={len(all_variable_ids)}" ) logger.info(f"โœ… Array synchronization validated: {len(all_variable_ids)} variables") # Build FAISS index logger.info(f"๐Ÿ”ง Building FAISS index for {len(all_embeddings)} variables...") # Convert to numpy array embeddings_array = np.array(all_embeddings).astype('float32') # Create FAISS index (L2 distance, good for semantic similarity) dimension = embeddings_array.shape[1] index = faiss.IndexFlatL2(dimension) # Add embeddings to index index.add(embeddings_array) # Save FAISS index faiss_path = self.variables_dir / "variables.faiss" faiss.write_index(index, str(faiss_path)) logger.info(f"๐Ÿ’พ FAISS index saved: {faiss_path}") # Save metadata separately metadata_path = self.variables_dir / "variables_metadata.json" with open(metadata_path, 'w') as f: json.dump(all_metadata, f, indent=2) logger.info(f"๐Ÿ’พ Metadata saved: {metadata_path}") # Save variable ID mapping - THE CRITICAL MISSING PIECE ids_mapping = { 'variable_ids': all_variable_ids, 'total_variables': len(all_variable_ids), 'embedding_dimension': dimension, 'created_timestamp': time.time(), 'source_file': canonical_path.name, 'structure_type': 'concept_based' if is_refactored else 'temporal_based' } ids_path = self.variables_dir / "variables_ids.json" with open(ids_path, 'w') as f: json.dump(ids_mapping, f, indent=2) logger.info(f"๐Ÿ’พ Variable IDs mapping saved: {ids_path}") # Cross-validation: Verify saved files match in-memory data logger.info("๐Ÿ” Performing cross-validation...") # Reload and verify lengths with open(ids_path) as f: saved_ids_data = json.load(f) saved_ids = saved_ids_data['variable_ids'] with open(metadata_path) as f: saved_metadata = json.load(f) # Verify all arrays have same length if not (len(saved_ids) == len(saved_metadata) == index.ntotal): raise RuntimeError( f"Cross-validation failed: " f"saved_ids={len(saved_ids)}, saved_metadata={len(saved_metadata)}, " f"faiss_index={index.ntotal}" ) # Spot check: verify first and last entries match if saved_ids[0] != all_variable_ids[0]: raise RuntimeError(f"ID order mismatch: first entry {saved_ids[0]} != {all_variable_ids[0]}") if saved_ids[-1] != all_variable_ids[-1]: raise RuntimeError(f"ID order mismatch: last entry {saved_ids[-1]} != {all_variable_ids[-1]}") if saved_metadata[0]['variable_id'] != all_variable_ids[0]: raise RuntimeError(f"Metadata mismatch: first entry {saved_metadata[0]['variable_id']} != {all_variable_ids[0]}") logger.info("โœ… Cross-validation passed") # Save build info build_info = { 'model_name': self.model_name, 'embedding_dimension': dimension, 'variable_count': len(all_embeddings), 'structure_type': 'concept_based' if is_refactored else 'temporal_based', 'source_file': canonical_path.name, 'build_timestamp': time.time(), 'index_type': 'faiss_flat_l2', 'survey_instances_processed': self.variables_stats['survey_instances_processed'], 'has_id_mapping': True, 'arrays_synchronized': True, 'cross_validation_passed': True } build_info_path = self.variables_dir / "build_info.json" with open(build_info_path, 'w') as f: json.dump(build_info, f, indent=2) logger.info(f"๐Ÿ’พ Build info saved: {build_info_path}") structure_note = "concept-based variables" if is_refactored else "temporal variables" logger.info(f"โœ… FAISS variables database complete: {len(all_embeddings)} {structure_note}") logger.info(f"โœ… Synchronized arrays: IDs, metadata, embeddings all length {len(all_variable_ids)}") def _build_variables_chromadb(self, canonical_path: Path): """Build concept-based variables database using ChromaDB""" logger.info("๐ŸŽฏ Processing canonical variables for ChromaDB...") # Clear existing collection if rebuilding try: self.variables_client.delete_collection("census_variables") self.variables_collection = self.variables_client.create_collection( "census_variables", metadata={"description": "Census concept-based variables for entity lookup"} ) logger.info("๐Ÿ—‘๏ธ Cleared existing variables collection") except: pass concepts, is_refactored = self._load_canonical_variables(canonical_path) logger.info(f"๐Ÿ“Š Found {len(concepts)} {'concepts' if is_refactored else 'temporal variables'}") # Process in batches concept_items = list(concepts.items()) if self.test_mode: concept_items = concept_items[:1000] logger.info(f"๐Ÿงช Test mode: Limited to {len(concept_items)} variables") batch_size = 1000 total_batches = (len(concept_items) + batch_size - 1) // batch_size for batch_num, i in enumerate(range(0, len(concept_items), batch_size)): batch = concept_items[i:i + batch_size] logger.info(f"๐Ÿ”„ Processing ChromaDB batch {batch_num + 1}/{total_batches}") ids = [] texts = [] metadatas = [] for variable_id, concept_data in batch: text, metadata = self._create_concept_embedding_text(variable_id, concept_data, is_refactored) ids.append(variable_id) texts.append(text) metadatas.append(metadata) # Add to ChromaDB self.variables_collection.add( documents=texts, metadatas=metadatas, ids=ids ) self.variables_stats['concepts_processed'] += len(batch) structure_note = "concept-based variables" if is_refactored else "temporal variables" logger.info(f"โœ… ChromaDB variables database complete: {self.variables_stats['concepts_processed']} {structure_note}") def _build_methodology_database(self, rebuild: bool = False): """Build the methodology database from documentation files""" logger.info("๐Ÿ“š Building methodology database...") # Clear existing collection if rebuilding if rebuild: try: self.methodology_client.delete_collection("census_methodology") self.methodology_collection = self.methodology_client.create_collection( "census_methodology", metadata={"description": "Census methodology and documentation"} ) logger.info("๐Ÿ—‘๏ธ Cleared existing methodology collection") except: pass # Find documentation files doc_extensions = ['.pdf', '.md', '.txt', '.html', '.csv', '.xlsx'] doc_files = [] for ext in doc_extensions: pattern = f"**/*{ext}" files = list(self.source_dir.glob(pattern)) doc_files.extend(files) # Filter out build artifacts and focus on documentation exclude_patterns = [ 'canonical_variables.json', 'canonical_variables_refactored.json', 'failed_variables_retry.json', # ADDED: Exclude variable retry files 'acs1_raw.json', 'acs5_raw.json', 'raw_data', 'data_dumps', '__pycache__', '.git', 'node_modules', 'build', 'dist', '.DS_Store' ] doc_files = [] for f in self.source_dir.rglob('*'): if (f.is_file() and f.suffix.lower() in ['.pdf', '.md', '.txt', '.html', '.htm', '.csv', '.xlsx', '.Rmd'] and # REMOVED .json not any(exclude in str(f).lower() for exclude in exclude_patterns) and not any(part.startswith('.') for part in f.parts)): # Skip very large files that will hit OpenAI token limits if f.stat().st_size > 5 * 1024 * 1024: # Skip files > 5MB continue # Skip very large files in test mode if self.test_mode and f.stat().st_size > 1 * 1024 * 1024: continue doc_files.append(f) if not doc_files: logger.warning(f"No documentation files found in {self.source_dir}") return logger.info(f"๐Ÿ“„ Found {len(doc_files)} documentation files") # Process files in parallel if self.workers > 1: self._process_methodology_parallel(doc_files) else: self._process_methodology_sequential(doc_files) logger.info(f"โœ… Methodology database complete: {self.methodology_stats['chunks_created']} chunks from {self.methodology_stats['files_processed']} files") def _process_methodology_sequential(self, doc_files: List[Path]): """Process methodology files sequentially with clean progress""" print(f"Processing {len(doc_files)} methodology files...") for i, file_path in enumerate(doc_files): try: # Show file progress every 10 files if (i + 1) % 10 == 0 or i == 0 or i == len(doc_files) - 1: show_progress(i + 1, len(doc_files), "File Processing") chunks = process_single_file(file_path, self.embedding_model if not self.use_openai else self.openai_client, self.use_openai) if chunks: # Add to ChromaDB in batches batch_size = 500 for j in range(0, len(chunks), batch_size): batch = chunks[j:j + batch_size] texts = [c['text'] for c in batch] ids = [c['id'] for c in batch] metadatas = [c['metadata'] for c in batch] embeddings = [c['embedding'] for c in batch] self.methodology_collection.add( documents=texts, embeddings=embeddings, metadatas=metadatas, ids=ids ) self.methodology_stats['files_processed'] += 1 self.methodology_stats['chunks_created'] += len(chunks) except Exception as e: logger.error(f"Error processing {file_path}: {e}") self.methodology_stats['errors'] += 1 def _process_methodology_parallel(self, doc_files: List[Path]): """Process methodology files in parallel""" logger.info(f"๐Ÿ”„ Processing methodology files with {self.workers} workers...") # Create temporary directory for worker results temp_dir = Path("temp_methodology") temp_dir.mkdir(exist_ok=True) # Split files among workers files_per_worker = (len(doc_files) + self.workers - 1) // self.workers worker_args = [] for i in range(self.workers): start_idx = i * files_per_worker end_idx = min((i + 1) * files_per_worker, len(doc_files)) files_batch = doc_files[start_idx:end_idx] if files_batch: worker_args.append((i, files_batch, temp_dir, self.use_openai)) # Process in parallel with ProcessPoolExecutor(max_workers=self.workers) as executor: results = list(executor.map(process_methodology_worker, worker_args)) # Aggregate results for result in results: for key in self.methodology_stats: self.methodology_stats[key] += result[key] # Merge worker results into ChromaDB self._merge_methodology_results(temp_dir) # Cleanup self._cleanup_temp_files(temp_dir) def _merge_methodology_results(self, temp_dir: Path): """Merge worker results into ChromaDB""" logger.info("๐Ÿ”— Merging worker results into methodology database...") total_merged = 0 for temp_file in temp_dir.glob("worker_*.json"): with open(temp_file) as f: chunks = json.load(f) if not chunks: continue # Add to ChromaDB in batches batches = (len(chunks) + 499) // 500 for j in range(0, len(chunks), 500): batch = chunks[j:j + 500] batch_num = j // 500 + 1 texts = [c['text'] for c in batch] ids = [c['id'] for c in batch] metadatas = [c['metadata'] for c in batch] embeddings = [c['embedding'] for c in batch] self.methodology_collection.add( documents=texts, embeddings=embeddings, metadatas=metadatas, ids=ids ) total_merged += len(chunks) logger.info(f" โœ… Merged {len(chunks)} chunks from {temp_file.name}") logger.info(f"๐ŸŽ‰ Methodology merge complete: {total_merged} total chunks merged") def _cleanup_temp_files(self, temp_dir): """Clean up temporary files""" for temp_file in temp_dir.glob("worker_*.json"): temp_file.unlink() if temp_dir.exists() and not any(temp_dir.iterdir()): temp_dir.rmdir() def _display_final_stats(self, build_time): """Display simple completion message""" print(f"๐ŸŽ‰ Build complete in {build_time:.1f}s!") if self.build_mode in ['variables', 'both']: print(f" Variables: {self.variables_stats['concepts_processed']:,} concepts") if self.build_mode in ['methodology', 'both']: print(f" Methodology: {self.methodology_stats['chunks_created']:,} chunks from {self.methodology_stats['files_processed']:,} files") if self.methodology_stats['errors'] > 0: print(f" Errors: {self.methodology_stats['errors']}") def main(): parser = argparse.ArgumentParser(description='Concept-Based Dual-Path Knowledge Base Builder') parser.add_argument('--variables-only', action='store_true', help='Build only variables database') parser.add_argument('--methodology-only', action='store_true', help='Build only methodology database') parser.add_argument('--both', action='store_true', help='Build both databases') parser.add_argument('--faiss', action='store_true', help='Use FAISS for variables database (faster loading)') parser.add_argument('--rebuild', action='store_true', help='Force rebuild existing databases') parser.add_argument('--test-mode', action='store_true', help='Test with subset of data') parser.add_argument('--source-dir', default='source-docs', help='Source directory') parser.add_argument('--variables-dir', default='variables-db', help='Variables database directory') parser.add_argument('--methodology-dir', default='methodology-db', help='Methodology database directory') parser.add_argument('--model', default='sentence-transformers/all-mpnet-base-v2', help='Embedding model') parser.add_argument('--use-openai', action='store_true', help='Use OpenAI embeddings instead of SentenceTransformers') parser.add_argument('--workers', type=int, default=5, help='Number of parallel workers') args = parser.parse_args() # Determine build mode if args.variables_only: build_mode = 'variables' elif args.methodology_only: build_mode = 'methodology' elif args.both: build_mode = 'both' else: # Default to both if no mode specified build_mode = 'both' logger.info("No build mode specified, defaulting to --both") # Validate FAISS usage if args.faiss and build_mode == 'methodology': logger.warning("FAISS flag ignored - only applies to variables database") args.faiss = False builder = ConceptBasedKnowledgeBuilder( source_dir=Path(args.source_dir), build_mode=build_mode, variables_dir=Path(args.variables_dir), methodology_dir=Path(args.methodology_dir), test_mode=args.test_mode, model_name=args.model, workers=args.workers, use_faiss=args.faiss, use_openai=args.use_openai ) builder.build_knowledge_bases(rebuild=args.rebuild) logger.info("๐Ÿš€ Concept-based dual-path knowledge base build completed!") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server