
Open Census MCP Server

by brockwebb
table_keyword_generator.py (28 kB)
#!/usr/bin/env python3
"""
Production Table Keyword Generator using GPT-4.1

Generates search-optimized keywords for ACS tables with production-grade infrastructure:
- Exponential backoff and retry logic
- Safe resume with checkpoint system
- Parallel processing with rate limiting
- Comprehensive error handling and logging
- Safe file handling (never overwrites original catalog)
"""

import json
import openai
import time
import os
from pathlib import Path
from typing import List, Dict, Optional
import logging
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import argparse

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


@dataclass
class KeywordResult:
    """Container for keyword generation results"""
    table_id: str
    primary_keywords: List[str]
    secondary_keywords: List[str]
    summary: str
    processing_cost: float
    processing_time: float
    timestamp: float
    error: Optional[str] = None


class ProductionTableKeywordGenerator:
    """Production-grade table keyword generator with robust error handling"""

    def __init__(self, openai_api_key: str, rate_limit_rpm: int = 1000):
        """Initialize with API key and rate limiting"""
        self.openai_client = openai.OpenAI(api_key=openai_api_key)

        # Rate limiting
        self.rate_limit_rpm = rate_limit_rpm
        self.min_delay = 60.0 / rate_limit_rpm  # seconds between calls
        self.last_call_time = 0
        self.call_lock = Lock()

        # Statistics
        self.total_cost = 0.0
        self.successful_count = 0
        self.failed_count = 0
        self.retry_count = 0

        logger.info(f"Initialized with rate limit: {rate_limit_rpm} RPM")

    def deduplicate_keywords(self, keywords: List[str]) -> List[str]:
        """Remove near-duplicate keywords using string similarity"""
        if not keywords:
            return keywords

        def similarity(a: str, b: str) -> float:
            """Calculate simple word overlap similarity"""
            words_a = set(a.lower().split())
            words_b = set(b.lower().split())
            intersection = words_a.intersection(words_b)
            union = words_a.union(words_b)
            return len(intersection) / len(union) if union else 0

        deduplicated = []
        for keyword in keywords:
            # Check if this keyword is too similar to any existing keyword
            is_duplicate = False
            for existing in deduplicated:
                if similarity(keyword, existing) > 0.7:  # 70% word overlap threshold
                    # Keep the shorter, cleaner version
                    if len(keyword) < len(existing):
                        deduplicated.remove(existing)
                        deduplicated.append(keyword)
                    is_duplicate = True
                    break
            if not is_duplicate:
                deduplicated.append(keyword)

        return deduplicated

    def _estimate_cost(self, prompt: str, response: str) -> float:
        """Estimate cost for GPT-4.1-mini"""
        # GPT-4.1-mini pricing (current rates)
        input_cost_per_1m = 0.150   # $0.150 per 1M input tokens
        output_cost_per_1m = 0.600  # $0.600 per 1M output tokens

        # Rough token estimation (4 characters per token)
        input_tokens = len(prompt) / 4
        output_tokens = len(response) / 4

        input_cost = (input_tokens / 1_000_000) * input_cost_per_1m
        output_cost = (output_tokens / 1_000_000) * output_cost_per_1m

        return input_cost + output_cost

    def _rate_limited_call(self, prompt: str, max_retries: int = 3) -> str:
        """Make rate-limited API call with exponential backoff"""
        # Rate limiting
        with self.call_lock:
            time_since_last = time.time() - self.last_call_time
            if time_since_last < self.min_delay:
                sleep_time = self.min_delay - time_since_last
                logger.debug(f"Rate limit: sleeping {sleep_time:.2f}s")
                time.sleep(sleep_time)
            self.last_call_time = time.time()

        # Retry with exponential backoff
        for attempt in range(max_retries):
            try:
                response = self.openai_client.chat.completions.create(
                    model="gpt-4-1106-preview",  # GPT-4.1
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=500,
                    temperature=0.1
                )
                response_text = response.choices[0].message.content.strip()

                # Track cost
                cost = self._estimate_cost(prompt, response_text)
                self.total_cost += cost

                return response_text

            except Exception as e:
                error_str = str(e).lower()

                if "rate_limit" in error_str or "429" in error_str:
                    # Rate limit hit - exponential backoff
                    wait_time = (2 ** attempt) * 30  # 30s, 60s, 120s
                    logger.warning(f"Rate limit hit (attempt {attempt+1}/{max_retries}), waiting {wait_time}s...")
                    time.sleep(wait_time)
                    self.retry_count += 1
                    continue
                elif "503" in error_str or "502" in error_str or "server" in error_str:
                    # Server error - retry with backoff
                    wait_time = (2 ** attempt) * 10  # 10s, 20s, 40s
                    logger.warning(f"Server error (attempt {attempt+1}/{max_retries}), waiting {wait_time}s...")
                    time.sleep(wait_time)
                    self.retry_count += 1
                    continue
                else:
                    # Other error - don't retry
                    logger.error(f"API error: {e}")
                    raise e

        # All retries exhausted
        raise Exception(f"Failed after {max_retries} retries")

    def generate_table_keywords(self, table_id: str, title: str, universe: str, concept: str) -> KeywordResult:
        """Generate keywords for a single table with error handling"""
        start_time = time.time()

        # Expert-refined prompt following ACS specialist recommendations
        prompt = f"""Given the ACS table details below, generate 3-5 **primary keywords**, 2-4 **secondary keywords**, and a **one-sentence summary**.

- Primary keywords should represent the main concepts and most likely search terms.
- Secondary keywords should cover alternative phrasings and related but less central ideas.
- Summary should be a single, plain-language sentence explaining what the table contains.
- Avoid excessive duplication and keep each keyword distinct unless necessary for accuracy.
- Use language an informed layperson would use.
- Return as JSON only, no additional text.

Example output:
```json
{{
  "primary_keywords": ["poverty rate", "below poverty line", "poverty by age"],
  "secondary_keywords": ["economic hardship", "low income population"],
  "summary": "Population data showing poverty status broken down by age and sex for people whose poverty status can be determined."
}}
```

**Table Title:** "{title}"
**Concept:** "{concept}"
**Universe:** "{universe}"

Return only the JSON object, no additional text."""

        try:
            # Make API call with retries
            response_text = self._rate_limited_call(prompt)

            # Clean up potential markdown formatting
            if response_text.startswith('```json'):
                response_text = response_text[7:]
            if response_text.endswith('```'):
                response_text = response_text[:-3]
            response_text = response_text.strip()

            # Parse JSON response
            keywords_data = json.loads(response_text)

            # Validate structure
            if not isinstance(keywords_data, dict):
                raise ValueError("Response is not a dictionary")

            required_fields = ["primary_keywords", "secondary_keywords", "summary"]
            if not all(field in keywords_data for field in required_fields):
                raise ValueError(f"Missing required fields: {required_fields}")

            # Clean up keywords
            primary = [kw.strip() for kw in keywords_data["primary_keywords"] if kw.strip()]
            secondary = [kw.strip() for kw in keywords_data["secondary_keywords"] if kw.strip()]

            # Enhanced deduplication
            primary = self.deduplicate_keywords(primary)
            secondary = self.deduplicate_keywords(secondary)

            # Get summary
            summary = keywords_data.get("summary", "").strip()

            self.successful_count += 1
            processing_time = time.time() - start_time

            return KeywordResult(
                table_id=table_id,
                primary_keywords=primary[:5],      # Limit primary to 5
                secondary_keywords=secondary[:3],  # Limit secondary to 3
                summary=summary,
                processing_cost=self.total_cost,   # Running total
                processing_time=processing_time,
                timestamp=time.time()
            )

        except json.JSONDecodeError as e:
            self.failed_count += 1
            error_msg = f"JSON decode error: {e}"
            logger.error(f"{table_id}: {error_msg}")
            return KeywordResult(
                table_id=table_id,
                primary_keywords=[],
                secondary_keywords=[],
                summary="",
                processing_cost=0.0,
                processing_time=time.time() - start_time,
                timestamp=time.time(),
                error=error_msg
            )
        except Exception as e:
            self.failed_count += 1
            error_msg = f"Generation error: {str(e)}"
            logger.error(f"{table_id}: {error_msg}")
            return KeywordResult(
                table_id=table_id,
                primary_keywords=[],
                secondary_keywords=[],
                summary="",
                processing_cost=0.0,
                processing_time=time.time() - start_time,
                timestamp=time.time(),
                error=error_msg
            )

    def load_existing_results(self, output_file: str) -> Dict[str, Dict]:
        """Load existing results for safe resume"""
        if not os.path.exists(output_file):
            return {}

        try:
            with open(output_file, 'r') as f:
                existing_data = json.load(f)
            logger.info(f"Loaded {len(existing_data)} existing results for resume")
            return existing_data
        except Exception as e:
            logger.warning(f"Could not load existing results: {e}")
            return {}

    def save_checkpoint(self, results: List[KeywordResult], output_file: str):
        """Save checkpoint during processing (results only, not enhanced catalog)"""
        try:
            # Convert results to simple dict format for checkpointing
            results_dict = {}
            for result in results:
                results_dict[result.table_id] = {
                    "primary_keywords": result.primary_keywords,
                    "secondary_keywords": result.secondary_keywords,
                    "summary": result.summary,
                    "processing_time": result.processing_time,
                    "timestamp": result.timestamp,
                    "error": result.error
                }

            # Atomic write
            temp_file = output_file + '.tmp'
            with open(temp_file, 'w') as f:
                json.dump(results_dict, f, indent=2)
            os.rename(temp_file, output_file)

            logger.info(f"Checkpoint saved: {len([r for r in results if not r.error])} successful results")

        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")

    def save_enhanced_catalog(self, results: List[KeywordResult], original_catalog_file: str, output_catalog_file: str):
        """Save enhanced catalog with keywords without overwriting original"""
        try:
            # Load original catalog
            with open(original_catalog_file, 'r') as f:
                catalog_data = json.load(f)

            # Create lookup of results by table_id
            results_dict = {result.table_id: result for result in results if not result.error}

            # Enhance tables with keywords
            enhanced_tables = []
            integrated_count = 0

            for table in catalog_data['tables']:
                table_id = table['table_id']

                # Create copy to avoid modifying original
                enhanced_table = table.copy()

                if table_id in results_dict:
                    result = results_dict[table_id]
                    enhanced_table['search_keywords'] = {
                        'primary_keywords': result.primary_keywords,
                        'secondary_keywords': result.secondary_keywords,
                        'summary': result.summary,
                        'metadata': {
                            'generated_by': 'gpt-4.1-production',
                            'processing_time': result.processing_time,
                            'timestamp': result.timestamp,
                            'processing_cost': result.processing_cost
                        }
                    }
                    integrated_count += 1

                enhanced_tables.append(enhanced_table)

            # Update metadata
            enhanced_catalog = catalog_data.copy()
            enhanced_catalog['tables'] = enhanced_tables
            enhanced_catalog['metadata'] = catalog_data['metadata'].copy()
            enhanced_catalog['metadata']['search_keywords_generated'] = True
            enhanced_catalog['metadata']['keyword_generation_timestamp'] = time.time()
            enhanced_catalog['metadata']['keyword_integration_count'] = integrated_count
            enhanced_catalog['metadata']['enhanced_catalog_version'] = 'keywords_v1.0'

            # Atomic write to new file
            temp_file = output_catalog_file + '.tmp'
            with open(temp_file, 'w') as f:
                json.dump(enhanced_catalog, f, indent=2)
            os.rename(temp_file, output_catalog_file)

            logger.info(f"✅ Enhanced catalog saved to: {output_catalog_file}")
            logger.info(f"✅ Integrated keywords for {integrated_count}/{len(enhanced_tables)} tables")
            logger.info(f"🔒 Original catalog preserved: {original_catalog_file}")

            return True

        except Exception as e:
            logger.error(f"Failed to save enhanced catalog: {e}")
            return False

    def process_tables_parallel(self, tables: List[Dict], output_file: str,
                                max_workers: int = 5, checkpoint_interval: int = 50) -> List[KeywordResult]:
        """Process tables in parallel with checkpoints"""
        results = []
        processed_count = 0

        # Process in batches for checkpointing
        batch_size = checkpoint_interval
        for batch_start in range(0, len(tables), batch_size):
            batch_end = min(batch_start + batch_size, len(tables))
            batch_tables = tables[batch_start:batch_end]

            logger.info(f"Processing batch {batch_start+1}-{batch_end} of {len(tables)}")

            # Process batch in parallel
            batch_results = []
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tables in batch
                future_to_table = {
                    executor.submit(
                        self.generate_table_keywords,
                        table['table_id'],
                        table['title'],
                        table['universe'],
                        table['concept']
                    ): table
                    for table in batch_tables
                }

                # Collect results as they complete
                for future in as_completed(future_to_table):
                    table = future_to_table[future]
                    try:
                        result = future.result(timeout=120)  # 2 minute timeout
                        batch_results.append(result)
                        processed_count += 1

                        if result.error:
                            logger.error(f"❌ {table['table_id']}: {result.error}")
                        else:
                            logger.info(f"✅ {table['table_id']}: {len(result.primary_keywords)} primary, {len(result.secondary_keywords)} secondary")

                    except Exception as e:
                        logger.error(f"❌ {table['table_id']}: Timeout or executor error: {e}")
                        # Create error result
                        error_result = KeywordResult(
                            table_id=table['table_id'],
                            primary_keywords=[],
                            secondary_keywords=[],
                            summary="",
                            processing_cost=0.0,
                            processing_time=0.0,
                            timestamp=time.time(),
                            error=f"Executor error: {str(e)}"
                        )
                        batch_results.append(error_result)
                        processed_count += 1

            results.extend(batch_results)

            # Save checkpoint
            self.save_checkpoint(results, output_file)

            # Progress report
            success_rate = self.successful_count / (self.successful_count + self.failed_count) * 100 if (self.successful_count + self.failed_count) > 0 else 0
            avg_cost = self.total_cost / self.successful_count if self.successful_count > 0 else 0

            logger.info(f"Progress: {processed_count}/{len(tables)} | "
                        f"Success: {success_rate:.1f}% | "
                        f"Avg cost: ${avg_cost:.4f} | "
                        f"Total: ${self.total_cost:.2f} | "
                        f"Retries: {self.retry_count}")

        return results


def test_keyword_generation():
    """Test keyword generation on 10 diverse sample tables"""
    # Get API key
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        logger.error("OPENAI_API_KEY environment variable required")
        return

    generator = ProductionTableKeywordGenerator(api_key, rate_limit_rpm=1000)

    # 10 diverse test tables
    test_tables = [
        {
            'table_id': 'B16001',
            'title': 'Language Spoken at Home by Ability to Speak English for the Population 5 Years and Over',
            'universe': 'Population 5 years and over',
            'concept': 'Language Spoken at Home'
        },
        {
            'table_id': 'B27001',
            'title': 'Health Insurance Coverage Status by Sex by Age',
            'universe': 'Civilian noninstitutionalized population',
            'concept': 'Health Insurance Coverage'
        },
        {
            'table_id': 'B28002',
            'title': 'Presence and Types of Internet Subscriptions in Household',
            'universe': 'Households',
            'concept': 'Internet Subscription'
        },
        {
            'table_id': 'B21001',
            'title': 'Sex by Age by Veteran Status for the Civilian Population 18 Years and Over',
            'universe': 'Civilian population 18 years and over',
            'concept': 'Veteran Status'
        },
        {
            'table_id': 'B18101',
            'title': 'Sex by Age by Disability Status',
            'universe': 'Civilian noninstitutionalized population',
            'concept': 'Disability Status'
        },
        {
            'table_id': 'B10051',
            'title': 'Grandparents Living with Own Grandchildren Under 18 Years by Responsibility for Own Grandchildren',
            'universe': 'Population 30 years and over',
            'concept': 'Grandparent Caregivers'
        },
        {
            'table_id': 'B05002',
            'title': 'Place of Birth by Nativity and Citizenship Status',
            'universe': 'Total population',
            'concept': 'Nativity and Citizenship'
        },
        {
            'table_id': 'B26001',
            'title': 'Group Quarters Population by Sex by Age',
            'universe': 'Population in group quarters',
            'concept': 'Group Quarters'
        },
        {
            'table_id': 'B25034',
            'title': 'Year Structure Built',
            'universe': 'Housing units',
            'concept': 'Year Structure Built'
        },
        {
            'table_id': 'B13016',
            'title': 'Women 15 to 50 Years Who Had a Birth in the Past 12 Months by Marital Status and Age',
            'universe': 'Women 15 to 50 years',
            'concept': 'Fertility'
        }
    ]

    print("🧪 TESTING PRODUCTION TABLE KEYWORD GENERATION")
    print("=" * 70)
    print("Features: GPT-4.1, deduplication, summaries, error handling\n")

    results = []
    for table in test_tables:
        print(f"📊 TABLE {table['table_id']}: {table['title'][:60]}...")

        result = generator.generate_table_keywords(
            table['table_id'],
            table['title'],
            table['universe'],
            table['concept']
        )

        if result.error:
            print(f"❌ Failed: {result.error}")
        else:
            print(f"🎯 Primary Keywords: {', '.join(result.primary_keywords)}")
            print(f"🔧 Secondary Keywords: {', '.join(result.secondary_keywords)}")
            print(f"📝 Summary: {result.summary}")
            print(f"💰 Cost: ${result.processing_cost:.4f}")

        results.append(result)
        print()

    # Final statistics
    print("📊 TEST COMPLETE")
    print(f"✅ Successful: {generator.successful_count}")
    print(f"❌ Failed: {generator.failed_count}")
    print(f"💰 Total cost: ${generator.total_cost:.4f}")
    print(f"🔄 Retries: {generator.retry_count}")


def main():
    """Main production execution"""
    parser = argparse.ArgumentParser(description='Production Table Keyword Generator')
    parser.add_argument('--catalog-file', default='table-catalog/table_catalog.json',
                        help='Input table catalog')
    parser.add_argument('--output-file', default='table_keywords_results.json',
                        help='Output results file')
    parser.add_argument('--enhanced-catalog-file', default='table-catalog/table_catalog_with_keywords.json',
                        help='Output enhanced catalog file')
    parser.add_argument('--test-only', action='store_true',
                        help='Run test on 10 sample tables')
    parser.add_argument('--max-tables', type=int,
                        help='Limit number of tables for testing')
    parser.add_argument('--resume', action='store_true', default=True,
                        help='Resume from existing results')
    parser.add_argument('--force-restart', action='store_true',
                        help='Delete existing results and start fresh')
    parser.add_argument('--checkpoint-interval', type=int, default=50,
                        help='Save checkpoint every N tables')
    parser.add_argument('--max-workers', type=int, default=5,
                        help='Parallel processing workers')
    parser.add_argument('--rate-limit', type=int, default=1000,
                        help='API requests per minute')

    args = parser.parse_args()

    # Test mode
    if args.test_only:
        test_keyword_generation()
        return 0

    # Get API key
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        logger.error("OPENAI_API_KEY environment variable required")
        return 1

    # Initialize generator
    generator = ProductionTableKeywordGenerator(api_key, rate_limit_rpm=args.rate_limit)

    # Load table catalog
    try:
        with open(args.catalog_file, 'r') as f:
            catalog_data = json.load(f)
        tables = catalog_data.get('tables', [])
        logger.info(f"Loaded {len(tables)} tables from catalog")
    except Exception as e:
        logger.error(f"Failed to load catalog: {e}")
        return 1

    # Handle force restart
    if args.force_restart and os.path.exists(args.output_file):
        os.remove(args.output_file)
        logger.warning(f"🔥 Force restart: Deleted {args.output_file}")

    # Load existing results for resume
    existing_results = generator.load_existing_results(args.output_file) if args.resume else {}

    # Filter out already processed tables
    if existing_results:
        original_count = len(tables)
        tables = [t for t in tables if t['table_id'] not in existing_results]
        logger.info(f"Resume: Skipping {original_count - len(tables)} already processed tables")

    # Limit tables if requested
    if args.max_tables and args.max_tables < len(tables):
        tables = tables[:args.max_tables]
        logger.info(f"Limited to {args.max_tables} tables for testing")

    if not tables:
        logger.info("🎉 All tables already processed!")
        return 0

    # Cost estimation
    estimated_cost = len(tables) * 0.002  # GPT-4.1-mini estimate
    logger.info(f"Processing {len(tables)} tables")
    logger.info(f"Estimated cost: ${estimated_cost:.2f}")

    # Confirmation
    if not args.resume:
        response = input(f"Generate keywords for {len(tables)} tables? (estimated cost ${estimated_cost:.2f}) [y/N]: ")
        if response.lower() != 'y':
            logger.info("Cancelled")
            return 0

    # Process tables
    start_time = time.time()

    try:
        results = generator.process_tables_parallel(
            tables,
            args.output_file,
            max_workers=args.max_workers,
            checkpoint_interval=args.checkpoint_interval
        )

        # Final save and enhanced catalog creation
        generator.save_checkpoint(results, args.output_file)

        # Create enhanced catalog without overwriting original
        logger.info("🔧 Creating enhanced catalog with keywords...")
        generator.save_enhanced_catalog(results, args.catalog_file, args.enhanced_catalog_file)

    except KeyboardInterrupt:
        logger.warning("⚠️ Processing interrupted by user")
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        return 1

    # Final report
    end_time = time.time()
    processing_time = end_time - start_time

    logger.info("✅ PRODUCTION RUN COMPLETE")
    logger.info(f"✅ Successful: {generator.successful_count}")
    logger.info(f"❌ Failed: {generator.failed_count}")
    logger.info(f"💰 Total cost: ${generator.total_cost:.2f}")
    logger.info(f"🔄 Retries: {generator.retry_count}")
    logger.info(f"⏱️ Processing time: {processing_time:.1f}s")
    logger.info(f"📁 Results saved to: {args.output_file}")
    logger.info(f"📁 Enhanced catalog saved to: {args.enhanced_catalog_file}")
    logger.info(f"🔒 Original catalog preserved: {args.catalog_file}")
    logger.info("💡 Next: Update table embeddings to include keywords")

    return 0


if __name__ == "__main__":
    exit(main())
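The script is normally driven from the command line (for example, python table_keyword_generator.py --test-only runs the 10-table sample, while --catalog-file, --max-workers, and --rate-limit control a full production run), but the generator class can also be called directly. Below is a minimal usage sketch, not part of the original file: it assumes the script is importable as table_keyword_generator and that OPENAI_API_KEY is set, and it reuses one of the built-in test tables.

# Minimal usage sketch (assumptions: this file is importable as table_keyword_generator
# and OPENAI_API_KEY is set; table values come from the script's own test set).
import os
from table_keyword_generator import ProductionTableKeywordGenerator

generator = ProductionTableKeywordGenerator(os.environ["OPENAI_API_KEY"], rate_limit_rpm=60)
result = generator.generate_table_keywords(
    table_id="B25034",
    title="Year Structure Built",
    universe="Housing units",
    concept="Year Structure Built",
)
print(result.primary_keywords)
print(result.summary)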
