import_csv.py•3.4 kB
#!/usr/bin/env python3
"""
Import CSV files into PostgreSQL database.
This script imports all CSV files from raw_data folders into the appropriate schemas.
"""
import os
import psycopg2
from dotenv import load_dotenv
import csv
# Load environment variables. This runs before DB_CONFIG is built so that the
# os.getenv() lookups below can see whatever load_dotenv() provides
# (presumably values from a local .env file -- confirm against the dotenv docs).
load_dotenv()
# Keyword arguments handed to psycopg2.connect() in main(). Every value is
# looked up in the environment; the second argument of each os.getenv() call
# is the fallback used when the variable is not set.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST', 'localhost'),
    port=os.getenv('DB_PORT', '5432'),
    database=os.getenv('DB_NAME', 'mcp_database'),
    user=os.getenv('DB_USER', 'postgres'),
    password=os.getenv('DB_PASSWORD', 'postgres'),
)
# What to import and where it goes. Outer key: sub-folder of the raw data
# directory. Inner dict: CSV file name inside that folder -> schema-qualified
# destination table.
IMPORT_MAPPING = dict(
    cs={'customers.csv': 'cs.customers'},
    dms={'documents.csv': 'dms.documents'},
    los={'loans.csv': 'los.loans'},
    mls={'contracts.csv': 'mls.contracts'},
)
def import_csv_to_table(cursor, csv_path, table_name):
    """Import a CSV file into a database table using COPY ... FROM STDIN.

    The first row of the file supplies the column list of the COPY statement.
    The whole file, header line included, is then streamed to the server; the
    ``CSV HEADER`` option makes the server skip that first line.

    Args:
        cursor: Database cursor exposing ``copy_expert(sql, file)`` (a
            psycopg2 cursor in this script).
        csv_path: Path of a UTF-8 encoded CSV file whose first row names the
            target columns. A leading byte order mark is tolerated.
        table_name: Destination table, optionally schema-qualified.

    Returns:
        True if the COPY completed, False otherwise. Failures are printed
        instead of raised so the caller can move on to the next file.
    """
    try:
        # 'utf-8-sig' decodes plain UTF-8 unchanged but drops a leading BOM,
        # which would otherwise be glued to the first column name and make
        # the COPY column list invalid.
        with open(csv_path, 'r', encoding='utf-8-sig') as f:
            headers = next(csv.reader(f), None)
            if not headers:
                # Covers both a zero-byte file and a blank first line.
                raise ValueError("CSV file is empty or has no header row")

            # NOTE: table_name and the header names are interpolated into
            # the statement unquoted, so they must come from trusted input.
            copy_sql = f"COPY {table_name} ({','.join(headers)}) FROM STDIN WITH CSV HEADER"

            # Rewind so COPY receives the complete file from its first line.
            f.seek(0)
            cursor.copy_expert(copy_sql, f)
        print(f"✓ Imported {csv_path} -> {table_name}")
        return True
    except Exception as e:
        # Deliberately broad: any failure (I/O, decoding, database) is
        # reported and converted into a False return value.
        print(f"✗ Error importing {csv_path}: {e}")
        return False
def main():
    """Import every CSV listed in IMPORT_MAPPING and print a summary.

    Connects with DB_CONFIG, then walks ``raw_data/<folder>/<file>`` for each
    entry of IMPORT_MAPPING. Missing folders or files are reported and
    skipped without being counted.

    Each file is imported in its own transaction: it is committed right
    after a successful import and rolled back after a failed one. With a
    single shared transaction, one failed COPY would leave the PostgreSQL
    transaction aborted, make every later file fail, and cause the final
    commit to discard the files already reported as imported.

    Returns:
        None. Progress and the final success/failure counts are printed.
    """
    print("Starting CSV import process...")

    # Connect to database
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        print(f"✓ Connected to database: {DB_CONFIG['database']}")
    except Exception as e:
        print(f"✗ Failed to connect to database: {e}")
        return

    raw_data_dir = 'raw_data'
    total_imported = 0
    total_failed = 0

    try:
        # Import each CSV file
        for folder, files in IMPORT_MAPPING.items():
            folder_path = os.path.join(raw_data_dir, folder)
            if not os.path.exists(folder_path):
                print(f"⚠ Folder not found: {folder_path}")
                continue

            print(f"\nProcessing {folder} schema...")
            for csv_file, table_name in files.items():
                csv_path = os.path.join(folder_path, csv_file)
                if not os.path.exists(csv_path):
                    print(f"⚠ File not found: {csv_path}")
                    continue

                if not import_csv_to_table(cursor, csv_path, table_name):
                    # Clear the aborted transaction so the next file starts
                    # clean; harmless if the failure happened before any SQL.
                    conn.rollback()
                    total_failed += 1
                    continue

                # Persist this file now so a later failure cannot undo it.
                try:
                    conn.commit()
                except Exception as e:
                    conn.rollback()
                    print(f"✗ Failed to commit {csv_path}: {e}")
                    total_failed += 1
                else:
                    total_imported += 1
    finally:
        # Close connection even if something unexpected escapes the loop.
        cursor.close()
        conn.close()

    print(f"\n{'='*50}")
    print(f"Import Summary:")
    print(f" Successful: {total_imported}")
    print(f" Failed: {total_failed}")
    print(f"{'='*50}")
# Run the import only when this file is executed as a script; importing it
# as a module must not start an import run.
if __name__ == "__main__":
    main()