Skip to main content
Glama
brockwebb

Open Census MCP Server

by brockwebb
build_geography_db.py30.8 kB
#!/usr/bin/env python3 """ Enhanced Census Gazetteer Database Builder with Reference Tables Integration. Builds proper hierarchical schema: Nation -> Regions -> Divisions -> States -> Counties -> Places Input: - Raw gazetteer files (places, counties, CBSAs, etc.) - Clean reference tables (states.csv, regions-divisions.csv, etc.) Output: Complete hierarchical geographic database """ import sqlite3 import csv import logging from pathlib import Path from typing import Dict, List, Optional import re logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class HierarchicalGazetteerProcessor: """Enhanced processor with proper hierarchical schema and reference tables.""" def __init__(self, source_dir: str, reference_dir: str, output_db: str): self.source_dir = Path(source_dir) self.reference_dir = Path(reference_dir) self.output_db = Path(output_db) self.conn = None # Auto-detect gazetteer files self.gazetteer_files = self._detect_gazetteer_files() logger.info(f"Enhanced Gazetteer processor initialized") logger.info(f"Source: {self.source_dir}") logger.info(f"Reference: {self.reference_dir}") logger.info(f"Output: {self.output_db}") def _detect_gazetteer_files(self) -> Dict[str, str]: """Auto-detect gazetteer files from directory.""" files = {} if not self.source_dir.exists(): logger.error(f"Source directory does not exist: {self.source_dir}") return files all_files = list(self.source_dir.glob("*.txt")) logger.info(f"Found {len(all_files)} .txt files in {self.source_dir}") for file_path in all_files: filename = file_path.name filename_lower = filename.lower() # Map based on file naming patterns if '_place_' in filename_lower and 'national' in filename_lower: files['places'] = filename elif '_counties_' in filename_lower and 'national' in filename_lower: files['counties'] = filename elif '_cbsa_' in filename_lower and 'national' in filename_lower: files['cbsas'] = filename elif '_zcta_' in filename_lower and 'national' in filename_lower: files['zctas'] = filename elif 'adjacency' in filename_lower: files['adjacency'] = filename logger.info(f"Detected gazetteer files: {list(files.keys())}") return files def build_database(self): """Build complete hierarchical geographic database.""" try: self._create_hierarchical_schema() self._load_reference_tables() self._process_counties() self._process_places() self._process_cbsas() self._process_zctas() self._process_adjacency() self._create_indexes() self._validate_hierarchy() logger.info("✅ Hierarchical geographic database built successfully") except Exception as e: logger.error(f"❌ Database build failed: {e}") raise finally: if self.conn: self.conn.close() def _create_hierarchical_schema(self): """Create proper hierarchical schema with foreign keys.""" if self.output_db.exists(): self.output_db.unlink() logger.info("Removed existing database") self.conn = sqlite3.connect(self.output_db) # Regions table (top level) self.conn.execute(""" CREATE TABLE regions ( region_code TEXT PRIMARY KEY, region_name TEXT NOT NULL ) """) # Divisions table (under regions) self.conn.execute(""" CREATE TABLE divisions ( division_code TEXT PRIMARY KEY, division_name TEXT NOT NULL, region_code TEXT NOT NULL, FOREIGN KEY (region_code) REFERENCES regions(region_code) ) """) # States table (under divisions) - THE MISSING PIECE self.conn.execute(""" CREATE TABLE states ( state_fips TEXT PRIMARY KEY, state_abbrev TEXT UNIQUE NOT NULL, state_name TEXT NOT NULL, region_code TEXT NOT NULL, division_code TEXT NOT NULL, FOREIGN KEY (region_code) REFERENCES regions(region_code), FOREIGN KEY (division_code) REFERENCES divisions(division_code) ) """) # Counties table (under states) - ENHANCED with FK self.conn.execute(""" CREATE TABLE counties ( county_fips TEXT NOT NULL, state_fips TEXT NOT NULL, name TEXT NOT NULL, name_lower TEXT NOT NULL, state_abbrev TEXT NOT NULL, lat REAL, lon REAL, land_area REAL, water_area REAL, PRIMARY KEY (county_fips, state_fips), FOREIGN KEY (state_fips) REFERENCES states(state_fips) ) """) # Places table (under states/counties) - ENHANCED with FK self.conn.execute(""" CREATE TABLE places ( place_fips TEXT NOT NULL, state_fips TEXT NOT NULL, county_fips TEXT, -- NEW: Link to county (nullable for cross-county places) name TEXT NOT NULL, name_lower TEXT NOT NULL, state_abbrev TEXT NOT NULL, lat REAL, lon REAL, land_area REAL, water_area REAL, population INTEGER, PRIMARY KEY (place_fips, state_fips), FOREIGN KEY (state_fips) REFERENCES states(state_fips), FOREIGN KEY (county_fips, state_fips) REFERENCES counties(county_fips, state_fips) ) """) # CBSAs (cross-boundary metro areas) self.conn.execute(""" CREATE TABLE cbsas ( cbsa_code TEXT PRIMARY KEY, name TEXT NOT NULL, name_lower TEXT NOT NULL, cbsa_type TEXT, lat REAL, lon REAL, land_area REAL, water_area REAL ) """) # ZIP Code Tabulation Areas (cross-boundary) self.conn.execute(""" CREATE TABLE zctas ( zcta_code TEXT PRIMARY KEY, lat REAL, lon REAL, land_area REAL, water_area REAL ) """) # Name variations for fuzzy matching self.conn.execute(""" CREATE TABLE name_variations ( canonical_name TEXT NOT NULL, variation TEXT NOT NULL, geography_type TEXT NOT NULL, place_fips TEXT, state_fips TEXT, PRIMARY KEY (variation, geography_type) ) """) # County adjacency for spatial queries self.conn.execute(""" CREATE TABLE county_adjacency ( county_geoid TEXT NOT NULL, county_name TEXT NOT NULL, neighbor_geoid TEXT NOT NULL, neighbor_name TEXT NOT NULL, PRIMARY KEY (county_geoid, neighbor_geoid), FOREIGN KEY (county_geoid) REFERENCES counties(county_fips), FOREIGN KEY (neighbor_geoid) REFERENCES counties(county_fips) ) """) logger.info("✅ Hierarchical schema created with proper foreign keys") def _load_reference_tables(self): """Load clean reference data from CSV files.""" logger.info("Loading reference tables...") # Load regions regions_file = self.reference_dir / "regions-divisions.csv" if regions_file.exists(): with open(regions_file, 'r') as f: reader = csv.DictReader(f) regions = {} for row in reader: region_code = row['region_code'] region_name = row['region_name'] division_code = row['division_code'] division_name = row['division_name'] # Insert region (deduplicated) if region_code not in regions: self.conn.execute(""" INSERT OR REPLACE INTO regions (region_code, region_name) VALUES (?, ?) """, (region_code, region_name)) regions[region_code] = True # Insert division self.conn.execute(""" INSERT OR REPLACE INTO divisions (division_code, division_name, region_code) VALUES (?, ?, ?) """, (division_code, division_name, region_code)) logger.info("✅ Loaded regions and divisions") # Load states with hierarchy states_file = self.reference_dir / "states.csv" if states_file.exists(): with open(states_file, 'r') as f: reader = csv.DictReader(f) states_count = 0 for row in reader: self.conn.execute(""" INSERT OR REPLACE INTO states (state_fips, state_abbrev, state_name, region_code, division_code) VALUES (?, ?, ?, ?, ?) """, (row['state_fips'], row['state_abbrev'], row['state_name'], row['region_code'], row['division_code'])) states_count += 1 logger.info(f"✅ Loaded {states_count} states with hierarchy") self.conn.commit() def _process_counties(self): """Process counties with proper foreign key relationships.""" counties_file = self.source_dir / self.gazetteer_files.get('counties', '') if not counties_file.exists(): logger.warning(f"Counties file not found: {counties_file}") return logger.info(f"Processing counties with hierarchy from {counties_file}") count = 0 with open(counties_file, 'r', encoding='utf-8') as f: reader = csv.reader(f, delimiter='\t') for row in reader: if len(row) < 10: continue try: state_abbrev = row[0].strip() geoid = row[1].strip() name = row[3].strip() lat = float(row[8]) if row[8].strip() else None lon = float(row[9]) if row[9].strip() else None land_area = float(row[4]) if row[4].strip() else None water_area = float(row[5]) if row[5].strip() else None # Extract state and county FIPS from GEOID if len(geoid) >= 5: state_fips = geoid[:2] county_fips = geoid[2:] else: continue name_lower = name.lower() self.conn.execute(""" INSERT OR REPLACE INTO counties (county_fips, state_fips, name, name_lower, state_abbrev, lat, lon, land_area, water_area) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (county_fips, state_fips, name, name_lower, state_abbrev, lat, lon, land_area, water_area)) # Add name variations self._add_name_variations(name, 'county', county_fips, state_fips) count += 1 if count % 100 == 0: logger.info(f"Processed {count} counties...") except (ValueError, IndexError) as e: logger.warning(f"Skipped malformed county row: {e}") continue self.conn.commit() logger.info(f"✅ Processed {count} counties with state foreign keys") def _process_places(self): """Process places with enhanced hierarchy and county linking.""" places_file = self.source_dir / self.gazetteer_files['places'] if not places_file.exists(): logger.warning(f"Places file not found: {places_file}") return logger.info(f"Processing places with hierarchy from {places_file}") # Build state abbreviation to FIPS mapping state_abbrev_to_fips = { 'AL': '01', 'AK': '02', 'AZ': '04', 'AR': '05', 'CA': '06', 'CO': '08', 'CT': '09', 'DE': '10', 'FL': '12', 'GA': '13', 'HI': '15', 'ID': '16', 'IL': '17', 'IN': '18', 'IA': '19', 'KS': '20', 'KY': '21', 'LA': '22', 'ME': '23', 'MD': '24', 'MA': '25', 'MI': '26', 'MN': '27', 'MS': '28', 'MO': '29', 'MT': '30', 'NE': '31', 'NV': '32', 'NH': '33', 'NJ': '34', 'NM': '35', 'NY': '36', 'NC': '37', 'ND': '38', 'OH': '39', 'OK': '40', 'OR': '41', 'PA': '42', 'RI': '44', 'SC': '45', 'SD': '46', 'TN': '47', 'TX': '48', 'UT': '49', 'VT': '50', 'VA': '51', 'WA': '53', 'WV': '54', 'WI': '55', 'WY': '56', 'DC': '11', 'PR': '72' } count = 0 with open(places_file, 'r', encoding='utf-8') as f: reader = csv.reader(f, delimiter='\t') header = next(reader) # Skip header row for row in reader: if len(row) < 12: continue try: state_abbrev = row[0].strip() geoid = row[1].strip() name = row[3].strip() funcstat = row[5].strip() lat = float(row[10]) if row[10].strip() else None lon = float(row[11]) if row[11].strip() else None land_area = float(row[6]) if row[6].strip() else None water_area = float(row[7]) if row[7].strip() else None # Skip non-active places if funcstat not in ['A', 'S']: continue # Get state FIPS from abbreviation state_fips = state_abbrev_to_fips.get(state_abbrev) if not state_fips: continue # Extract place FIPS from GEOID if len(geoid) >= 7: place_fips = geoid[2:] # Remove state part else: continue name_lower = name.lower() # TODO: Add county linking logic here for places # For now, leaving county_fips as NULL county_fips = None self.conn.execute(""" INSERT OR REPLACE INTO places (place_fips, state_fips, county_fips, name, name_lower, state_abbrev, lat, lon, land_area, water_area) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (place_fips, state_fips, county_fips, name, name_lower, state_abbrev, lat, lon, land_area, water_area)) # Add name variations self._add_name_variations(name, 'place', place_fips, state_fips) count += 1 if count % 1000 == 0: logger.info(f"Processed {count} places...") except (ValueError, IndexError) as e: logger.warning(f"Skipped malformed place row: {e}") continue self.conn.commit() logger.info(f"✅ Processed {count} places with state foreign keys") def _process_cbsas(self): """Process CBSAs (unchanged from original).""" cbsas_file = self.source_dir / self.gazetteer_files.get('cbsas', '') if not cbsas_file.exists(): logger.warning(f"CBSAs file not found: {cbsas_file}") return logger.info(f"Processing CBSAs from {cbsas_file}") count = 0 with open(cbsas_file, 'r', encoding='utf-8') as f: reader = csv.reader(f, delimiter='\t') for row in reader: if len(row) < 10: continue try: cbsa_code = row[1].strip() name = row[2].strip() cbsa_type = row[3].strip() lat = float(row[8]) if row[8].strip() else None lon = float(row[9]) if row[9].strip() else None land_area = float(row[4]) if row[4].strip() else None water_area = float(row[5]) if row[5].strip() else None name_lower = name.lower() self.conn.execute(""" INSERT OR REPLACE INTO cbsas (cbsa_code, name, name_lower, cbsa_type, lat, lon, land_area, water_area) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (cbsa_code, name, name_lower, cbsa_type, lat, lon, land_area, water_area)) self._add_name_variations(name, 'cbsa', cbsa_code, None) count += 1 if count % 100 == 0: logger.info(f"Processed {count} CBSAs...") except (ValueError, IndexError) as e: logger.warning(f"Skipped malformed CBSA row: {e}") continue self.conn.commit() logger.info(f"✅ Processed {count} CBSAs") def _process_zctas(self): """Process ZCTAs (unchanged from original).""" zctas_file = self.source_dir / self.gazetteer_files.get('zctas', '') if not zctas_file.exists(): logger.info("ZCTAs file not found - skipping ZIP code support") return logger.info(f"Processing ZCTAs from {zctas_file}") count = 0 with open(zctas_file, 'r', encoding='utf-8') as f: reader = csv.reader(f, delimiter='\t') for row in reader: if len(row) < 7: continue try: zcta_code = row[0].strip() lat = float(row[5]) if row[5].strip() else None lon = float(row[6]) if row[6].strip() else None land_area = float(row[1]) if row[1].strip() else None water_area = float(row[2]) if row[2].strip() else None self.conn.execute(""" INSERT OR REPLACE INTO zctas (zcta_code, lat, lon, land_area, water_area) VALUES (?, ?, ?, ?, ?) """, (zcta_code, lat, lon, land_area, water_area)) count += 1 if count % 1000 == 0: logger.info(f"Processed {count} ZCTAs...") except (ValueError, IndexError) as e: logger.warning(f"Skipped malformed ZCTA row: {e}") continue self.conn.commit() logger.info(f"✅ Processed {count} ZIP code areas") def _process_adjacency(self): """Process county adjacency (unchanged from original).""" adjacency_file = self.source_dir / self.gazetteer_files.get('adjacency', '') if not adjacency_file.exists(): logger.warning(f"Adjacency file not found: {adjacency_file}") return logger.info(f"Processing county adjacency from {adjacency_file}") count = 0 with open(adjacency_file, 'r', encoding='utf-8') as f: reader = csv.reader(f, delimiter='|') # Skip header if it exists first_row = next(reader, None) if first_row and 'County Name' in first_row[0]: pass # Header row, already consumed else: # No header, process this row if first_row and len(first_row) >= 4: county_name = first_row[0].strip() county_geoid = first_row[1].strip() neighbor_name = first_row[2].strip() neighbor_geoid = first_row[3].strip() if county_geoid != neighbor_geoid: self.conn.execute(""" INSERT OR REPLACE INTO county_adjacency (county_geoid, county_name, neighbor_geoid, neighbor_name) VALUES (?, ?, ?, ?) """, (county_geoid, county_name, neighbor_geoid, neighbor_name)) count += 1 # Process remaining rows for row in reader: if len(row) < 4: continue try: county_name = row[0].strip() county_geoid = row[1].strip() neighbor_name = row[2].strip() neighbor_geoid = row[3].strip() if county_geoid == neighbor_geoid: continue self.conn.execute(""" INSERT OR REPLACE INTO county_adjacency (county_geoid, county_name, neighbor_geoid, neighbor_name) VALUES (?, ?, ?, ?) """, (county_geoid, county_name, neighbor_geoid, neighbor_name)) count += 1 if count % 1000 == 0: logger.info(f"Processed {count} adjacency relationships...") except (ValueError, IndexError) as e: logger.warning(f"Skipped malformed adjacency row: {e}") continue self.conn.commit() logger.info(f"✅ Processed {count} county adjacency relationships") def _add_name_variations(self, name: str, geo_type: str, place_fips: str = None, state_fips: str = None): """Add common name variations for fuzzy matching (unchanged from original).""" name_lower = name.lower() variations = set() # Saint/St. variations if 'saint ' in name_lower: variations.add(name_lower.replace('saint ', 'st. ')) variations.add(name_lower.replace('saint ', 'st ')) elif 'st. ' in name_lower: variations.add(name_lower.replace('st. ', 'saint ')) variations.add(name_lower.replace('st. ', 'st ')) elif 'st ' in name_lower and not name_lower.endswith('st'): variations.add(name_lower.replace('st ', 'saint ')) variations.add(name_lower.replace('st ', 'st. ')) # Remove common suffixes for matching suffixes = ['city', 'town', 'village', 'borough'] for suffix in suffixes: if name_lower.endswith(f' {suffix}'): variations.add(name_lower.replace(f' {suffix}', '')) # Add the original name variations.add(name_lower) # Insert variations for variation in variations: try: self.conn.execute(""" INSERT OR REPLACE INTO name_variations (canonical_name, variation, geography_type, place_fips, state_fips) VALUES (?, ?, ?, ?, ?) """, (name_lower, variation, geo_type, place_fips, state_fips)) except sqlite3.Error: pass # Skip duplicates def _create_indexes(self): """Create performance indexes including hierarchy indexes.""" logger.info("Creating hierarchical database indexes...") # Hierarchy indexes self.conn.execute("CREATE INDEX idx_states_region ON states(region_code)") self.conn.execute("CREATE INDEX idx_states_division ON states(division_code)") self.conn.execute("CREATE INDEX idx_states_abbrev ON states(state_abbrev)") # Places indexes self.conn.execute("CREATE INDEX idx_places_name_state ON places(name_lower, state_abbrev)") self.conn.execute("CREATE INDEX idx_places_state ON places(state_abbrev)") self.conn.execute("CREATE INDEX idx_places_fips ON places(place_fips)") # Counties indexes self.conn.execute("CREATE INDEX idx_counties_name_state ON counties(name_lower, state_abbrev)") self.conn.execute("CREATE INDEX idx_counties_state ON counties(state_abbrev)") # CBSAs indexes self.conn.execute("CREATE INDEX idx_cbsas_name ON cbsas(name_lower)") # County adjacency indexes self.conn.execute("CREATE INDEX idx_adjacency_county ON county_adjacency(county_geoid)") self.conn.execute("CREATE INDEX idx_adjacency_neighbor ON county_adjacency(neighbor_geoid)") # Name variations index self.conn.execute("CREATE INDEX idx_variations_lookup ON name_variations(variation, geography_type)") self.conn.commit() logger.info("✅ Hierarchical database indexes created") def _validate_hierarchy(self): """Validate hierarchical database structure and foreign key relationships.""" cursor = self.conn.cursor() # Count records by level cursor.execute("SELECT COUNT(*) FROM regions") regions_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM divisions") divisions_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM states") states_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM counties") counties_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM places") places_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM cbsas") cbsas_count = cursor.fetchone()[0] logger.info(f"Hierarchical database validation:") logger.info(f" Regions: {regions_count}") logger.info(f" Divisions: {divisions_count}") logger.info(f" States: {states_count}") logger.info(f" Counties: {counties_count:,}") logger.info(f" Places: {places_count:,}") logger.info(f" CBSAs: {cbsas_count:,}") # Test hierarchical queries logger.info("Testing hierarchical queries:") # Test: "Maryland" state lookup cursor.execute(""" SELECT state_name, region_code, division_code FROM states WHERE state_abbrev = 'MD' OR state_name = 'Maryland' """) md_result = cursor.fetchone() if md_result: logger.info(f" ✅ Maryland -> {md_result[0]}, Region {md_result[1]}, Division {md_result[2]}") # Test: "Counties in Maryland" cursor.execute(""" SELECT COUNT(*) FROM counties c JOIN states s ON c.state_fips = s.state_fips WHERE s.state_abbrev = 'MD' """) md_counties = cursor.fetchone()[0] logger.info(f" ✅ Counties in Maryland: {md_counties}") # Test: Sample city hierarchical lookup cursor.execute(""" SELECT p.name, p.state_abbrev, s.state_name, s.region_code FROM places p JOIN states s ON p.state_fips = s.state_fips WHERE p.name_lower = 'baltimore' AND p.state_abbrev = 'MD' """) baltimore_result = cursor.fetchone() if baltimore_result: logger.info(f" ✅ Baltimore hierarchical lookup: {baltimore_result}") # Database size db_size = self.output_db.stat().st_size / (1024 * 1024) logger.info(f"Database size: {db_size:.1f} MB") logger.info("🎯 Hierarchical database validation complete!") def main(): """Build the enhanced hierarchical geographic database.""" import argparse parser = argparse.ArgumentParser(description="Build Enhanced Hierarchical Census Database") parser.add_argument("--source", required=True, help="Directory containing gazetteer files") parser.add_argument("--reference", required=True, help="Directory containing reference CSV files") parser.add_argument("--output", required=True, help="Output SQLite database file") args = parser.parse_args() processor = HierarchicalGazetteerProcessor(args.source, args.reference, args.output) processor.build_database() print(f"✅ Enhanced hierarchical geographic database built: {args.output}") print("\nWhat this enables:") print(" • 'Maryland' -> Resolves via states table") print(" • 'Counties in Maryland' -> Works via foreign key joins") print(" • 'Baltimore, MD' -> Full hierarchical context") print(" • Regional analysis -> Nations → Regions → Divisions → States") print("\nNext step: Update python_census_api.py to use this hierarchical database") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server