Open Census MCP Server

by brockwebb
build-kb_old-openai-embeddings-method.py (31.3 kB)
#!/usr/bin/env python3 """ Knowledge Base Vectorization Script Processes source documents into ChromaDB vector database using OpenAI embeddings DEPENDENCIES: pip install PyPDF2 beautifulsoup4 markdown chromadb openai tenacity SETUP: export OPENAI_API_KEY="your_openai_api_key_here" Usage: cd knowledge-base/ python build-kb.py [--rebuild] [--test-mode] Arguments: --rebuild: Force rebuild of existing vector DB --test-mode: Process only a subset of documents for testing --source-dir: Source documents directory (default: source-docs) --output-dir: Output vector database directory (default: vector-db) Examples: # Test with subset of documents cd knowledge-base/ python build-kb.py --test-mode # Full knowledge base build python build-kb.py # Force complete rebuild python build-kb.py --rebuild Cost Estimate: Expected cost: $13-20 for ~900MB corpus using text-embedding-3-large """ import os import sys import json import logging import argparse import hashlib from pathlib import Path from typing import List, Dict, Any, Optional import time import random from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type # Document processing imports import PyPDF2 from bs4 import BeautifulSoup import markdown import re # Vector DB and embeddings import chromadb from chromadb.config import Settings import openai # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - KB-BUILD - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class KnowledgeBaseBuilder: """ Builds Census expertise knowledge base from source documents. Processes PDFs, HTML, markdown, and text files into a ChromaDB vector database using OpenAI embeddings for high-quality semantic search. """ def __init__(self, source_dir: Path, output_dir: Path, test_mode: bool = False): """Initialize the knowledge base builder.""" self.source_dir = Path(source_dir) self.output_dir = Path(output_dir) self.test_mode = test_mode # OpenAI client self.openai_client = openai.OpenAI() # Document processing stats self.stats = { 'files_processed': 0, 'chunks_created': 0, 'embedding_calls': 0, 'total_tokens': 0, 'errors': 0 } # Initialize ChromaDB self._init_vector_db() logger.info(f"Knowledge base builder initialized") logger.info(f"Source directory: {self.source_dir}") logger.info(f"Output directory: {self.output_dir}") logger.info(f"Test mode: {self.test_mode}") def _init_vector_db(self): """Initialize ChromaDB client and collection.""" try: # Create output directory self.output_dir.mkdir(parents=True, exist_ok=True) # Initialize ChromaDB client self.chroma_client = chromadb.PersistentClient( path=str(self.output_dir), settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) # Create or get collection collection_name = "census_knowledge" try: self.collection = self.chroma_client.get_collection(collection_name) logger.info(f"Loaded existing collection: {collection_name}") except Exception: # ChromaDB raises various exceptions for missing collections self.collection = self.chroma_client.create_collection( name=collection_name, metadata={"description": "Census expertise knowledge base"} ) logger.info(f"Created new collection: {collection_name}") except Exception as e: logger.error(f"Failed to initialize ChromaDB: {str(e)}") raise def build_knowledge_base(self, rebuild: bool = False): """Main entry point to build the knowledge base.""" logger.info("=" * 60) logger.info("BUILDING CENSUS KNOWLEDGE BASE") logger.info("=" * 60) start_time = time.time() try: # Check if rebuild is needed if 
rebuild: logger.info("Rebuild requested - clearing existing collection") # Delete the collection and recreate it try: self.chroma_client.delete_collection(self.collection.name) logger.info("Deleted existing collection") except ValueError: pass # Collection doesn't exist except Exception as e: logger.error(f"Failed to delete collection: {e}") raise # Recreate the collection self.collection = self.chroma_client.create_collection( name="census_knowledge", metadata={"description": "Census expertise knowledge base"} ) logger.info("Created fresh collection for rebuild") existing_count = self.collection.count() if existing_count > 0 and not rebuild: logger.info(f"Collection already has {existing_count} documents") response = input("Continue building? (y/n): ") if response.lower() != 'y': logger.info("Build cancelled") return # Process all source documents self._process_source_documents() # Generate build manifest self._generate_build_manifest() # Display final statistics build_time = time.time() - start_time self._display_final_stats(build_time) logger.info("=" * 60) logger.info("KNOWLEDGE BASE BUILD COMPLETE") logger.info("=" * 60) except Exception as e: logger.error(f"Knowledge base build failed: {str(e)}") raise def _process_source_documents(self): """Process all source documents by category.""" # Define document categories and their priorities categories = { 'tidycensus-complete': {'priority': 1, 'max_files': 50 if self.test_mode else None}, 'tigris-complete': {'priority': 2, 'max_files': 20 if self.test_mode else None}, 'census-r-book': {'priority': 1, 'max_files': 30 if self.test_mode else None}, 'census-methodology': {'priority': 1, 'max_files': 10 if self.test_mode else None}, 'variable-definitions': {'priority': 1, 'max_files': 15 if self.test_mode else None}, 'equity-guidance': {'priority': 1, 'max_files': 15 if self.test_mode else None}, 'geographic-reference': {'priority': 2, 'max_files': 10 if self.test_mode else None}, 'data-privacy': {'priority': 2, 'max_files': 5 if self.test_mode else None}, 'training-best-practices': {'priority': 1, 'max_files': 10 if self.test_mode else None} } # Process each category for category, config in categories.items(): category_path = self.source_dir / category if category_path.exists(): logger.info(f"Processing category: {category}") self._process_category(category_path, category, config) else: logger.warning(f"Category directory not found: {category}") def _process_category(self, category_path: Path, category_name: str, config: Dict): """Process all files in a document category.""" files_processed = 0 max_files = config.get('max_files') # Get all files in category (recursively) all_files = [] for pattern in ['*.pdf', '*.html', '*.md', '*.txt', '*.Rmd']: all_files.extend(category_path.rglob(pattern)) # Sort by priority (smaller files first for testing) if self.test_mode: all_files.sort(key=lambda f: f.stat().st_size) for file_path in all_files: if max_files and files_processed >= max_files: logger.info(f"Reached max files limit for {category_name}: {max_files}") break try: # Skip hidden files and directories if any(part.startswith('.') for part in file_path.parts): continue # Skip very large files in test mode if self.test_mode and file_path.stat().st_size > 5 * 1024 * 1024: # 5MB logger.info(f"Skipping large file in test mode: {file_path.name}") continue logger.info(f"Processing: {file_path.relative_to(self.source_dir)}") self._process_document(file_path, category_name) files_processed += 1 # Rate limiting for OpenAI API time.sleep(0.1) except 
Exception as e: logger.error(f"Error processing {file_path}: {str(e)}") self.stats['errors'] += 1 continue logger.info(f"Completed category {category_name}: {files_processed} files processed") def _process_document(self, file_path: Path, category: str): """Process a single document into chunks and embeddings.""" try: # Extract text based on file type if file_path.suffix.lower() == '.pdf': text = self._extract_pdf_text(file_path) elif file_path.suffix.lower() in ['.html', '.htm']: text = self._extract_html_text(file_path) elif file_path.suffix.lower() in ['.md', '.rmd']: text = self._extract_markdown_text(file_path) else: # Plain text with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: text = f.read() if not text or len(text.strip()) < 100: logger.warning(f"Insufficient text extracted from {file_path.name}") return # Create chunks chunks = self._create_chunks(text, file_path, category) if not chunks: logger.warning(f"No chunks created from {file_path.name}") return # Generate embeddings and store self._store_chunks(chunks) self.stats['files_processed'] += 1 logger.info(f"Processed {file_path.name}: {len(chunks)} chunks created") except Exception as e: logger.error(f"Error processing document {file_path}: {str(e)}") raise def _extract_pdf_text(self, file_path: Path) -> str: """Extract text from PDF file.""" try: text = "" with open(file_path, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) for page_num, page in enumerate(pdf_reader.pages): page_text = page.extract_text() if page_text: # Guard against None text += f"\n--- Page {page_num + 1} ---\n{page_text}" return text except Exception as e: logger.error(f"PDF extraction failed for {file_path}: {str(e)}") return "" def _extract_html_text(self, file_path: Path) -> str: """Extract text from HTML file.""" try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: html_content = f.read() soup = BeautifulSoup(html_content, 'html.parser') # Remove script and style elements for script in soup(["script", "style", "nav", "header", "footer"]): script.decompose() # Extract text text = soup.get_text() # Clean up whitespace lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = ' '.join(chunk for chunk in chunks if chunk) return text except Exception as e: logger.error(f"HTML extraction failed for {file_path}: {str(e)}") return "" def _extract_markdown_text(self, file_path: Path) -> str: """Extract text from Markdown file.""" try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: md_content = f.read() # Convert markdown to HTML then extract text html = markdown.markdown(md_content) soup = BeautifulSoup(html, 'html.parser') return soup.get_text() except Exception as e: logger.error(f"Markdown extraction failed for {file_path}: {str(e)}") return "" def _create_chunks(self, text: str, file_path: Path, category: str) -> List[Dict[str, Any]]: """Split document into chunks for embedding with robust size handling.""" chunks = [] # Clean text text = re.sub(r'\s+', ' ', text).strip() if len(text) < 100: return chunks # Optimized chunking strategy for technical documents chunk_size = 800 # Target size max_chunk_size = 1200 # Hard limit to stay under token limits overlap = 200 # 25% overlap for context preservation # Multi-level splitting: paragraphs → sentences → words paragraphs = text.split('\n\n') current_chunk = "" chunk_num = 0 for paragraph in paragraphs: paragraph = paragraph.strip() if not paragraph: continue # If paragraph itself 
is too large, split by sentences if len(paragraph) > max_chunk_size: sentences = re.split(r'[.!?]+', paragraph) for sentence in sentences: sentence = sentence.strip() if not sentence: continue # If sentence is still too large, split by words (last resort) if len(sentence) > max_chunk_size: words = sentence.split() current_sentence = "" for word in words: if len(current_sentence + " " + word) > max_chunk_size and current_sentence: # Store the current sentence chunk if len(current_sentence.strip()) > 100: chunks.append(self._create_chunk_metadata( current_sentence.strip(), file_path, category, chunk_num )) chunk_num += 1 # Start new chunk with overlap overlap_words = current_sentence.split()[-20:] # Last 20 words current_sentence = " ".join(overlap_words) + " " + word else: current_sentence += " " + word if current_sentence else word # Add final sentence chunk if len(current_sentence.strip()) > 100: if len(current_chunk + " " + current_sentence) <= max_chunk_size: current_chunk += " " + current_sentence if current_chunk else current_sentence else: # Store current chunk first if len(current_chunk.strip()) > 100: chunks.append(self._create_chunk_metadata( current_chunk.strip(), file_path, category, chunk_num )) chunk_num += 1 current_chunk = current_sentence else: # Normal sentence, try to add to current chunk if len(current_chunk + " " + sentence) > chunk_size and current_chunk: # Store current chunk if len(current_chunk.strip()) > 100: chunks.append(self._create_chunk_metadata( current_chunk.strip(), file_path, category, chunk_num )) chunk_num += 1 # Start new chunk with overlap overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk current_chunk = overlap_text + " " + sentence else: current_chunk += " " + sentence if current_chunk else sentence else: # Normal paragraph processing if len(current_chunk + " " + paragraph) > chunk_size and current_chunk: # Store current chunk if len(current_chunk.strip()) > 100: chunks.append(self._create_chunk_metadata( current_chunk.strip(), file_path, category, chunk_num )) chunk_num += 1 # Start new chunk with overlap overlap_text = current_chunk[-overlap:] if len(current_chunk) > overlap else current_chunk current_chunk = overlap_text + " " + paragraph else: current_chunk += " " + paragraph if current_chunk else paragraph # Add final chunk if len(current_chunk.strip()) > 100: # Ensure final chunk isn't too large if len(current_chunk) > max_chunk_size: # Split final chunk if needed words = current_chunk.split() while len(" ".join(words)) > max_chunk_size and len(words) > 10: chunk_words = words[:len(words)//2] chunks.append(self._create_chunk_metadata( " ".join(chunk_words), file_path, category, chunk_num )) chunk_num += 1 words = words[len(words)//2:] if words: chunks.append(self._create_chunk_metadata( " ".join(words), file_path, category, chunk_num )) else: chunks.append(self._create_chunk_metadata( current_chunk.strip(), file_path, category, chunk_num )) return chunks def _create_chunk_metadata(self, text: str, file_path: Path, category: str, chunk_num: int) -> Dict[str, Any]: """Create metadata for a text chunk.""" # Generate unique ID with full hash for better traceability content_hash = hashlib.md5(text.encode()).hexdigest() chunk_id = f"{file_path.stem}_{chunk_num}_{content_hash}" return { 'id': chunk_id, 'text': text, 'metadata': { 'source_file': str(file_path.relative_to(self.source_dir)), 'category': category, 'chunk_number': chunk_num, 'file_name': file_path.name, 'file_type': file_path.suffix, 'text_length': 
len(text) } } @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((openai.RateLimitError, openai.APIError)) ) def _generate_embeddings_with_retry(self, texts: List[str]) -> List[List[float]]: """Generate embeddings with retry logic and rate limiting.""" try: response = self.openai_client.embeddings.create( input=texts, model="text-embedding-3-large" ) return [item.embedding for item in response.data] except openai.RateLimitError as e: logger.warning(f"Rate limit hit, backing off: {e}") raise except openai.APIError as e: logger.warning(f"API error, retrying: {e}") raise def _store_chunks(self, chunks: List[Dict[str, Any]]): """Generate embeddings and store chunks in ChromaDB.""" if not chunks: return try: # Prepare data for batch processing texts = [chunk['text'] for chunk in chunks] chunk_ids = [chunk['id'] for chunk in chunks] metadatas = [chunk['metadata'] for chunk in chunks] # Generate embeddings using OpenAI logger.info(f"Generating embeddings for {len(texts)} chunks...") embeddings = [] batch_size = 100 # OpenAI batch limit for i in range(0, len(texts), batch_size): batch_texts = texts[i:i + batch_size] # Skip any texts that are too long (fallback protection) safe_texts = [] safe_ids = [] safe_metadatas = [] for j, text in enumerate(batch_texts): if len(text) > 6000: # Conservative token limit logger.warning(f"Skipping oversized chunk: {len(text)} characters") continue safe_texts.append(text) safe_ids.append(chunk_ids[i + j]) safe_metadatas.append(metadatas[i + j]) if not safe_texts: logger.warning(f"No valid chunks in batch {i//batch_size + 1}, skipping") continue # Use retry mechanism for embeddings batch_embeddings = self._generate_embeddings_with_retry(safe_texts) embeddings.extend(batch_embeddings) # Store the safe chunks with matching IDs and metadata if batch_embeddings: self.collection.add( documents=safe_texts, embeddings=batch_embeddings, metadatas=safe_metadatas, ids=safe_ids ) # Track usage (approximate since we're using retry) self.stats['embedding_calls'] += 1 self.stats['total_tokens'] += len(' '.join(safe_texts).split()) # Rough estimate # Rate limiting time.sleep(0.5 + random.uniform(0, 0.5)) # Add jitter to avoid thundering herd # Count successful embeddings self.stats['chunks_created'] += len([e for e in embeddings if e]) logger.info(f"Stored {len([e for e in embeddings if e])} valid chunks in vector database") except Exception as e: logger.error(f"Error storing chunks: {str(e)}") raise def _generate_build_manifest(self): """Generate build manifest with statistics and metadata.""" manifest = { 'build_timestamp': time.time(), 'build_date': time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime()), 'test_mode': self.test_mode, 'stats': self.stats, 'collection_info': { 'name': self.collection.name, 'document_count': self.collection.count(), }, 'openai_model': 'text-embedding-3-large', 'source_directory': str(self.source_dir), 'output_directory': str(self.output_dir) } # Save manifest manifest_path = self.output_dir / 'build_manifest.json' with open(manifest_path, 'w') as f: json.dump(manifest, f, indent=2) logger.info(f"Build manifest saved: {manifest_path}") def _display_final_stats(self, build_time: float): """Display final build statistics.""" total_cost = (self.stats['total_tokens'] / 1000000) * 0.13 # $0.13 per 1M tokens logger.info("BUILD STATISTICS:") logger.info(f" Files processed: {self.stats['files_processed']}") logger.info(f" Chunks created: {self.stats['chunks_created']}") logger.info(f" 
Embedding API calls: {self.stats['embedding_calls']}") logger.info(f" Total tokens: {self.stats['total_tokens']:,}") logger.info(f" Estimated cost: ${total_cost:.4f}") logger.info(f" Build time: {build_time:.2f} seconds") logger.info(f" Errors: {self.stats['errors']}") final_count = self.collection.count() logger.info(f" Final collection size: {final_count} documents") def main(): """Main entry point.""" parser = argparse.ArgumentParser(description='Build Census Knowledge Base') parser.add_argument('--rebuild', action='store_true', help='Force rebuild of existing vector DB') parser.add_argument('--test-mode', action='store_true', help='Process only a subset of documents for testing') parser.add_argument('--source-dir', type=str, default='source-docs', help='Source documents directory') parser.add_argument('--output-dir', type=str, default='../data/vector_db', help='Output vector database directory (default: ../data/vector_db)') args = parser.parse_args() # Check for OpenAI API key if not os.getenv('OPENAI_API_KEY'): logger.error("OPENAI_API_KEY environment variable not set") sys.exit(1) # Validate source directory source_dir = Path(args.source_dir) if not source_dir.exists(): logger.error(f"Source directory not found: {source_dir}") sys.exit(1) # Build knowledge base try: builder = KnowledgeBaseBuilder( source_dir=source_dir, output_dir=Path(args.output_dir), test_mode=args.test_mode ) builder.build_knowledge_base(rebuild=args.rebuild) logger.info("Knowledge base build completed successfully!") except KeyboardInterrupt: logger.info("Build interrupted by user") sys.exit(1) except Exception as e: logger.error(f"Build failed: {str(e)}") sys.exit(1) if __name__ == "__main__": main() def test_smoke_build_pipeline(): """ Smoke test: verify document processing and chunking without OpenAI calls. Run with: python -c "from build_kb import test_smoke_build_pipeline; test_smoke_build_pipeline()" """ import tempfile # Create temporary directories with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) source_dir = temp_path / "test_source" output_dir = temp_path / "test_output" # Create test category and document test_category = source_dir / "test_category" test_category.mkdir(parents=True) test_file = test_category / "test_doc.md" test_content = """# Test Document This is a test document for the knowledge base builder. It contains multiple paragraphs to test chunking and text extraction. ## Section 1 This section has some content about testing the document processing pipeline. The chunking algorithm should split this appropriately into manageable pieces. ## Section 2 This is another section with different content for testing purposes. It should create separate chunks for better retrieval and processing. ## Section 3 Additional content to ensure we have enough text for multiple chunks. This helps verify that the chunking logic is working correctly. 
""" test_file.write_text(test_content) try: # Test just the processing pipeline without embeddings builder = KnowledgeBaseBuilder( source_dir=source_dir, output_dir=output_dir, test_mode=True ) # Test text extraction extracted_text = builder._extract_markdown_text(test_file) assert len(extracted_text) > 100, f"Insufficient text extracted: {len(extracted_text)} chars" # Test chunking chunks = builder._create_chunks(extracted_text, test_file, "test_category") assert len(chunks) > 0, f"No chunks created from {len(extracted_text)} chars of text" assert all(len(chunk['text']) >= 100 for chunk in chunks), "Some chunks are too small" # Test metadata creation for i, chunk in enumerate(chunks): assert 'id' in chunk, f"Chunk {i} missing ID" assert 'text' in chunk, f"Chunk {i} missing text" assert 'metadata' in chunk, f"Chunk {i} missing metadata" assert chunk['metadata']['category'] == 'test_category', f"Wrong category in chunk {i}" print("✅ Smoke test passed!") print(f" Text extracted: {len(extracted_text)} characters") print(f" Chunks created: {len(chunks)}") print(f" Average chunk size: {sum(len(c['text']) for c in chunks) // len(chunks)} chars") print(f" Sample chunk ID: {chunks[0]['id'][:50]}...") except Exception as e: print(f"❌ Smoke test failed: {e}") raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.