
Australian Postcodes MCP Server

data_loader.py (9.29 kB)
#!/usr/bin/env python3
"""Data loader for Australian Postcodes database."""

import sys
import asyncio
import logging
import csv
from pathlib import Path
from typing import Dict, List, Any

import httpx
from metaphone import doublemetaphone

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from database import Database
from utils.config import Config

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class DataLoader:
    """Load postcode data into database."""

    def __init__(self):
        """Initialize data loader."""
        self.database = Database()
        Config.ensure_directories()

    async def download_data(self) -> Path:
        """Download CSV data from GitHub."""
        csv_path = Config.DATA_DIR / "postcodes.csv"

        logger.info(f"Downloading data from {Config.DATA_URL}")

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(Config.DATA_URL)
            response.raise_for_status()

            with open(csv_path, 'wb') as f:
                f.write(response.content)

        logger.info(f"Downloaded data to {csv_path}")
        return csv_path

    def parse_csv(self, csv_path: Path) -> List[Dict[str, Any]]:
        """Parse CSV file and extract relevant fields."""
        records = []

        with open(csv_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)

            for row in reader:
                # Extract and clean data
                record = {
                    'postcode': row.get('postcode', '').strip(),
                    'locality': row.get('locality', '').strip(),
                    'state': row.get('state', '').strip().upper(),
                    'longitude': self._parse_float(row.get('long') or row.get('Long_precise')),
                    'latitude': self._parse_float(row.get('lat') or row.get('Lat_precise')),
                    'lga_name': row.get('lgaregion', '').strip() or None,
                    'lga_code': row.get('lgacode', '').strip() or None,
                    'sa3_name': row.get('sa3name', '').strip() or None,
                    'sa3_code': row.get('sa3', '').strip() or None,
                    'sa4_name': row.get('sa4name', '').strip() or None,
                    'sa4_code': row.get('sa4', '').strip() or None,
                    'region': row.get('region', '').strip() or None,
                    'electoral_division': row.get('electorate', '').strip() or None,
                    'altitude': self._parse_int(row.get('altitude')),
                    'phn_name': row.get('phn_name', '').strip() or None,
                    'phn_code': row.get('phn_code', '').strip() or None,
                }

                # Skip records without essential fields
                if record['postcode'] and record['locality'] and record['state']:
                    records.append(record)

        logger.info(f"Parsed {len(records)} records from CSV")
        return records

    def _parse_float(self, value: str) -> float:
        """Parse float value from string."""
        if not value:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    def _parse_int(self, value: str) -> int:
        """Parse int value from string."""
        if not value:
            return None
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

    async def load_data(self, records: List[Dict[str, Any]]):
        """Load records into database."""
        await self.database.init_schema()

        async with self.database.get_connection() as conn:
            # Clear existing data
            await conn.execute("DELETE FROM postcodes")
            await conn.execute("DELETE FROM phonetic_codes")

            # Insert postcodes
            insert_query = """
                INSERT OR IGNORE INTO postcodes (
                    postcode, locality, state, longitude, latitude,
                    lga_name, lga_code, sa3_name, sa3_code,
                    sa4_name, sa4_code, region, electoral_division,
                    altitude, phn_name, phn_code
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """

            # Batch insert
            batch_size = 500
            for i in range(0, len(records), batch_size):
                batch = records[i:i + batch_size]
                await conn.executemany(insert_query, [
                    (
                        r['postcode'], r['locality'], r['state'],
                        r['longitude'], r['latitude'],
                        r['lga_name'], r['lga_code'],
                        r['sa3_name'], r['sa3_code'],
                        r['sa4_name'], r['sa4_code'],
                        r['region'], r['electoral_division'], r['altitude'],
                        r['phn_name'], r['phn_code']
                    )
                    for r in batch
                ])

                if (i + batch_size) % 2000 == 0:
                    logger.info(f"Inserted {min(i + batch_size, len(records))} records")

            await conn.commit()
            logger.info("All postcodes inserted")

            # Generate phonetic codes
            await self._generate_phonetic_codes(conn, records)

            # Update FTS index
            await conn.execute("""
                INSERT INTO postcodes_fts(locality, state, lga_name)
                SELECT locality, state, lga_name FROM postcodes
            """)
            await conn.commit()
            logger.info("FTS index updated")

    async def _generate_phonetic_codes(self, conn, records: List[Dict[str, Any]]):
        """Generate phonetic codes for fuzzy matching."""
        logger.info("Generating phonetic codes...")

        # Get unique locality-state combinations
        unique_localities = {}
        for record in records:
            key = (record['locality'], record['state'])
            if key not in unique_localities:
                unique_localities[key] = True

        # Generate phonetic codes
        phonetic_query = """
            INSERT OR IGNORE INTO phonetic_codes (
                locality, state, primary_code, secondary_code
            ) VALUES (?, ?, ?, ?)
        """

        phonetic_records = []
        for (locality, state) in unique_localities.keys():
            primary, secondary = doublemetaphone(locality)
            phonetic_records.append((
                locality, state, primary or "", secondary or ""
            ))

        # Batch insert
        await conn.executemany(phonetic_query, phonetic_records)
        await conn.commit()

        logger.info(f"Generated {len(phonetic_records)} phonetic codes")

    async def verify_data(self):
        """Verify loaded data."""
        stats = await self.database.get_statistics()

        logger.info("Database statistics:")
        logger.info(f"  Total records: {stats['total_records']}")
        logger.info(f"  Unique postcodes: {stats['unique_postcodes']}")
        logger.info(f"  Unique suburbs: {stats['unique_suburbs']}")
        logger.info(f"  Unique LGAs: {stats['unique_lgas']}")
        logger.info("Records by state:")
        for state, data in stats['by_state'].items():
            logger.info(f"  {state}: {data['postcodes']} postcodes, {data['suburbs']} suburbs")

    async def run(self, update: bool = False):
        """Run the data loading process."""
        try:
            # Check if database exists and has data
            if not update and Config.DATABASE_PATH.exists():
                stats = await self.database.get_statistics()
                if stats['total_records'] > 0:
                    logger.info(f"Database already contains {stats['total_records']} records")
                    logger.info("Use --update flag to force update")
                    return

            # Download data
            csv_path = await self.download_data()

            # Parse CSV
            records = self.parse_csv(csv_path)

            # Load into database
            await self.load_data(records)

            # Verify
            await self.verify_data()

            logger.info("Data loading completed successfully!")

        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise
        finally:
            await self.database.disconnect()


async def main():
    """Main entry point."""
    import argparse

    parser = argparse.ArgumentParser(description="Load Australian Postcodes data")
    parser.add_argument('--update', action='store_true', help='Force update existing data')
    args = parser.parse_args()

    loader = DataLoader()
    await loader.run(update=args.update)


if __name__ == "__main__":
    asyncio.run(main())

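For context, here is a minimal sketch of driving the loader from another async program rather than through the CLI. It assumes the project layout implied by the sys.path manipulation above (database.py and utils/config.py one directory above this script, with data_loader.py importable from the working directory); refresh_postcodes is an illustrative name, not part of the project.

# Usage sketch: call the loader programmatically instead of via the CLI.
# Assumes data_loader.py is importable from the current directory and that
# its own sys.path tweak can resolve database.py and utils/config.py.
import asyncio

from data_loader import DataLoader


async def refresh_postcodes(force: bool = False) -> None:
    loader = DataLoader()
    # update=True mirrors the script's --update flag: re-download the CSV and
    # reload the tables even if the database already contains records.
    await loader.run(update=force)


if __name__ == "__main__":
    asyncio.run(refresh_postcodes(force=True))

Running `python data_loader.py --update` from the command line achieves the same result through the script's own main() entry point.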