Open Census MCP Server

by brockwebb
llm_concept_reviewer.py (27.3 kB)
#!/usr/bin/env python3
import json
import asyncio
import argparse
import logging
import csv
from pathlib import Path
from typing import Dict, List, Any
from datetime import datetime
import openai
import anthropic
import re

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class LLMConceptReviewer:
    def __init__(self, openai_api_key: str, anthropic_api_key: str, mode: str = "review"):
        self.openai_client = openai.AsyncOpenAI(api_key=openai_api_key)
        self.anthropic_client = anthropic.AsyncAnthropic(api_key=anthropic_api_key)
        self.mode = mode

        # Initialize results tracking
        self.review_results = {
            'total_reviewed': 0,
            'approved': 0,
            'modified': 0,
            'rejected': 0,
            'processing_errors': []
        }

        # Define prompts for different modes
        self.prompts = {
            "review": """You are validating Census concept mappings. Your job is to determine if a concept is accurately mapped to the right Census tables and statistical methods.

CONCEPT TO VALIDATE:
- Label: {label}
- Universe: {universe}
- Statistical Method: {stat_method}
- Mapped Census Tables: {census_tables}
- Definition: {definition}

VALIDATION CRITERIA:
1. ACCURACY: Does this concept mapping correctly represent what Census actually measures?
2. METHODOLOGY: Is the statistical method appropriate for this universe?
3. TABLES: Are the mapped tables correct for this concept?
4. FEASIBILITY: Can this actually be calculated from available Census data?

Respond in JSON format:
{{
    "validation_decision": "approve|modify|reject",
    "reasoning": "Detailed explanation of your decision",
    "suggested_changes": "If modify: specific changes needed",
    "confidence": 0.0-1.0,
    "alternative_tables": ["table1", "table2"] if current mapping is wrong,
    "complexity_rating": "simple|moderate|complex"
}}""",

            "negative_knowledge": """You are analyzing rejected Census concepts to create a negative knowledge taxonomy for user guidance.

REJECTED CONCEPT:
- Label: {label}
- Universe: {universe}
- Definition: {definition}
- Original Rejection Reason: {reasoning}

CATEGORIZE THE REJECTION TYPE:
- methodologically_flawed: Universe/method combinations that don't work statistically
- better_routed: Available elsewhere with higher quality (BLS, CDC, etc.)
- reliability_issues: Exists but has known problems (sample sizes, margins of error)
- universe_mismatch: Concept valid but not available for requested geography/population
- temporal_limitations: Data exists but not at requested frequency/recency
- mapping_error: Concept valid but assigned to wrong Census tables

CREATE USER GUIDANCE:
- What to tell users who request this concept
- Why this approach is problematic
- What alternative approaches to suggest
- When (if ever) this concept might be appropriate

Respond in JSON format:
{{
    "rejection_category": "methodologically_flawed|better_routed|reliability_issues|universe_mismatch|temporal_limitations|mapping_error",
    "severity": "hard_stop|warning|caution",
    "user_message": "Clear message to show users requesting this concept",
    "why_problematic": "Technical explanation of the issue",
    "alternatives": ["Alternative approach 1", "Alternative approach 2"],
    "appropriate_contexts": "When this concept might still be valid (if any)",
    "routing_suggestion": "census|bls|cdc|other|unavailable"
}}""",

            "complex_analysis": """You are analyzing complex statistical concepts that require sophisticated implementation strategies. These concepts passed validation but need detailed implementation guidance.

COMPLEX CONCEPT:
- Label: {label}
- Universe: {universe}
- Statistical Method: {stat_method}
- Census Tables: {census_tables}
- Definition: {definition}
- Implementation Notes: {implementation_notes}

PROVIDE DETAILED ANALYSIS:
1. CALCULATION STRATEGY: Step-by-step approach for computing this measure
2. METHODOLOGICAL CHALLENGES: Potential issues during implementation
3. RELIABILITY THRESHOLDS: When results become unreliable (geography, sample size, etc.)
4. PRECISION ESTIMATES: Expected margins of error, confidence intervals
5. ALTERNATIVE APPROACHES: Simpler proxies or different methodologies
6. USE CASE GUIDANCE: When to use vs. when to avoid this measure

Respond in JSON format:
{{
    "calculation_strategy": "Step-by-step implementation approach",
    "methodological_challenges": ["Challenge 1", "Challenge 2"],
    "reliability_thresholds": {{
        "min_sample_size": 100,
        "min_geography": "county",
        "confidence_intervals": "90%|95%|99%"
    }},
    "precision_estimates": "Expected margins of error",
    "alternative_approaches": ["Simpler proxy 1", "Different methodology 2"],
    "use_case_guidance": {{
        "recommended_for": ["Use case 1", "Use case 2"],
        "avoid_for": ["Problematic use case 1", "Problematic use case 2"]
    }},
    "implementation_complexity": "moderate|high|very_high"
}}"""
        }
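    # Editor's note (illustrative, not part of the original file): the two loaders
    # below read the unified ontology JSON via .get() calls, so the file is assumed
    # to carry at least these top-level keys, with complex concepts flagged by a
    # 'complexity' field:
    #
    #   {
    #     "approved_concepts": [ {"label": "...", "complexity": "complex", ...} ],
    #     "modified_concepts": [ ... ],
    #     "rejected_concepts": [ {"label": "...", "reasoning": "...", ...} ]
    #   }
    #
    # Any structure beyond those keys is an open assumption about real ontology files.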
    def _load_rejected_concepts(self, ontology_file: str) -> List[Dict]:
        """Load rejected concepts from unified ontology"""
        with open(ontology_file, 'r') as f:
            ontology = json.load(f)

        rejected_concepts = ontology.get('rejected_concepts', [])
        logger.info(f"Loaded {len(rejected_concepts)} rejected concepts for negative knowledge analysis")
        return rejected_concepts

    def _load_complex_concepts(self, ontology_file: str) -> List[Dict]:
        """Load complex concepts from unified ontology"""
        with open(ontology_file, 'r') as f:
            ontology = json.load(f)

        # Get approved and modified concepts marked as complex
        complex_concepts = []
        for concept in ontology.get('approved_concepts', []):
            if concept.get('complexity') == 'complex':
                complex_concepts.append(concept)
        for concept in ontology.get('modified_concepts', []):
            if concept.get('complexity') == 'complex':
                complex_concepts.append(concept)

        logger.info(f"Loaded {len(complex_concepts)} complex concepts for detailed analysis")
        return complex_concepts

    def load_concepts_from_category(self, category: str, concepts_dir: str = "../concepts") -> Dict[str, List[Dict]]:
        """Load all concepts (high/medium/low) for a category"""
        concepts_path = Path(concepts_dir)
        category_concepts = {
            'high': [],
            'medium': [],
            'low': []
        }

        # Load high confidence (auto-approved JSON)
        high_file = concepts_path / f"{category}.json"
        if high_file.exists():
            try:
                with open(high_file, 'r') as f:
                    data = json.load(f)
                category_concepts['high'] = data.get('concepts', [])
            except Exception as e:
                logger.warning(f"Error loading high confidence concepts for {category}: {e}")

        # Load medium confidence (review queue TXT)
        medium_file = concepts_path / f"{category}_review.txt"
        if medium_file.exists():
            try:
                category_concepts['medium'] = self._parse_review_file(medium_file, category)
            except Exception as e:
                logger.warning(f"Error loading medium confidence concepts for {category}: {e}")

        # Load low confidence (CSV)
        low_file = concepts_path / f"{category}_low_confidence.csv"
        if low_file.exists():
            try:
                category_concepts['low'] = self._parse_low_confidence_csv(low_file, category)
            except Exception as e:
                logger.warning(f"Error loading low confidence concepts for {category}: {e}")

        logger.info(f"Loaded {category}: {len(category_concepts['high'])} high, "
                    f"{len(category_concepts['medium'])} medium, {len(category_concepts['low'])} low confidence concepts")
        return category_concepts
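    # Sketch of the concepts directory that load_concepts_from_category() expects,
    # inferred from the file-name patterns above (actual repo contents may differ):
    #
    #   concepts/
    #     economics.json                 # high confidence, auto-approved
    #     economics_review.txt           # medium confidence, review queue
    #     economics_low_confidence.csv   # low confidence
    #     ... one triple per category ...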
    def _parse_review_file(self, file_path: Path, category: str) -> List[Dict]:
        """Parse review TXT file into concept dictionaries"""
        concepts = []
        with open(file_path, 'r') as f:
            content = f.read()

        # Split by concept sections (assuming they're separated by double newlines or similar)
        concept_sections = re.split(r'\n\s*\n', content.strip())

        for section in concept_sections:
            if not section.strip():
                continue
            try:
                # Parse the section for key information
                lines = section.strip().split('\n')
                concept = {
                    'id': f"cendata:UnknownConcept_{len(concepts)}",
                    'label': 'Unknown Concept',
                    'universe': 'Unknown',
                    'stat_method': 'Unknown',
                    'definition': section,
                    'census_tables': [],
                    'status': 'medium_confidence',
                    'bucket': category
                }

                # Try to extract label from first line
                if lines:
                    first_line = lines[0].strip()
                    if first_line and not first_line.startswith('-'):
                        concept['label'] = first_line
                        concept['id'] = f"cendata:{first_line.replace(' ', '').replace('(', '').replace(')', '')}"

                concepts.append(concept)
            except Exception as e:
                logger.warning(f"Error parsing concept section: {e}")
                continue

        return concepts

    def _parse_low_confidence_csv(self, file_path: Path, category: str) -> List[Dict]:
        """Parse low confidence CSV into concept dictionaries"""
        concepts = []
        with open(file_path, 'r') as f:
            reader = csv.DictReader(f)
            for row in reader:
                concept = {
                    'id': f"cendata:{row['label'].replace(' ', '').replace('(', '').replace(')', '')}",
                    'label': row['label'],
                    'confidence': float(row['confidence']),
                    'universe': row['universe'],
                    'stat_method': row['stat_method'],
                    'definition': row['definition'],
                    'census_tables': [],  # Not provided in CSV
                    'status': 'low_confidence',
                    'bucket': category
                }
                concepts.append(concept)
        return concepts
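    # Illustrative low-confidence CSV shape, limited to the columns that
    # _parse_low_confidence_csv() actually reads (the label and values below are
    # made up; real files may carry additional columns):
    #
    #   label,confidence,universe,stat_method,definition
    #   Median Gross Rent,0.42,Renter-occupied housing units,median,"Median of gross rent for the universe"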
    async def review_concept_with_llm(self, concept: Dict, use_openai: bool = True) -> Dict:
        """Review a single concept with LLM based on mode"""
        # Select prompt and prepare it based on mode
        if self.mode == "review":
            prompt = self.prompts["review"].format(
                label=concept.get('label', 'Unknown'),
                universe=concept.get('universe', 'Unknown'),
                stat_method=concept.get('stat_method', 'Unknown'),
                census_tables=concept.get('census_tables', []),
                definition=concept.get('definition', 'No definition provided')
            )
        elif self.mode == "negative_knowledge":
            prompt = self.prompts["negative_knowledge"].format(
                label=concept.get('label', 'Unknown'),
                universe=concept.get('universe', 'Unknown'),
                definition=concept.get('definition', 'No definition provided'),
                reasoning=concept.get('reasoning', 'No reasoning provided')
            )
        elif self.mode == "complex_analysis":
            prompt = self.prompts["complex_analysis"].format(
                label=concept.get('label', 'Unknown'),
                universe=concept.get('universe', 'Unknown'),
                stat_method=concept.get('stat_method', 'Unknown'),
                census_tables=concept.get('census_tables', []),
                definition=concept.get('definition', 'No definition provided'),
                implementation_notes=concept.get('implementation_notes', 'None provided')
            )
        else:
            raise ValueError(f"Unknown mode: {self.mode}")

        try:
            # Call appropriate LLM
            if use_openai:
                model = "gpt-4o"
                response = await self.openai_client.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": "You are an expert statistical analyst with deep knowledge of US Census methodology. You understand survey design, sampling methodology, and the specific structure of American Community Survey data."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.1
                )
                review_text = response.choices[0].message.content
            else:
                # Use Claude Opus 4 for complex analysis, Sonnet for others
                if self.mode == "complex_analysis":
                    model = "claude-opus-4-20250514"
                    max_tokens = 2000
                    try:
                        response = await self.anthropic_client.messages.create(
                            model=model,
                            max_tokens=max_tokens,
                            system="You are an expert statistical analyst with deep knowledge of US Census methodology. You focus on the validity and soundness of statistical approaches across domains.",
                            messages=[
                                {"role": "user", "content": prompt}
                            ]
                        )
                        review_text = response.content[0].text
                    except Exception as e:
                        if "model" in str(e).lower() or "not found" in str(e).lower():
                            logger.warning("⚠️ Claude Opus 4 not available, falling back to Sonnet 4 for complex analysis")
                            model = "claude-sonnet-4-20250514"
                            response = await self.anthropic_client.messages.create(
                                model=model,
                                max_tokens=max_tokens,
                                system="You are an expert statistical analyst with deep knowledge of US Census methodology. You focus on the validity and soundness of statistical approaches across domains.",
                                messages=[
                                    {"role": "user", "content": prompt}
                                ]
                            )
                            review_text = response.content[0].text
                        else:
                            raise
                else:
                    model = "claude-3-5-sonnet-20241022"
                    response = await self.anthropic_client.messages.create(
                        model=model,
                        max_tokens=1500,
                        system="You are an expert statistical analyst with deep knowledge of US Census methodology. You focus on the validity and soundness of statistical approaches across domains.",
                        messages=[
                            {"role": "user", "content": prompt}
                        ]
                    )
                    review_text = response.content[0].text

            # Parse JSON response
            try:
                # Extract JSON from response (handle markdown code blocks)
                json_match = re.search(r'```(?:json)?\s*(.*?)\s*```', review_text, re.DOTALL)
                if json_match:
                    review_text = json_match.group(1)

                review_result = json.loads(review_text)

                # Add metadata
                review_result['llm_used'] = model
                review_result['reviewed_at'] = datetime.now().isoformat()
                review_result['original_concept'] = concept
                review_result['mode'] = self.mode

                return review_result
            except json.JSONDecodeError as e:
                logger.error(f"JSON parsing error for {concept.get('label', 'unknown')}: {e}")
                logger.error(f"Raw response: {review_text}")
                return {
                    'validation_decision': 'ERROR',
                    'error': f"JSON parsing failed: {e}",
                    'raw_response': review_text,
                    'original_concept': concept
                }

        except Exception as e:
            logger.error(f"LLM review error for {concept.get('label', 'unknown')}: {e}")
            self.review_results['processing_errors'].append({
                'concept': concept.get('label', 'unknown'),
                'error': str(e)
            })
            return {
                'validation_decision': 'ERROR',
                'error': str(e),
                'original_concept': concept
            }

    async def process_concepts(self, concepts: List[Dict]) -> List[Dict]:
        """Process concepts based on mode"""
        all_reviewed_concepts = []

        logger.info(f"\n🔍 Processing {len(concepts)} concepts in {self.mode} mode...")

        for i, concept in enumerate(concepts):
            # Skip high-confidence concepts in review mode only
            if self.mode == "review" and concept.get('confidence_level') == 'high':
                review_result = {
                    'validation_decision': 'approve',
                    'reasoning': 'Auto-approved (high confidence)',
                    'confidence': 1.0,
                    'complexity_rating': 'simple',
                    'original_concept': concept,
                    'reviewed_at': datetime.now().isoformat(),
                    'llm_used': 'auto'
                }
            else:
                # Alternate between OpenAI and Anthropic for review and negative_knowledge modes
                # Use only Anthropic for complex_analysis (Opus 4 preferred)
                if self.mode == "complex_analysis":
                    use_openai = False
                else:
                    use_openai = (i % 2 == 0)  # Alternate for review and negative_knowledge

                review_result = await self.review_concept_with_llm(concept, use_openai)

            all_reviewed_concepts.append(review_result)

            # Update counters
            decision = review_result.get('validation_decision', 'ERROR')
            if decision == 'approve':
                self.review_results['approved'] += 1
            elif decision == 'modify':
                self.review_results['modified'] += 1
            elif decision == 'reject':
                self.review_results['rejected'] += 1
            self.review_results['total_reviewed'] += 1

            # Progress logging
            if (i + 1) % 10 == 0:
                logger.info(f"✅ Processed {i + 1}/{len(concepts)} concepts")

            # Rate limiting
            await asyncio.sleep(0.5)

        return all_reviewed_concepts

    def load_all_concepts(self, categories: List[str], concepts_dir: str) -> List[Dict]:
        """Load concepts from category files (original functionality)"""
        all_concepts = []
        for category in categories:
            logger.info(f"\n🔍 Loading {category} concepts...")
            category_concepts = self.load_concepts_from_category(category, concepts_dir)

            # Flatten and add to all_concepts
            for confidence_level in ['high', 'medium', 'low']:
                for concept in category_concepts[confidence_level]:
                    concept['category'] = category
                    concept['confidence_level'] = confidence_level
                    all_concepts.append(concept)
        return all_concepts

    async def run_review(self, categories: List[str], concepts_dir: str) -> Dict[str, Any]:
        """Run concept review (original mode)"""
        # Load all concepts
        all_concepts = self.load_all_concepts(categories, concepts_dir)

        # Process concepts
        all_reviewed_concepts = await self.process_concepts(all_concepts)

        # Organize results by decision
        results = {
            'approved_concepts': [],
            'modified_concepts': [],
            'rejected_concepts': [],
            'error_concepts': [],
            'summary': self.review_results,
            'generated_at': datetime.now().isoformat()
        }

        for review in all_reviewed_concepts:
            decision = review.get('validation_decision', 'ERROR')
            if decision == 'approve':
                results['approved_concepts'].append(review)
            elif decision == 'modify':
                results['modified_concepts'].append(review)
            elif decision == 'reject':
                results['rejected_concepts'].append(review)
            else:
                results['error_concepts'].append(review)

        return results

    async def run_negative_knowledge(self, ontology_file: str) -> Dict[str, Any]:
        """Run negative knowledge analysis"""
        # Load rejected concepts
        rejected_concepts = self._load_rejected_concepts(ontology_file)

        # Process concepts
        analyzed_concepts = await self.process_concepts(rejected_concepts)

        # Organize results by rejection category
        results = {
            'negative_knowledge_taxonomy': {},
            'analyzed_concepts': analyzed_concepts,
            'summary': {
                'total_analyzed': len(analyzed_concepts),
                'categories': {}
            },
            'generated_at': datetime.now().isoformat()
        }

        # Group by rejection category
        for analysis in analyzed_concepts:
            category = analysis.get('rejection_category', 'uncategorized')
            if category not in results['negative_knowledge_taxonomy']:
                results['negative_knowledge_taxonomy'][category] = []
            results['negative_knowledge_taxonomy'][category].append(analysis)

            # Update category counts
            if category not in results['summary']['categories']:
                results['summary']['categories'][category] = 0
            results['summary']['categories'][category] += 1

        return results
    async def run_complex_analysis(self, ontology_file: str) -> Dict[str, Any]:
        """Run complex concept analysis"""
        # Load complex concepts
        complex_concepts = self._load_complex_concepts(ontology_file)

        # Process concepts
        analyzed_concepts = await self.process_concepts(complex_concepts)

        # Organize results by complexity level
        results = {
            'complex_implementation_strategies': analyzed_concepts,
            'summary': {
                'total_analyzed': len(analyzed_concepts),
                'complexity_distribution': {}
            },
            'generated_at': datetime.now().isoformat()
        }

        # Group by implementation complexity
        for analysis in analyzed_concepts:
            complexity = analysis.get('implementation_complexity', 'unknown')
            if complexity not in results['summary']['complexity_distribution']:
                results['summary']['complexity_distribution'][complexity] = 0
            results['summary']['complexity_distribution'][complexity] += 1

        return results


async def main():
    parser = argparse.ArgumentParser(description='LLM-powered concept reviewer with multiple modes')
    parser.add_argument('--openai-api-key', required=True, help='OpenAI API key')
    parser.add_argument('--anthropic-api-key', required=True, help='Anthropic API key')
    parser.add_argument('--mode', choices=['review', 'negative_knowledge', 'complex_analysis'],
                        default='review', help='Review mode')
    parser.add_argument('--concepts-dir', default='../concepts', help='Directory containing concept files')
    parser.add_argument('--ontology-file',
                        help='Unified ontology file (required for negative_knowledge and complex_analysis modes)')
    parser.add_argument('--output', help='Output file path (auto-generated if not specified)')
    args = parser.parse_args()

    # Validate arguments
    if args.mode in ['negative_knowledge', 'complex_analysis'] and not args.ontology_file:
        parser.error(f"--ontology-file is required for {args.mode} mode")

    # Initialize reviewer
    reviewer = LLMConceptReviewer(args.openai_api_key, args.anthropic_api_key, args.mode)

    # Run based on mode
    if args.mode == 'review':
        categories = ['core_demographics', 'economics', 'education', 'geography',
                      'health_social', 'housing', 'specialized_populations', 'transportation']
        results = await reviewer.run_review(categories, args.concepts_dir)
        default_output = 'COOS_Complete_Ontology.json'
    elif args.mode == 'negative_knowledge':
        results = await reviewer.run_negative_knowledge(args.ontology_file)
        default_output = 'COOS_Negative_Knowledge.json'
    elif args.mode == 'complex_analysis':
        results = await reviewer.run_complex_analysis(args.ontology_file)
        default_output = 'COOS_Complex_Analysis.json'

    # Save results
    output_file = args.output or default_output
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

    logger.info(f"\n✅ {args.mode.title()} complete! Results saved to {output_file}")

    # Print summary
    if args.mode == 'review':
        summary = results['summary']
        logger.info(f"📊 Summary: {summary['total_reviewed']} concepts reviewed")
        logger.info(f"   ✅ Approved: {summary['approved']}")
        logger.info(f"   ✏️ Modified: {summary['modified']}")
        logger.info(f"   ❌ Rejected: {summary['rejected']}")
        logger.info(f"   🚫 Errors: {len(summary['processing_errors'])}")
    elif args.mode == 'negative_knowledge':
        summary = results['summary']
        logger.info(f"📊 Summary: {summary['total_analyzed']} rejected concepts analyzed")
        for category, count in summary['categories'].items():
            logger.info(f"   📂 {category}: {count}")
    elif args.mode == 'complex_analysis':
        summary = results['summary']
        logger.info(f"📊 Summary: {summary['total_analyzed']} complex concepts analyzed")
        for complexity, count in summary['complexity_distribution'].items():
            logger.info(f"   🔧 {complexity}: {count}")


if __name__ == "__main__":
    asyncio.run(main())
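For reference, a minimal sketch of how each mode might be invoked, based on the argparse definitions above. The API-key values are placeholders, and the pipeline order (a review pass first, then the two follow-up modes reading its output) is inferred from the default output file names rather than stated in the script:

    # 1. Full review pass over the eight hard-coded categories
    python llm_concept_reviewer.py --openai-api-key $OPENAI_API_KEY \
        --anthropic-api-key $ANTHROPIC_API_KEY --mode review

    # 2. Negative-knowledge taxonomy built from the rejected concepts
    python llm_concept_reviewer.py --openai-api-key $OPENAI_API_KEY \
        --anthropic-api-key $ANTHROPIC_API_KEY --mode negative_knowledge \
        --ontology-file COOS_Complete_Ontology.json

    # 3. Implementation strategies for concepts flagged as complex
    python llm_concept_reviewer.py --openai-api-key $OPENAI_API_KEY \
        --anthropic-api-key $ANTHROPIC_API_KEY --mode complex_analysis \
        --ontology-file COOS_Complete_Ontology.json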
