Open Census MCP Server

by brockwebb
link_variables_to_concepts.py (26.4 kB)
#!/usr/bin/env python3
"""
link_variables_to_concepts.py - Step 3-B: Link 36K variables to 160 backbone concepts

Uses sentence transformers to compute semantic similarity between variable
descriptions and backbone concept definitions, then generates weighted links
based on cosine similarity.

Usage:
    python knowledge-base/scripts/link_variables_to_concepts.py

Inputs:
    - knowledge-base/concepts/concept_backbone.ttl (160 backbone concepts)
    - 2023_ACS_Enriched_Universe.json (36,918 enriched variables)

Output:
    - knowledge-base/concepts/variable_links.ttl
"""

import json
import logging
import pickle
from pathlib import Path
from typing import Dict, List, Tuple, Set
from datetime import datetime

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import re

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - VAR_LINKER - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
SIMILARITY_THRESHOLD = 0.20  # Minimum cosine similarity to create a link
MAX_CONCEPTS_PER_VARIABLE = 5  # Maximum backbone concepts per variable
EMBEDDING_MODEL = "sentence-transformers/all-roberta-large-v1"  # Match your existing setup
BATCH_SIZE = 50  # Process variables in batches (match your concept_weight_extractor)
EMBED_BATCH_SIZE = 32  # Embedding batch size


class VariableConceptLinker:
    """Link variables to backbone concepts using semantic similarity"""

    def __init__(self):
        self.base_dir = Path(__file__).parent.parent
        self.backbone_path = self.base_dir / "concepts" / "concept_backbone.ttl"

        # Try multiple possible locations for the variables file
        possible_paths = [
            self.base_dir / "scripts" / "2023_ACS_Enriched_Universe.json",  # Found location
            Path("2023_ACS_Enriched_Universe.json"),
            Path("../2023_ACS_Enriched_Universe.json"),
            Path("data/2023_ACS_Enriched_Universe.json"),
            self.base_dir / "data" / "2023_ACS_Enriched_Universe.json",
            self.base_dir.parent / "2023_ACS_Enriched_Universe.json"
        ]

        self.variables_path = None
        for path in possible_paths:
            if path.exists():
                self.variables_path = path
                break

        if self.variables_path is None:
            # If not found, use the first option as default for error message
            self.variables_path = possible_paths[0]

        self.output_path = self.base_dir / "concepts" / "variable_links.ttl"
        self.cache_path = self.base_dir / "concepts" / "embedding_cache.pkl"
        self.checkpoint_path = self.base_dir / "concepts" / "linking_checkpoint.json"

        # Load sentence transformer model
        logger.info(f"Loading sentence transformer model: {EMBEDDING_MODEL}")
        self.model = SentenceTransformer(EMBEDDING_MODEL)

        # Storage for concepts and variables
        self.backbone_concepts = []  # List of concept dicts
        self.concept_embeddings = None  # Numpy array of embeddings
        self.variables = []  # List of variable dicts

        # Statistics tracking
        self.stats = {
            'total_variables': 0,
            'variables_linked': 0,
            'variables_unlinked': 0,
            'total_links': 0,
            'avg_concepts_per_variable': 0.0,
            'similarity_distribution': []
        }

    def extract_backbone_concepts(self) -> List[Dict]:
        """Extract concept definitions from TTL backbone file"""
        logger.info(f"Extracting backbone concepts from {self.backbone_path}")

        if not self.backbone_path.exists():
            raise FileNotFoundError(f"Backbone file not found: {self.backbone_path}")

        concepts = []

        with open(self.backbone_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Split content into blocks starting with "cendata:"
        blocks = re.split(r'\ncendata:', content)

        for i, block in enumerate(blocks):
            if i == 0:  # Skip header block
                continue

            # Add back the cendata: prefix
            block = 'cendata:' + block
            lines = block.strip().split('\n')
            if not lines:
                continue

            # Extract concept ID from first line
            first_line = lines[0].strip()
            if not first_line.startswith('cendata:'):
                continue
            concept_id = first_line.replace('cendata:', '').strip()

            # Initialize concept
            concept = {'id': concept_id, 'label': '', 'definition': ''}

            # Parse lines for prefLabel and definition
            in_definition = False
            definition_parts = []

            for line in lines[1:]:
                line = line.strip()

                if 'skos:prefLabel' in line:
                    # Extract label - it's enclosed in quotes
                    if '"' in line:
                        parts = line.split('"')
                        if len(parts) >= 2:
                            concept['label'] = parts[1].strip()

                elif 'skos:definition' in line:
                    # Start of definition
                    in_definition = True
                    if '"' in line:
                        # Definition starts on same line
                        after_def = line.split('skos:definition', 1)[1].strip()
                        if after_def.startswith('"'):
                            definition_text = after_def[1:]  # Remove opening quote
                            if definition_text.endswith('";'):
                                # Single line definition
                                concept['definition'] = definition_text[:-2].strip()
                                in_definition = False
                            elif definition_text.endswith('"'):
                                # Single line definition
                                concept['definition'] = definition_text[:-1].strip()
                                in_definition = False
                            else:
                                # Multi-line definition starts
                                definition_parts = [definition_text]

                elif in_definition:
                    # Continue collecting definition lines
                    if line.endswith('";') or line.endswith('" ;'):
                        # End of definition
                        if line.endswith('";'):
                            definition_parts.append(line[:-2])
                        else:
                            definition_parts.append(line[:-3])
                        concept['definition'] = ' '.join(definition_parts).strip()
                        in_definition = False
                        definition_parts = []
                    elif line.endswith('"'):
                        # End of definition without semicolon
                        definition_parts.append(line[:-1])
                        concept['definition'] = ' '.join(definition_parts).strip()
                        in_definition = False
                        definition_parts = []
                    elif not line.startswith(('skos:', 'cendata:', 'dct:')):
                        # Continuation of definition
                        definition_parts.append(line)
                    else:
                        # Hit another property, end definition
                        concept['definition'] = ' '.join(definition_parts).strip()
                        in_definition = False
                        definition_parts = []

            # Finalize any remaining definition
            if in_definition and definition_parts:
                concept['definition'] = ' '.join(definition_parts).strip()

            # Clean up and add concept if we have useful data
            if concept['label'] or concept['definition']:
                # Clean up text
                concept['label'] = concept['label'].replace('\\"', '"').strip()
                concept['definition'] = concept['definition'].replace('\\"', '"').strip()
                concept['text_for_embedding'] = f"{concept['label']}. {concept['definition']}".strip()
                concepts.append(concept)

        logger.info(f"Extracted {len(concepts)} backbone concepts")

        if len(concepts) == 0:
            logger.warning("No concepts extracted - showing first few concept blocks:")
            for i, block in enumerate(blocks[1:6]):  # Show first 5 concept blocks
                logger.warning(f"Block {i+1}: {block[:200]}...")
        else:
            logger.info(f"Sample concept: {concepts[0]['id']} - {concepts[0]['label'][:50]}...")

        return concepts

    def load_variables(self) -> List[Dict]:
        """Load enriched variables from consolidated JSON"""
        logger.info(f"Loading variables from {self.variables_path}")

        if not self.variables_path.exists():
            raise FileNotFoundError(f"Variables file not found: {self.variables_path}")

        with open(self.variables_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Handle both direct format and nested "variables" key
        if 'variables' in data:
            variables_dict = data['variables']
        else:
            variables_dict = data

        variables = []
        for var_id, var_data in variables_dict.items():
            # Extract the best available description for embedding
            description = self._get_best_description(var_data)

            if description and len(description.strip()) > 10:  # Minimum description length
                variables.append({
                    'id': var_id,
                    'label': var_data.get('label', ''),
                    'description': description,
                    'quality_tier': var_data.get('quality_tier', 'unknown'),
                    'source_type': var_data.get('source_type', 'unknown'),
                    'text_for_embedding': f"{var_data.get('label', '')}. {description}".strip()
                })

        logger.info(f"Loaded {len(variables)} variables with valid descriptions")

        if len(variables) == 0:
            logger.warning("No variables loaded - checking data structure:")
            logger.warning(f"Top-level keys: {list(data.keys())}")
            if 'variables' in data:
                sample_vars = list(data['variables'].items())[:3]
                for var_id, var_data in sample_vars:
                    logger.warning(f"Sample variable {var_id}: {list(var_data.keys())}")

        return variables

    def _get_best_description(self, var_data: Dict) -> str:
        """Extract the best available description from variable data"""
        # Based on concept_weight_extractor.py - check for enrichment_text field first

        # Direct enrichment_text field (from consolidated universe)
        if var_data.get('enrichment_text'):
            return var_data.get('enrichment_text')

        # Concept field
        if var_data.get('concept'):
            return var_data.get('concept')

        # Label field
        if var_data.get('label'):
            return var_data.get('label')

        # Nested enrichment structure (fallback)
        enrichment = var_data.get('enrichment', {})
        if enrichment.get('analysis'):
            return enrichment.get('analysis')
        if enrichment.get('enhanced_description'):
            return enrichment.get('enhanced_description')

        # Legacy nested analysis structures
        for analysis_key in ['coos_analysis', 'bulk_analysis', 'early_analysis']:
            if analysis_key in var_data:
                analysis = var_data[analysis_key]
                if isinstance(analysis, dict) and 'enhanced_description' in analysis:
                    return analysis['enhanced_description']

        # Fallback to original description
        return var_data.get('description', '')

    def _progress_update(self, current: int, total: int, batch_stats: Dict = None):
        """Update progress on same line (from concept_weight_extractor)"""
        percentage = (current / total) * 100
        progress_bar = "█" * int(percentage / 5) + "░" * (20 - int(percentage / 5))

        stats_str = ""
        if batch_stats:
            avg_links = batch_stats.get('avg_links', 0)
            max_links = batch_stats.get('max_links', 0)
            zero_links = batch_stats.get('zero_links', 0)
            stats_str = f" | Avg: {avg_links:.1f} | Max: {max_links} | Zero: {zero_links}"

        # Use \r to overwrite the same line
        print(f"\r🔄 [{progress_bar}] {current:,}/{total:,} ({percentage:.1f}%){stats_str}", end='', flush=True)

    def _save_checkpoint(self, processed_index: int, processed_links: List[Dict]):
        """Save checkpoint data"""
        checkpoint = {
            'processed_index': processed_index,
            'processed_links': processed_links,
            'timestamp': datetime.now().isoformat()
        }
        with open(self.checkpoint_path, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def _load_checkpoint(self) -> Tuple[int, List[Dict]]:
        """Load checkpoint data if exists"""
        if self.checkpoint_path.exists():
            logger.info("Loading checkpoint...")
            with open(self.checkpoint_path, 'r') as f:
                checkpoint = json.load(f)
            return checkpoint.get('processed_index', -1), checkpoint.get('processed_links', [])
        return -1, []

    def compute_embeddings(self) -> None:
        """Compute embeddings for concepts and variables with caching"""
        # Check for cached embeddings
        if self.cache_path.exists():
            logger.info("Loading cached embeddings...")
            with open(self.cache_path, 'rb') as f:
                cache = pickle.load(f)

            # Verify cache validity
            if (len(cache.get('concept_embeddings', [])) == len(self.backbone_concepts) and
                    len(cache.get('variable_embeddings', [])) == len(self.variables)):
                self.concept_embeddings = np.array(cache['concept_embeddings'])
                self.variable_embeddings = np.array(cache['variable_embeddings'])
                logger.info("Using cached embeddings")
                return

        logger.info("Computing fresh embeddings...")

        # Compute concept embeddings
        concept_texts = [c['text_for_embedding'] for c in self.backbone_concepts]
        logger.info(f"Computing embeddings for {len(concept_texts)} concepts...")
        self.concept_embeddings = self.model.encode(concept_texts, batch_size=32, show_progress_bar=True)

        # Compute variable embeddings in batches
        variable_texts = [v['text_for_embedding'] for v in self.variables]
        logger.info(f"Computing embeddings for {len(variable_texts)} variables...")
        self.variable_embeddings = self.model.encode(variable_texts, batch_size=EMBED_BATCH_SIZE, show_progress_bar=True)

        # Cache embeddings
        logger.info("Caching embeddings for future use...")
        cache = {
            'concept_embeddings': self.concept_embeddings.tolist(),
            'variable_embeddings': self.variable_embeddings.tolist(),
            'timestamp': datetime.now().isoformat()
        }
        with open(self.cache_path, 'wb') as f:
            pickle.dump(cache, f)

        logger.info("Embeddings computed and cached")

    def compute_similarities_and_links(self) -> List[Dict]:
        """Compute similarities and generate variable-concept links with checkpointing"""
        logger.info("Computing similarities and generating links...")

        # Load checkpoint if exists
        last_processed_index, existing_links = self._load_checkpoint()
        start_index = last_processed_index + 1

        if start_index > 0:
            logger.info(f"Resuming from variable {start_index:,} (checkpoint found)")

        # Compute cosine similarities: variables x concepts (batched processing)
        all_links = existing_links.copy()
        variables_with_links = len(set(link['variable_id'] for link in existing_links))
        total_variables = len(self.variables)

        # Process in batches for memory efficiency
        for batch_start in range(start_index, total_variables, BATCH_SIZE):
            batch_end = min(batch_start + BATCH_SIZE, total_variables)
            batch_variables = self.variables[batch_start:batch_end]

            # Get embeddings for this batch
            batch_var_embeddings = self.variable_embeddings[batch_start:batch_end]

            # Compute similarities for this batch
            similarities_batch = cosine_similarity(batch_var_embeddings, self.concept_embeddings)

            # Process each variable in the batch
            batch_links = []
            batch_stats = {'avg_links': 0, 'max_links': 0, 'zero_links': 0}
            links_count = []

            for i, variable in enumerate(batch_variables):
                var_similarities = similarities_batch[i]

                # Find concepts above threshold
                above_threshold = np.where(var_similarities >= SIMILARITY_THRESHOLD)[0]

                if len(above_threshold) == 0:
                    batch_stats['zero_links'] += 1
                    links_count.append(0)
                    continue

                # Sort by similarity and take top N
                sorted_indices = above_threshold[np.argsort(-var_similarities[above_threshold])]
                top_concepts = sorted_indices[:MAX_CONCEPTS_PER_VARIABLE]

                # Create links for this variable
                variable_links = []
                for concept_idx in top_concepts:
                    similarity = var_similarities[concept_idx]
                    concept = self.backbone_concepts[concept_idx]

                    link = {
                        'variable_id': variable['id'],
                        'concept_id': concept['id'],
                        'concept_label': concept['label'],
                        'similarity': float(similarity),
                        'weight': float(similarity)  # Use similarity as weight
                    }
                    variable_links.append(link)
                    self.stats['similarity_distribution'].append(float(similarity))

                # Normalize weights to sum to 1.0
                total_weight = sum(link['weight'] for link in variable_links)
                if total_weight > 0:
                    for link in variable_links:
                        link['normalized_weight'] = link['weight'] / total_weight

                batch_links.extend(variable_links)
                links_count.append(len(variable_links))
                variables_with_links += 1

            # Update batch statistics
            if links_count:
                batch_stats['avg_links'] = np.mean(links_count)
                batch_stats['max_links'] = np.max(links_count)

            # Add to all links
            all_links.extend(batch_links)

            # Save checkpoint every batch
            self._save_checkpoint(batch_end - 1, all_links)

            # Progress update
            self._progress_update(batch_end, total_variables, batch_stats)

        print()  # New line after progress bar

        # Update final statistics
        self.stats['total_variables'] = len(self.variables)
        self.stats['variables_linked'] = variables_with_links
        self.stats['variables_unlinked'] = len(self.variables) - variables_with_links
        self.stats['total_links'] = len(all_links)
        self.stats['avg_concepts_per_variable'] = len(all_links) / max(variables_with_links, 1)

        logger.info(f"Generated {len(all_links)} links for {variables_with_links} variables")
        return all_links

    def generate_ttl_output(self, links: List[Dict]) -> None:
        """Generate TTL file with variable-concept links"""
        logger.info(f"Generating TTL output: {self.output_path}")

        # TTL header (owl: and rdfs: prefixes declared so the property definitions below parse)
        ttl_content = f"""# Variable-Concept Links - Generated {datetime.now().isoformat()}
# Links 36K ACS variables to 160 backbone concepts via semantic similarity

@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
@prefix dct: <http://purl.org/dc/terms/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix cendata: <https://example.org/cendata/> .

# Link predicates
cendata:hasConcept a owl:ObjectProperty ;
    rdfs:label "has concept" ;
    rdfs:comment "Links a variable to a backbone concept" .

cendata:conceptWeight a owl:DatatypeProperty ;
    rdfs:label "concept weight" ;
    rdfs:comment "Semantic similarity weight between variable and concept" ;
    rdfs:range xsd:decimal .

cendata:conceptWeightNormalized a owl:DatatypeProperty ;
    rdfs:label "normalized concept weight" ;
    rdfs:comment "Concept weight normalized so all weights for a variable sum to 1.0" ;
    rdfs:range xsd:decimal .

# Variable-concept links
"""

        # Group links by variable for cleaner output
        links_by_variable = {}
        for link in links:
            var_id = link['variable_id']
            if var_id not in links_by_variable:
                links_by_variable[var_id] = []
            links_by_variable[var_id].append(link)

        # Generate TTL for each variable
        for var_id, var_links in links_by_variable.items():
            ttl_content += f"cendata:{var_id}\n"

            # Add concept links
            for i, link in enumerate(var_links):
                ttl_content += f"    cendata:hasConcept cendata:{link['concept_id']} ;\n"
                ttl_content += f"    cendata:conceptWeight \"{link['weight']:.4f}\"^^xsd:decimal ;\n"
                ttl_content += f"    cendata:conceptWeightNormalized \"{link['normalized_weight']:.4f}\"^^xsd:decimal"

                if i < len(var_links) - 1:
                    ttl_content += " ;\n"
                else:
                    ttl_content += " .\n\n"

        # Write output file
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.output_path, 'w', encoding='utf-8') as f:
            f.write(ttl_content)

        logger.info(f"TTL output written: {self.output_path.stat().st_size:,} bytes")

    def generate_summary_report(self) -> None:
        """Generate summary statistics and report"""
        logger.info("\n📊 Variable-Concept Linking Summary:")
        logger.info(f"   Total variables processed: {self.stats['total_variables']:,}")
        logger.info(f"   Variables with links: {self.stats['variables_linked']:,}")
        logger.info(f"   Variables without links: {self.stats['variables_unlinked']:,}")
        logger.info(f"   Link coverage: {self.stats['variables_linked']/self.stats['total_variables']*100:.1f}%")
        logger.info(f"   Total concept links: {self.stats['total_links']:,}")
        logger.info(f"   Average concepts per variable: {self.stats['avg_concepts_per_variable']:.2f}")

        if self.stats['similarity_distribution']:
            similarities = np.array(self.stats['similarity_distribution'])
            logger.info(f"   Similarity distribution:")
            logger.info(f"     Min: {similarities.min():.3f}")
            logger.info(f"     Mean: {similarities.mean():.3f}")
            logger.info(f"     Max: {similarities.max():.3f}")
            logger.info(f"     Median: {np.median(similarities):.3f}")

    def run_linking_pipeline(self) -> None:
        """Execute the complete variable-concept linking pipeline"""
        logger.info("🚀 Starting Variable-Concept Linking Pipeline (Step 3-B)")

        try:
            # Load data
            self.backbone_concepts = self.extract_backbone_concepts()
            self.variables = self.load_variables()

            if len(self.backbone_concepts) == 0:
                raise ValueError("No backbone concepts loaded")
            if len(self.variables) == 0:
                raise ValueError("No variables loaded")

            # Compute embeddings
            self.compute_embeddings()

            # Generate links
            links = self.compute_similarities_and_links()

            # Generate output
            self.generate_ttl_output(links)

            # Generate summary report
            self.generate_summary_report()

            # Clean up checkpoint
            if self.checkpoint_path.exists():
                self.checkpoint_path.unlink()
                logger.info("Checkpoint file cleaned up")

            logger.info("\n🎯 Next steps:")
            logger.info("   1. Load variable_links.ttl into your graph store")
            logger.info("   2. Test concept-based variable retrieval queries")
            logger.info("   3. Integrate with existing cendata-extension-complete.ttl")
            logger.info("   4. Implement probabilistic concept weighting in search")
            logger.info(f"\n✅ Variable-Concept Linking Complete!")

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise


def main():
    """Main execution function"""
    linker = VariableConceptLinker()
    linker.run_linking_pipeline()


if __name__ == "__main__":
    main()
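
As a follow-up to the script's "Next steps" (load variable_links.ttl into a graph store and test concept-based variable retrieval), a minimal sketch of such a check is shown below. It assumes the rdflib package, which the script itself does not use, and the concept ID cendata:HealthInsurance is hypothetical; substitute any concept ID from concept_backbone.ttl.

# sanity_check_links.py - minimal sketch (not part of the repository)
# Assumes rdflib is installed; the concept ID used below is hypothetical.
from rdflib import Graph

g = Graph()
g.parse("knowledge-base/concepts/variable_links.ttl", format="turtle")

# List variables linked to one backbone concept via cendata:hasConcept
query = """
PREFIX cendata: <https://example.org/cendata/>
SELECT ?variable WHERE {
    ?variable cendata:hasConcept cendata:HealthInsurance .
}
LIMIT 10
"""

for row in g.query(query):
    print(row.variable)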
