Open Census MCP Server

by brockwebb
consolidate_enrichments.py (13.2 kB)
#!/usr/bin/env python3
"""
Dataset Consolidation Script - Combine COOS, Early, and Bulk Enrichments

Combines all enriched datasets into the complete 2023 ACS Universe with
priority-based deduplication and quality tracking.
"""

import json
import logging
from pathlib import Path
from typing import Dict, Any, Optional
import argparse
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class EnrichmentConsolidator:
    """Consolidate multiple enrichment datasets with quality tracking"""

    def __init__(self):
        self.quality_hierarchy = {
            'coos': {'priority': 1, 'description': 'Research-grade, domain specialist ensemble'},
            'early': {'priority': 2, 'description': 'High-quality ensemble testing'},
            'bulk': {'priority': 3, 'description': 'Single-agent production coverage'}
        }

        self.consolidated_data = {}
        self.processing_stats = {
            'total_variables': 0,
            'duplicates_resolved': 0,
            'quality_distribution': {
                'coos': 0,
                'early': 0,
                'bulk': 0
            }
        }

    def load_dataset(self, file_path: str, dataset_type: str) -> Dict[str, Any]:
        """Load enrichment dataset and standardize format"""
        logger.info(f"Loading {dataset_type} dataset from {file_path}")

        if not Path(file_path).exists():
            logger.warning(f"File not found: {file_path}")
            return {}

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            logger.info(f"Loaded {len(data)} variables from {dataset_type} dataset")
            return data
        except Exception as e:
            logger.error(f"Error loading {file_path}: {e}")
            return {}

    def standardize_record(self, var_id: str, data: Any, source_type: str) -> Dict[str, Any]:
        """Standardize enrichment record format across different sources"""
        # Base record structure
        record = {
            'variable_id': var_id,
            'source_type': source_type,
            'quality_tier': self.quality_hierarchy[source_type]['description'],
            'priority': self.quality_hierarchy[source_type]['priority'],
            'processing_timestamp': datetime.now().isoformat()
        }

        # Handle different input formats
        if isinstance(data, dict):
            # Standard enrichment format
            record.update({
                'label': data.get('label', data.get('official_label', 'Unknown')),
                'concept': data.get('concept', 'Unknown'),
                'table_family': var_id[:3] if len(var_id) >= 3 else 'Unknown',
                'survey': data.get('survey', 'ACS'),
                'complexity': data.get('complexity', 'medium')
            })

            # Extract enrichment content
            enrichment_text = self._extract_enrichment_text(data)
            record['enrichment_text'] = enrichment_text

            # Preserve metadata fields
            for field in ['agreement_score', 'processing_cost', 'analysis', 'methodology_notes']:
                if field in data:
                    record[field] = data[field]
        else:
            # Fallback for unexpected formats
            record.update({
                'label': 'Unknown',
                'concept': 'Unknown',
                'table_family': var_id[:3] if len(var_id) >= 3 else 'Unknown',
                'enrichment_text': str(data) if data else '',
                'survey': 'ACS',
                'complexity': 'unknown'
            })

        return record

    def _extract_enrichment_text(self, data: Dict) -> str:
        """Extract enrichment text from various possible fields"""
        enrichment_parts = []

        # Check various enrichment fields
        enrichment_sources = [
            data.get('enrichment', ''),
            data.get('analysis', {}),
            data.get('summary', ''),
            data.get('description', '')
        ]

        for source in enrichment_sources:
            if isinstance(source, dict):
                # Extract from analysis dictionary
                for key in ['summary', 'concept', 'statistical_method', 'limitations', 'methodology_notes']:
                    if key in source and source[key]:
                        enrichment_parts.append(f"{key}: {source[key]}")
            elif isinstance(source, str) and source.strip():
                enrichment_parts.append(source.strip())

        return ' | '.join(enrichment_parts) if enrichment_parts else 'No enrichment available'

    def consolidate_datasets(self, datasets: Dict[str, Dict]) -> Dict[str, Any]:
        """Consolidate multiple datasets with priority-based deduplication"""
        logger.info("Starting dataset consolidation...")

        # Process datasets in priority order (highest priority first)
        for source_type in sorted(self.quality_hierarchy.keys(),
                                  key=lambda x: self.quality_hierarchy[x]['priority']):

            if source_type not in datasets or not datasets[source_type]:
                logger.info(f"No data for {source_type}, skipping...")
                continue

            dataset = datasets[source_type]
            logger.info(f"Processing {len(dataset)} variables from {source_type} dataset")

            # Normalize both dict and list formats into (variable_id, data) pairs
            if isinstance(dataset, list):
                # List format - each item should carry its own variable_id
                items_to_process = []
                for item in dataset:
                    if isinstance(item, dict) and 'variable_id' in item:
                        items_to_process.append((item['variable_id'], item))
                    else:
                        logger.warning(f"Skipping malformed item in {source_type}: {item}")
            elif isinstance(dataset, dict):
                # Dict format - keys are variable IDs
                items_to_process = list(dataset.items())
            else:
                logger.error(f"Unsupported dataset format for {source_type}: {type(dataset)}")
                continue

            # Process each variable (works for both list and dict formats)
            for var_id, data in items_to_process:
                # Standardize the record
                record = self.standardize_record(var_id, data, source_type)

                # Handle duplicates with priority-based resolution
                if var_id in self.consolidated_data:
                    existing_priority = self.consolidated_data[var_id]['priority']
                    new_priority = record['priority']

                    if new_priority < existing_priority:  # Lower number = higher priority
                        logger.debug(f"Upgrading {var_id} from {self.consolidated_data[var_id]['source_type']} to {source_type}")
                        self.consolidated_data[var_id] = record
                        self.processing_stats['duplicates_resolved'] += 1
                    else:
                        logger.debug(f"Keeping existing {var_id} from {self.consolidated_data[var_id]['source_type']}")
                else:
                    self.consolidated_data[var_id] = record

                # Update quality distribution
                self.processing_stats['quality_distribution'][source_type] += 1

        self.processing_stats['total_variables'] = len(self.consolidated_data)
        logger.info("Consolidation complete!")

        return self.consolidated_data

    def generate_quality_report(self) -> Dict[str, Any]:
        """Generate quality report showing source distribution and statistics"""
        report = {
            'consolidation_summary': self.processing_stats,
            'quality_tiers': self.quality_hierarchy,
            'dataset_composition': {},
            'recommendations': []
        }

        # Calculate composition percentages
        total = self.processing_stats['total_variables']
        for source_type, count in self.processing_stats['quality_distribution'].items():
            percentage = (count / total * 100) if total > 0 else 0
            report['dataset_composition'][source_type] = {
                'count': count,
                'percentage': round(percentage, 2),
                'description': self.quality_hierarchy[source_type]['description']
            }

        # Generate recommendations
        coos_percentage = report['dataset_composition']['coos']['percentage']
        if coos_percentage > 25:
            report['recommendations'].append("High COOS coverage - excellent for research applications")

        if self.processing_stats['duplicates_resolved'] > 0:
            report['recommendations'].append(
                f"Resolved {self.processing_stats['duplicates_resolved']} duplicates using priority hierarchy"
            )

        return report

    def save_consolidated_dataset(self, output_path: str, include_metadata: bool = True):
        """Save consolidated dataset with optional metadata"""
        logger.info(f"Saving consolidated dataset to {output_path}")

        if include_metadata:
            output_data = {
                'variables': self.consolidated_data,
                'consolidation_metadata': {
                    'created_at': datetime.now().isoformat(),
                    'processing_stats': self.processing_stats,
                    'quality_hierarchy': self.quality_hierarchy,
                    'total_variables': len(self.consolidated_data)
                }
            }
        else:
            # Without metadata, write the bare variables dictionary
            output_data = self.consolidated_data

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)

        logger.info(f"Saved {len(self.consolidated_data)} consolidated variables")


def main():
    parser = argparse.ArgumentParser(description='Consolidate enrichment datasets')
    parser.add_argument('--coos-file', type=str, default='coos_enriched_results.json',
                        help='COOS enrichment file (highest priority)')
    parser.add_argument('--early-file', type=str,
                        default='../spatial_topology_discovery/enrichment_checkpoint.json',
                        help='Early sample enrichment file (medium priority)')
    parser.add_argument('--bulk-file', type=str, default='bulk_enriched_results.json',
                        help='Bulk enrichment file (lowest priority)')
    parser.add_argument('--output', type=str, default='2023_ACS_Enriched_Universe.json',
                        help='Output file for consolidated dataset')
    parser.add_argument('--report', type=str, default='consolidation_report.json',
                        help='Quality report output file')
    # Accepts --include-metadata / --no-include-metadata; metadata is included by default
    parser.add_argument('--include-metadata', action=argparse.BooleanOptionalAction, default=True,
                        help='Include consolidation metadata in output')

    args = parser.parse_args()

    # Initialize consolidator
    consolidator = EnrichmentConsolidator()

    # Load datasets
    datasets = {
        'coos': consolidator.load_dataset(args.coos_file, 'coos'),
        'early': consolidator.load_dataset(args.early_file, 'early'),
        'bulk': consolidator.load_dataset(args.bulk_file, 'bulk')
    }

    # Consolidate
    consolidated = consolidator.consolidate_datasets(datasets)

    # Generate quality report
    quality_report = consolidator.generate_quality_report()

    # Save outputs
    consolidator.save_consolidated_dataset(args.output, args.include_metadata)

    with open(args.report, 'w', encoding='utf-8') as f:
        json.dump(quality_report, f, indent=2, ensure_ascii=False)

    # Print summary
    print("\n" + "="*60)
    print("CONSOLIDATION COMPLETE")
    print("="*60)
    print(f"Total variables: {quality_report['consolidation_summary']['total_variables']}")
    print(f"Duplicates resolved: {quality_report['consolidation_summary']['duplicates_resolved']}")
    print("\nQuality Distribution:")
    for source, info in quality_report['dataset_composition'].items():
        print(f"  {source.upper()}: {info['count']} variables ({info['percentage']:.1f}%)")
    print("\nOutputs:")
    print(f"  Consolidated dataset: {args.output}")
    print(f"  Quality report: {args.report}")
    print("="*60)


if __name__ == "__main__":
    main()
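Beyond the CLI entry point, the consolidator can also be driven directly from Python. The snippet below is a minimal sketch, not part of the script itself: it assumes the file is importable as consolidate_enrichments and reuses the same default input paths as main(); any missing file is logged as a warning and simply contributes no variables.

# Minimal programmatic usage sketch (assumes consolidate_enrichments.py is on
# the import path and the default enrichment files from main() are available).
from consolidate_enrichments import EnrichmentConsolidator

consolidator = EnrichmentConsolidator()

datasets = {
    'coos': consolidator.load_dataset('coos_enriched_results.json', 'coos'),
    'early': consolidator.load_dataset('../spatial_topology_discovery/enrichment_checkpoint.json', 'early'),
    'bulk': consolidator.load_dataset('bulk_enriched_results.json', 'bulk'),
}

consolidator.consolidate_datasets(datasets)
consolidator.save_consolidated_dataset('2023_ACS_Enriched_Universe.json')

# Inspect how much of the consolidated universe came from each quality tier
report = consolidator.generate_quality_report()
for source, info in report['dataset_composition'].items():
    print(f"{source}: {info['count']} variables ({info['percentage']}%)")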
