Skip to main content
Glama
brockwebb

Open Census MCP Server

by brockwebb
geographic_handler_broken.py — 31.8 kB
#!/usr/bin/env python3
"""
Complete Geographic Handler - Fixed for Actual Database Schema

Supports ALL Census geography levels with spatial analysis capabilities.

SCHEMA FIXES:
- Removed funcstat filtering (column doesn't exist)
- Fixed cbsa_type mapping ('1'=Metro, '2'=Micro)
- Fixed county adjacency column names
- Handles actual database structure

COMPLETE COVERAGE:
- 32,285 places (cities, towns, villages)
- 3,222 counties (parishes, boroughs)
- 935 CBSAs (metro/micro statistical areas)
- 33,791 ZCTAs (ZIP Code Tabulation Areas)
- County adjacency for spatial queries
- Hot cache for instant lookups

GEOGRAPHY LEVELS SUPPORTED:
- us: National level
- state: States + DC + territories
- county: Counties, parishes, boroughs
- place: Incorporated places
- zcta: ZIP Code Tabulation Areas
- cbsa: Metropolitan/micropolitan areas
- tract: Census tracts (with boundary data)
"""

import sqlite3
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple
import re
import json

logger = logging.getLogger(__name__)


class CompleteGeographicHandler:
    """Complete geographic resolution with full gazetteer dataset."""

    def __init__(self, db_path: Optional[Path] = None):
        """Connect to the geography SQLite DB, load the hot cache, and
        validate coverage.

        Args:
            db_path: Explicit path to geography.db. When None, the two
                known locations under the project's knowledge-base
                directory are probed.

        Raises:
            FileNotFoundError: if no database file can be located.
            ValueError: if required tables are missing.
        """
        if db_path is None:
            # Auto-detect geography database - check correct path
            current_dir = Path(__file__).parent
            project_root = current_dir.parent.parent
            # Check both possible locations
            possible_paths = [
                project_root / "knowledge-base" / "geo-db" / "geography.db",
                project_root / "knowledge-base" / "geography.db",
            ]
            for path in possible_paths:
                if path.exists():
                    db_path = path
                    break

        self.db_path = Path(db_path) if db_path else None
        self.conn = None
        self.hot_cache = {}

        # Initialize database and validate completeness
        self._init_complete_database()
        self._load_hot_cache()
        self._validate_geographic_coverage()

    def _init_complete_database(self):
        """Open the SQLite connection and verify all required tables exist."""
        try:
            if not self.db_path or not self.db_path.exists():
                raise FileNotFoundError(f"Complete geography database not found: {self.db_path}")

            # check_same_thread=False: connection is shared across handler calls
            self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
            self.conn.row_factory = sqlite3.Row

            # Verify all required tables exist
            cursor = self.conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = {row[0] for row in cursor.fetchall()}

            required_tables = {
                'places',            # Cities, towns, villages
                'counties',          # Counties, parishes, boroughs
                'states',            # States + DC + territories
                'cbsas',             # Metropolitan/micropolitan areas
                'zctas',             # ZIP Code Tabulation Areas
                'county_adjacency',  # Spatial relationships
                'name_variations',   # Fuzzy matching support
            }

            missing_tables = required_tables - tables
            if missing_tables:
                raise ValueError(f"Database missing required tables: {missing_tables}")

            logger.info(f"✅ Complete geographic database connected: {self.db_path}")
            logger.info(f"✅ All {len(required_tables)} required tables present")

        except Exception as e:
            logger.error(f"❌ Failed to initialize complete geographic database: {e}")
            raise

    def _validate_geographic_coverage(self):
        """Count rows per table, log coverage, and warn on suspiciously low counts.

        Populates self.coverage_stats (empty dict on failure).
        """
        try:
            cursor = self.conn.cursor()

            # Count records in each table
            coverage = {}
            cursor.execute("SELECT COUNT(*) FROM places")
            coverage['places'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM counties")
            coverage['counties'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM states")
            coverage['states'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM cbsas")
            coverage['cbsas'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM zctas")
            coverage['zctas'] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM county_adjacency")
            coverage['adjacencies'] = cursor.fetchone()[0]

            # Log coverage statistics
            logger.info("🌍 Complete Geographic Coverage:")
            logger.info(f"   Places: {coverage['places']:,} (cities, towns, villages)")
            logger.info(f"   Counties: {coverage['counties']:,} (counties, parishes, boroughs)")
            logger.info(f"   States: {coverage['states']:,} (states + DC + territories)")
            logger.info(f"   CBSAs: {coverage['cbsas']:,} (metro/micro areas)")
            logger.info(f"   ZCTAs: {coverage['zctas']:,} (ZIP code areas)")
            logger.info(f"   County adjacencies: {coverage['adjacencies']:,} (spatial relationships)")

            # Validate expected ranges
            expected_minimums = {
                'places': 25000,      # Should have ~32K places
                'counties': 3000,     # Should have ~3.2K counties
                'states': 50,         # Should have 50+ (states + territories)
                'cbsas': 900,         # Should have ~935 CBSAs
                'zctas': 30000,       # Should have ~33K ZCTAs
                'adjacencies': 15000  # Should have ~19K adjacencies
            }

            for table, count in coverage.items():
                if table in expected_minimums and count < expected_minimums[table]:
                    logger.warning(f"⚠️ Low {table} count: {count} (expected >{expected_minimums[table]})")
                else:
                    logger.info(f"✅ {table.title()} coverage validated")

            self.coverage_stats = coverage

        except Exception as e:
            logger.error(f"❌ Geographic coverage validation failed: {e}")
            self.coverage_stats = {}

    def _load_hot_cache(self):
        """Load frequently accessed locations into hot cache - FIXED for actual schema.

        Caches the 100 largest metro CBSAs, the 500 most populous places,
        and every state (by name and abbreviation). Failures leave an
        empty cache rather than raising.
        """
        if not self.conn:
            return
        try:
            cursor = self.conn.cursor()

            # Load major metropolitan areas for instant access
            # FIXED: Use actual cbsa_type values ('1' = Metro, '2' = Micro)
            cursor.execute("""
                SELECT cbsa_code, name, cbsa_type, name_lower
                FROM cbsas
                WHERE cbsa_type = '1'
                ORDER BY CAST(COALESCE(land_area, 0) AS REAL) DESC
                LIMIT 100
            """)
            for row in cursor.fetchall():
                cache_key = f"cbsa:{row['name_lower']}"
                self.hot_cache[cache_key] = {
                    'geography_type': 'cbsa',
                    'cbsa_code': row['cbsa_code'],
                    'name': row['name'],
                    'cbsa_type': 'Metropolitan Statistical Area',  # Convert to readable format
                    'resolution_method': 'hot_cache_cbsa'
                }

            # Load major cities for instant access
            # FIXED: Removed funcstat filter since column doesn't exist
            cursor.execute("""
                SELECT place_fips, state_fips, state_abbrev, name, name_lower, population
                FROM places
                WHERE population IS NOT NULL
                ORDER BY CAST(COALESCE(population, 0) AS INTEGER) DESC
                LIMIT 500
            """)
            for row in cursor.fetchall():
                cache_key = f"place:{row['name_lower']},{row['state_abbrev'].lower()}"
                self.hot_cache[cache_key] = {
                    'geography_type': 'place',
                    'place_fips': row['place_fips'],
                    'state_fips': row['state_fips'],
                    'state_abbrev': row['state_abbrev'],
                    'name': row['name'],
                    'population': row['population'],
                    'resolution_method': 'hot_cache_place'
                }

            # Load all states for instant access
            cursor.execute("SELECT state_fips, state_abbrev, state_name FROM states")
            for row in cursor.fetchall():
                # Cache by full name
                cache_key = f"state:{row['state_name'].lower()}"
                self.hot_cache[cache_key] = {
                    'geography_type': 'state',
                    'state_fips': row['state_fips'],
                    'state_abbrev': row['state_abbrev'],
                    'name': row['state_name'],
                    'resolution_method': 'hot_cache_state'
                }
                # Cache by abbreviation
                cache_key = f"state:{row['state_abbrev'].lower()}"
                self.hot_cache[cache_key] = {
                    'geography_type': 'state',
                    'state_fips': row['state_fips'],
                    'state_abbrev': row['state_abbrev'],
                    'name': row['state_name'],
                    'resolution_method': 'hot_cache_state_abbrev'
                }

            logger.info(f"✅ Hot cache loaded: {len(self.hot_cache)} locations")

        except Exception as e:
            logger.warning(f"Failed to load hot cache: {e}")
            import traceback
            traceback.print_exc()
            self.hot_cache = {}

    def resolve_location(self, location_string: str) -> Dict[str, Any]:
        """
        Complete location resolution supporting all Census geography levels.

        Supports:
        - National: "United States", "USA", "national"
        - States: "New York", "NY", "California"
        - Places: "Austin, TX", "New York City", "St. Louis, MO"
        - Counties: "Cook County, IL", "Los Angeles County"
        - CBSAs: "San Francisco Bay Area", "Chicago metro", "Greater Boston"
        - ZCTAs: "90210", "10001", "ZIP code 78701"
        - Spatial: "counties near Baltimore", "places in Dallas metro"

        Returns a geography dict on success, or an error dict with
        suggestions when every strategy fails.
        """
        location = location_string.strip()

        # Handle national level
        if location.lower() in ['united states', 'usa', 'us', 'america', 'national', 'nationwide']:
            return {
                'geography_type': 'us',
                'state_fips': None,
                'place_fips': None,
                'county_fips': None,
                'cbsa_code': None,
                'zcta_code': None,
                'name': 'United States',
                'resolution_method': 'national_keyword'
            }

        # Multi-strategy resolution with hot cache priority
        strategies = [
            self._hot_cache_lookup,
            self._zcta_lookup,          # ZIP codes (high confidence patterns)
            self._cbsa_pattern_lookup,  # Metro area patterns
            self._place_state_lookup,   # "City, ST" format
            self._county_lookup,        # County patterns
            self._state_lookup,         # State names/abbreviations
            self._fuzzy_place_lookup,   # Fuzzy place matching
            self._spatial_query_lookup  # "near X", "in Y metro"
        ]

        for strategy in strategies:
            try:
                result = strategy(location)
                if result and 'error' not in result:
                    return result
            except Exception as e:
                logger.warning(f"Strategy {strategy.__name__} failed for '{location}': {e}")
                continue

        # All strategies failed
        return self._fail_with_comprehensive_suggestions(location)

    def _hot_cache_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """Instant lookup from hot cache."""
        location_lower = location.lower()

        # Try direct cache hits
        cache_keys = [
            f"place:{location_lower}",
            f"state:{location_lower}",
            f"cbsa:{location_lower}"
        ]
        for cache_key in cache_keys:
            if cache_key in self.hot_cache:
                return self.hot_cache[cache_key]
        return None

    def _zcta_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """ZIP Code Tabulation Area lookup."""
        # Pattern matching for ZIP codes
        zip_patterns = [
            r'\b(\d{5})\b',              # "90210"
            r'zip\s*code?\s*(\d{5})',    # "ZIP code 90210"
            r'postal\s*code\s*(\d{5})',  # "postal code 90210"
            r'zcta\s*(\d{5})'            # "ZCTA 90210"
        ]

        for pattern in zip_patterns:
            match = re.search(pattern, location, re.IGNORECASE)
            if match:
                zcta_code = match.group(1)
                try:
                    cursor = self.conn.cursor()
                    cursor.execute("""
                        SELECT zcta_code, lat, lon
                        FROM zctas
                        WHERE zcta_code = ?
                    """, (zcta_code,))
                    row = cursor.fetchone()
                    if row:
                        return {
                            'geography_type': 'zcta',
                            'zcta_code': row['zcta_code'],
                            'state_fips': None,  # ZCTAs can cross state boundaries
                            'name': f"ZIP Code {row['zcta_code']}",
                            'lat': row['lat'],
                            'lon': row['lon'],
                            'resolution_method': 'zcta_pattern_match'
                        }
                except Exception as e:
                    logger.warning(f"ZCTA lookup failed for {zcta_code}: {e}")
        return None

    def _cbsa_pattern_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """Metropolitan/micropolitan statistical area lookup - FIXED for actual schema."""
        # Metro area patterns
        metro_patterns = [
            r'(.+?)\s+metro(?:\s+area)?',      # "Chicago metro"
            r'greater\s+(.+?)(?:\s|$)',        # "Greater Boston"
            r'(.+?)\s+bay\s+area',             # "San Francisco Bay Area"
            r'(.+?)\s+metropolitan\s+area',    # "Dallas metropolitan area"
            r'(.+?)\s+msa',                    # "Austin MSA"
            r'(.+?)\s+region'                  # "Baltimore region"
        ]

        for pattern in metro_patterns:
            match = re.search(pattern, location, re.IGNORECASE)
            if match:
                metro_name = match.group(1).strip()
                try:
                    cursor = self.conn.cursor()
                    # Prefer metro ('1') over micro, shortest name first
                    cursor.execute("""
                        SELECT cbsa_code, name, cbsa_type, lat, lon
                        FROM cbsas
                        WHERE name_lower LIKE ? OR name_lower LIKE ?
                        ORDER BY CASE WHEN cbsa_type = '1' THEN 0 ELSE 1 END, LENGTH(name)
                        LIMIT 1
                    """, (f"%{metro_name.lower()}%", f"{metro_name.lower()}%"))
                    row = cursor.fetchone()
                    if row:
                        # Convert cbsa_type to readable format
                        readable_type = 'Metropolitan Statistical Area' if row['cbsa_type'] == '1' else 'Micropolitan Statistical Area'
                        return {
                            'geography_type': 'cbsa',
                            'cbsa_code': row['cbsa_code'],
                            'name': row['name'],
                            'cbsa_type': readable_type,
                            'lat': row['lat'],
                            'lon': row['lon'],
                            'resolution_method': 'cbsa_pattern_match'
                        }
                except Exception as e:
                    logger.warning(f"CBSA lookup failed for {metro_name}: {e}")
        return None

    def _place_state_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """City, State format lookup - FIXED to remove funcstat filter."""
        # Parse "City, ST" format.
        # FIX: IGNORECASE so "austin, tx" matches too; the capture is
        # upper-cased below, so existing uppercase inputs behave identically.
        place_state_pattern = r'^(.+?),\s*([A-Za-z]{2})$'
        match = re.match(place_state_pattern, location.strip(), re.IGNORECASE)
        if not match:
            return None

        place_name = match.group(1).strip()
        state_abbrev = match.group(2).upper()

        try:
            cursor = self.conn.cursor()

            # Exact match first - REMOVED funcstat filter
            cursor.execute("""
                SELECT place_fips, state_fips, state_abbrev, name, population, lat, lon
                FROM places
                WHERE name_lower = ? AND state_abbrev = ?
                ORDER BY CAST(COALESCE(population, 0) AS INTEGER) DESC
                LIMIT 1
            """, (place_name.lower(), state_abbrev))
            row = cursor.fetchone()
            if row:
                return {
                    'geography_type': 'place',
                    'place_fips': row['place_fips'],
                    'state_fips': row['state_fips'],
                    'state_abbrev': row['state_abbrev'],
                    'name': row['name'],
                    'population': row['population'],
                    'lat': row['lat'],
                    'lon': row['lon'],
                    'resolution_method': 'place_exact_match'
                }

            # Try with common place type suffixes
            place_types = ['city', 'town', 'village', 'borough']
            for place_type in place_types:
                cursor.execute("""
                    SELECT place_fips, state_fips, state_abbrev, name, population, lat, lon
                    FROM places
                    WHERE name_lower = ? AND state_abbrev = ?
                    ORDER BY CAST(COALESCE(population, 0) AS INTEGER) DESC
                    LIMIT 1
                """, (f"{place_name.lower()} {place_type}", state_abbrev))
                row = cursor.fetchone()
                if row:
                    return {
                        'geography_type': 'place',
                        'place_fips': row['place_fips'],
                        'state_fips': row['state_fips'],
                        'state_abbrev': row['state_abbrev'],
                        'name': row['name'],
                        'population': row['population'],
                        'lat': row['lat'],
                        'lon': row['lon'],
                        'resolution_method': 'place_type_match'
                    }

        except Exception as e:
            logger.warning(f"Place lookup failed for {place_name}, {state_abbrev}: {e}")
        return None

    def _county_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """County lookup with various naming patterns - FIXED schema."""
        # County patterns
        county_patterns = [
            r'^(.+?)\s+county(?:\s*,\s*([A-Z]{2}))?$',   # "Cook County, IL"
            r'^(.+?)\s+parish(?:\s*,\s*([A-Z]{2}))?$',   # "Orleans Parish, LA"
            r'^(.+?)\s+borough(?:\s*,\s*([A-Z]{2}))?$'   # "Bronx Borough, NY"
        ]

        for pattern in county_patterns:
            match = re.match(pattern, location.strip(), re.IGNORECASE)
            if match:
                county_name = match.group(1).strip()
                state_abbrev = match.group(2)
                try:
                    cursor = self.conn.cursor()
                    if state_abbrev:
                        # State specified
                        cursor.execute("""
                            SELECT county_fips, state_fips, state_abbrev, name, lat, lon
                            FROM counties
                            WHERE name_lower LIKE ? AND state_abbrev = ?
                            ORDER BY LENGTH(name)
                            LIMIT 1
                        """, (f"%{county_name.lower()}%", state_abbrev.upper()))
                    else:
                        # No state specified - find best match
                        cursor.execute("""
                            SELECT county_fips, state_fips, state_abbrev, name, lat, lon
                            FROM counties
                            WHERE name_lower LIKE ?
                            ORDER BY LENGTH(name)
                            LIMIT 1
                        """, (f"%{county_name.lower()}%",))
                    row = cursor.fetchone()
                    if row:
                        return {
                            'geography_type': 'county',
                            'county_fips': row['county_fips'],
                            'state_fips': row['state_fips'],
                            'state_abbrev': row['state_abbrev'],
                            'name': row['name'],
                            'lat': row['lat'],
                            'lon': row['lon'],
                            'resolution_method': 'county_pattern_match'
                        }
                except Exception as e:
                    logger.warning(f"County lookup failed for {county_name}: {e}")
        return None

    def _state_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """State lookup by name or abbreviation."""
        try:
            cursor = self.conn.cursor()
            # Try abbreviation first
            if len(location) == 2 and location.isalpha():
                cursor.execute("""
                    SELECT state_fips, state_abbrev, state_name
                    FROM states
                    WHERE state_abbrev = ?
                """, (location.upper(),))
            else:
                # Try full name
                cursor.execute("""
                    SELECT state_fips, state_abbrev, state_name
                    FROM states
                    WHERE LOWER(state_name) = ?
                """, (location.lower(),))
            row = cursor.fetchone()
            if row:
                return {
                    'geography_type': 'state',
                    'state_fips': row['state_fips'],
                    'state_abbrev': row['state_abbrev'],
                    'name': row['state_name'],
                    'resolution_method': 'state_direct_match'
                }
        except Exception as e:
            logger.warning(f"State lookup failed for {location}: {e}")
        return None

    def _fuzzy_place_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """Fuzzy matching for place names - FIXED to remove funcstat."""
        try:
            cursor = self.conn.cursor()
            # Generate name variations
            variations = self._generate_name_variations(location)
            for variation in variations:
                cursor.execute("""
                    SELECT place_fips, state_fips, state_abbrev, name, population, lat, lon
                    FROM places
                    WHERE name_lower = ?
                    ORDER BY CAST(COALESCE(population, 0) AS INTEGER) DESC
                    LIMIT 1
                """, (variation.lower(),))
                row = cursor.fetchone()
                if row:
                    return {
                        'geography_type': 'place',
                        'place_fips': row['place_fips'],
                        'state_fips': row['state_fips'],
                        'state_abbrev': row['state_abbrev'],
                        'name': row['name'],
                        'population': row['population'],
                        'lat': row['lat'],
                        'lon': row['lon'],
                        'original_query': location,
                        'matched_variation': variation,
                        'resolution_method': 'fuzzy_place_match'
                    }
        except Exception as e:
            logger.warning(f"Fuzzy place lookup failed for {location}: {e}")
        return None

    def _spatial_query_lookup(self, location: str) -> Optional[Dict[str, Any]]:
        """Handle spatial queries using county adjacency data."""
        # Spatial query patterns
        spatial_patterns = [
            r'(?:counties|places)\s+near\s+(.+)',  # "counties near Baltimore"
            r'(?:in|within)\s+(.+?)\s+metro',      # "in Dallas metro"
            r'adjacent\s+to\s+(.+)',               # "adjacent to Cook County"
            r'surrounding\s+(.+)'                  # "surrounding Austin"
        ]

        for pattern in spatial_patterns:
            match = re.search(pattern, location, re.IGNORECASE)
            if match:
                anchor_location = match.group(1).strip()
                # This would require complex spatial logic
                # For now, return a placeholder indicating spatial query detected
                return {
                    'geography_type': 'spatial',
                    'anchor_location': anchor_location,
                    'spatial_query': location,
                    'resolution_method': 'spatial_query_detected',
                    'note': 'Spatial queries require additional processing'
                }
        return None

    def _generate_name_variations(self, location: str) -> List[str]:
        """Generate common variations of location names."""
        variations = [location]
        location_lower = location.lower()

        # Saint <-> St. variations
        if location_lower.startswith('saint '):
            variations.append('St. ' + location[6:])
            variations.append('St ' + location[6:])
        elif location_lower.startswith('st. '):
            variations.append('Saint ' + location[4:])
        elif location_lower.startswith('st '):
            variations.append('Saint ' + location[3:])
            variations.append('St. ' + location[3:])

        # Remove/add periods
        if '.' in location:
            variations.append(location.replace('.', ''))
        else:
            variations.append(location.replace(' St ', ' St. '))

        # Mount <-> Mt. variations
        if location_lower.startswith('mount '):
            variations.append('Mt. ' + location[6:])
        elif location_lower.startswith('mt. '):
            variations.append('Mount ' + location[4:])

        return list(set(variations))  # Remove duplicates

    def _fail_with_comprehensive_suggestions(self, location: str) -> Dict[str, Any]:
        """Generate comprehensive suggestions when all resolution strategies fail."""
        suggestions = []
        try:
            cursor = self.conn.cursor()
            # Find partial matches across all geography types
            search_term = f"%{location.lower()}%"

            # Places
            cursor.execute("""
                SELECT 'place' as type, name || ', ' || state_abbrev as suggestion
                FROM places
                WHERE name_lower LIKE ?
                ORDER BY CAST(COALESCE(population, 0) AS INTEGER) DESC
                LIMIT 3
            """, (search_term,))
            suggestions.extend([f"{row['suggestion']}" for row in cursor.fetchall()])

            # States
            cursor.execute("""
                SELECT 'state' as type, state_name as suggestion
                FROM states
                WHERE LOWER(state_name) LIKE ?
                LIMIT 2
            """, (search_term,))
            suggestions.extend([row['suggestion'] for row in cursor.fetchall()])

            # Counties
            cursor.execute("""
                SELECT 'county' as type, name || ', ' || state_abbrev as suggestion
                FROM counties
                WHERE name_lower LIKE ?
                ORDER BY LENGTH(name)
                LIMIT 2
            """, (search_term,))
            suggestions.extend([row['suggestion'] for row in cursor.fetchall()])

            # CBSAs
            cursor.execute("""
                SELECT 'cbsa' as type, name as suggestion
                FROM cbsas
                WHERE name_lower LIKE ?
                ORDER BY CASE WHEN cbsa_type = '1' THEN 0 ELSE 1 END
                LIMIT 2
            """, (search_term,))
            suggestions.extend([row['suggestion'] for row in cursor.fetchall()])

        except Exception as e:
            logger.warning(f"Error generating suggestions: {e}")

        return {
            'error': f"Could not resolve location: {location}",
            'resolution_method': 'comprehensive_failure',
            'suggestions': suggestions[:8],  # Limit to top 8 suggestions
            'help': {
                'formats': [
                    "City, ST (Austin, TX)",
                    "State name (California)",
                    "County name (Cook County, IL)",
                    "ZIP code (90210)",
                    "Metro area (San Francisco Bay Area)"
                ],
                'coverage': f"Database contains {self.coverage_stats.get('places', 0):,} places, "
                            f"{self.coverage_stats.get('counties', 0):,} counties, "
                            f"{self.coverage_stats.get('cbsas', 0):,} metro areas"
            }
        }

    def get_adjacent_counties(self, county_fips: str, state_fips: str) -> List[Dict[str, Any]]:
        """Get counties adjacent to specified county - FIXED for actual schema."""
        if not self.conn:
            return []
        try:
            cursor = self.conn.cursor()
            # FIXED: Use actual column names from county_adjacency table
            # Build the full county GEOID (state_fips + county_fips)
            county_geoid = f"{state_fips}{county_fips}"
            cursor.execute("""
                SELECT DISTINCT ca.neighbor_geoid, ca.neighbor_name
                FROM county_adjacency ca
                WHERE ca.county_geoid = ?
                ORDER BY ca.neighbor_name
            """, (county_geoid,))

            adjacent_counties = []
            for row in cursor.fetchall():
                # Parse neighbor_geoid to get state and county FIPS
                neighbor_geoid = row['neighbor_geoid']
                if len(neighbor_geoid) == 5:
                    neighbor_state_fips = neighbor_geoid[:2]
                    # NOTE(review): the source capture was truncated mid-statement
                    # at this point; the remainder of this method is reconstructed
                    # from the visible pattern (5-char GEOID = 2-char state FIPS +
                    # 3-char county FIPS) — confirm against the original file.
                    neighbor_county_fips = neighbor_geoid[2:]
                    adjacent_counties.append({
                        'county_fips': neighbor_county_fips,
                        'state_fips': neighbor_state_fips,
                        'geoid': neighbor_geoid,
                        'name': row['neighbor_name'],
                    })
            return adjacent_counties

        except Exception as e:
            logger.warning(f"Adjacent county lookup failed for {county_fips}: {e}")
            return []

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server