Skip to main content
Glama
brockwebb

Open Census MCP Server

by brockwebb
geographic_handler.py9.76 kB
#!/usr/bin/env python3 """ Geographic Handler - FIXED - Simple and Effective Search Key fix: Use LOWER(name) instead of non-existent name_lower column Simplified search logic: find everything that matches, let Claude decide """ import sqlite3 import logging from pathlib import Path from typing import Dict, List, Optional, Any import re logger = logging.getLogger(__name__) class GeographicHandler: """ Database-driven geographic resolution with FIXED search patterns. """ def __init__(self, geography_db_path: str): """Initialize with path to geography database.""" self.db_path = Path(geography_db_path) if not self.db_path.exists(): raise FileNotFoundError(f"Geography database not found: {self.db_path}") self.conn = sqlite3.connect(str(self.db_path), check_same_thread=False) self.conn.row_factory = sqlite3.Row logger.info(f"✅ GeographicHandler initialized with {self.db_path}") def search_locations(self, location: str, max_results: int = 10) -> List[Dict[str, Any]]: """ Simple, effective search: find everything that matches, let Claude decide. """ if not location or not location.strip(): return [] location = location.strip() logger.info(f"🔍 Searching for: '{location}'") results = [] # Extract state context (e.g. "St. Louis, MO" -> state = "MO") state_filter = None if ',' in location: parts = location.split(',') if len(parts) == 2: main_location = parts[0].strip() potential_state = parts[1].strip().upper() if len(potential_state) == 2: state_filter = potential_state location = main_location # Search places results.extend(self._search_places(location, state_filter, max_results // 2)) # Search counties results.extend(self._search_counties(location, state_filter, max_results // 2)) # Search CBSAs results.extend(self._search_cbsas(location, max_results // 4)) # Search ZCTAs if it looks like a ZIP if self._is_zip_code(location): results.extend(self._search_zctas(location, max_results // 4)) # Remove duplicates and sort by confidence unique_results = self._deduplicate_results(results) sorted_results = sorted(unique_results, key=lambda x: x['confidence'], reverse=True) logger.info(f"✅ Found {len(sorted_results)} matches for '{location}'") return sorted_results[:max_results] def _search_places(self, name: str, state_filter: Optional[str], limit: int) -> List[Dict[str, Any]]: """Search places with simple LIKE matching.""" cursor = self.conn.cursor() if state_filter: query = """ SELECT 'place' as geography_type, name, state_abbrev, state_fips, place_fips FROM places WHERE LOWER(name) LIKE LOWER(?) AND state_abbrev = ? ORDER BY name LIMIT ? """ params = [f"%{name}%", state_filter, limit] else: query = """ SELECT 'place' as geography_type, name, state_abbrev, state_fips, place_fips FROM places WHERE LOWER(name) LIKE LOWER(?) ORDER BY name LIMIT ? """ params = [f"%{name}%", limit] cursor.execute(query, params) results = [] for row in cursor.fetchall(): confidence = self._calculate_confidence(name, row['name']) # Boost confidence for state matches if state_filter and row['state_abbrev'] == state_filter: confidence += 0.2 results.append({ 'geography_type': 'place', 'name': row['name'], 'state_abbrev': row['state_abbrev'], 'state_fips': row['state_fips'], 'place_fips': row['place_fips'], 'confidence': min(1.0, confidence) }) return results def _search_counties(self, name: str, state_filter: Optional[str], limit: int) -> List[Dict[str, Any]]: """Search counties with simple LIKE matching.""" cursor = self.conn.cursor() if state_filter: query = """ SELECT 'county' as geography_type, name, state_abbrev, state_fips, county_fips FROM counties WHERE LOWER(name) LIKE LOWER(?) AND state_abbrev = ? ORDER BY name LIMIT ? """ params = [f"%{name}%", state_filter, limit] else: query = """ SELECT 'county' as geography_type, name, state_abbrev, state_fips, county_fips FROM counties WHERE LOWER(name) LIKE LOWER(?) ORDER BY name LIMIT ? """ params = [f"%{name}%", limit] cursor.execute(query, params) results = [] for row in cursor.fetchall(): confidence = self._calculate_confidence(name, row['name']) # Boost confidence for state matches if state_filter and row['state_abbrev'] == state_filter: confidence += 0.2 results.append({ 'geography_type': 'county', 'name': row['name'], 'state_abbrev': row['state_abbrev'], 'state_fips': row['state_fips'], 'county_fips': row['county_fips'], 'confidence': min(1.0, confidence) }) return results def _search_cbsas(self, name: str, limit: int) -> List[Dict[str, Any]]: """Search metro/micro areas.""" cursor = self.conn.cursor() query = """ SELECT 'cbsa' as geography_type, name, cbsa_code FROM cbsas WHERE LOWER(name) LIKE LOWER(?) ORDER BY name LIMIT ? """ cursor.execute(query, [f"%{name}%", limit]) results = [] for row in cursor.fetchall(): results.append({ 'geography_type': 'cbsa', 'name': row['name'], 'cbsa_code': row['cbsa_code'], 'confidence': self._calculate_confidence(name, row['name']) }) return results def _search_zctas(self, zip_code: str, limit: int) -> List[Dict[str, Any]]: """Search ZIP Code Tabulation Areas.""" clean_zip = re.sub(r'[^0-9]', '', zip_code) if len(clean_zip) != 5: return [] cursor = self.conn.cursor() cursor.execute("SELECT zcta_code FROM zctas WHERE zcta_code = ? LIMIT ?", [clean_zip, limit]) results = [] for row in cursor.fetchall(): results.append({ 'geography_type': 'zcta', 'name': f"ZIP Code {row['zcta_code']}", 'zcta_code': row['zcta_code'], 'confidence': 1.0 }) return results def _is_zip_code(self, location: str) -> bool: """Check if location looks like a ZIP code.""" clean_location = re.sub(r'[^0-9]', '', location) return len(clean_location) == 5 and clean_location.isdigit() def _calculate_confidence(self, query: str, match: str) -> float: """Simple confidence calculation.""" query_lower = query.lower().strip() match_lower = match.lower().strip() # Exact match if query_lower == match_lower: return 1.0 # Query is contained in match or vice versa if query_lower in match_lower: return 0.9 if match_lower in query_lower: return 0.8 # Partial overlap query_words = set(query_lower.split()) match_words = set(match_lower.split()) if query_words & match_words: # Any word overlap overlap = len(query_words & match_words) total = len(query_words | match_words) return 0.5 + (0.3 * overlap / total) return 0.3 # Minimal match def _deduplicate_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Remove duplicate results.""" seen = set() unique_results = [] for result in results: # Create unique key if result['geography_type'] == 'place': key = f"place_{result['state_fips']}_{result['place_fips']}" elif result['geography_type'] == 'county': key = f"county_{result['state_fips']}_{result['county_fips']}" elif result['geography_type'] == 'cbsa': key = f"cbsa_{result['cbsa_code']}" elif result['geography_type'] == 'zcta': key = f"zcta_{result['zcta_code']}" else: key = f"{result['geography_type']}_{result['name']}" if key not in seen: seen.add(key) unique_results.append(result) return unique_results def close(self): """Close database connection.""" if self.conn: self.conn.close() def __del__(self): """Cleanup database connection.""" self.close()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brockwebb/open-census-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server