#!/usr/bin/env python3
import json
import boto3
import os
import glob
from datetime import datetime
import uuid
import argparse
import sys
from pathlib import Path
# Add parent directory to path so we can import from app
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.core.config import settings
def parse_date(date_str: str) -> datetime:
    """Parse an ISO-8601 date string from a JSON events file.

    Accepts a trailing ``Z`` UTC designator (e.g. ``2023-05-01T20:00:00Z``),
    which ``datetime.fromisoformat`` rejects before Python 3.11.

    Args:
        date_str: ISO-8601 formatted date/datetime string.

    Returns:
        The parsed ``datetime``. On parse failure, prints a warning and
        falls back to the current local time so a single bad record does
        not abort the whole import.
    """
    try:
        # Normalize 'Z' to an explicit UTC offset for Python < 3.11
        # compatibility with fromisoformat.
        return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
    except ValueError:
        # Handle different date formats if needed
        print(f"Warning: Could not parse date {date_str}")
        return datetime.now()
def process_file(file_path: str, dynamodb_table):
    """Read one JSON events file and batch-write its events to DynamoDB.

    Events with a missing or falsy ``date`` field are skipped. Every
    uploaded item is keyed by a freshly generated UUID.

    Args:
        file_path: Path to the JSON file containing a list of event dicts.
        dynamodb_table: boto3 DynamoDB Table resource to write into.

    Returns:
        The number of events written to the table.
    """
    file_name = os.path.basename(file_path)
    print(f"Processing {file_name}...")

    with open(file_path, 'r', encoding='utf-8') as handle:
        events = json.load(handle)

    # Nothing to do for an empty (or null) payload.
    if not events:
        print(f"No events found in {file_name}, skipping.")
        return 0

    count = 0
    with dynamodb_table.batch_writer() as writer:
        for record in events:
            # An event without a date is unusable downstream; drop it.
            if not record.get('date'):
                continue
            writer.put_item(Item={
                'id': str(uuid.uuid4()),  # fresh primary key per run
                'date': record.get('date'),
                'title': record.get('title', ''),
                'location': record.get('location', ''),
                'artists': record.get('artists', []),
                'url': record.get('url', ''),
            })
            count += 1

    print(f"Imported {count} events from {file_name}")
    return count
def main():
    """CLI entry point: import Berghain event JSON files into DynamoDB.

    Resolves table name and region from CLI flags, falling back to the
    app's settings, then connects to DynamoDB (optionally a local
    endpoint) and imports every matching JSON file under the given path.
    """
    parser = argparse.ArgumentParser(description='Import Berghain event data to DynamoDB')
    parser.add_argument('--table', type=str, help='DynamoDB table name (overrides settings)')
    parser.add_argument('--region', type=str, help='AWS region (overrides settings)')
    parser.add_argument('--path', type=str, default='../events', help='Path to JSON files directory')
    parser.add_argument('--endpoint-url', type=str, help='DynamoDB endpoint URL (for local development)')
    args = parser.parse_args()

    # CLI flags win; settings supply the defaults.
    table_name = args.table or settings.DYNAMODB_TABLE
    region = args.region or settings.AWS_REGION
    print(f"Connecting to DynamoDB table '{table_name}' in region '{region}'...")

    # Point at a local DynamoDB instance when an endpoint is given.
    client_options = {'region_name': region}
    if args.endpoint_url:
        client_options['endpoint_url'] = args.endpoint_url
        print(f"Using local DynamoDB at: {args.endpoint_url}")
    table = boto3.resource('dynamodb', **client_options).Table(table_name)

    # Only files matching the scraper's naming convention are imported.
    matched = sorted(glob.glob(os.path.join(args.path, 'berghain_events_*.json')))
    if not matched:
        print(f"No JSON files found in {args.path}")
        return
    print(f"Found {len(matched)} JSON files")

    total_events = sum(process_file(file_path, table) for file_path in matched)
    print(f"Import complete. Imported {total_events} events from {len(matched)} files.")
# Run the import only when executed as a script (not when imported).
if __name__ == "__main__":
    main()