#!/usr/bin/env python3
"""
Load sample data from CME CSV into the database.
"""
import asyncio
import csv
from datetime import datetime
import structlog
from sqlalchemy import text
from src.config import get_settings
from src.data.models import init_db, get_db
from src.data.storage.repository import ContractRepository, TradeRepository
from src.data.models import Contract, Trade, ContractType
# Module-level structlog logger used for the progress and error events below.
logger = structlog.get_logger()
def _parse_exec_ts(raw):
    """Convert a CSV ``exec_ts`` value into a ``datetime``.

    The literal " UTC" suffix is dropped and the space between date and time
    is replaced by "T" so that ``datetime.fromisoformat`` accepts the value.
    """
    # NOTE(review): the result is timezone-naive; presumably the column is
    # meant to hold UTC wall-clock time -- confirm against the Trade model.
    return datetime.fromisoformat(raw.replace(' UTC', '').replace(' ', 'T'))


def _build_trade_id(row, exec_ts):
    """Build the synthetic trade key: ``<guid>_<clearing id>_<YYYYmmdd_HHMMSS>``."""
    return (
        f"{row['instrument_guid_long']}_{row['count_clr_trade_id']}_"
        f"{exec_ts.strftime('%Y%m%d_%H%M%S')}"
    )


async def load_sample_data(
    csv_file: str = "Event_Contract_Swaps_TS.csv",
    max_trades: "int | None" = 100,
):
    """Load sample contracts and trades from a CME CSV file into the database.

    The loader works in three steps:

    1. Read every row of ``csv_file`` into memory.
    2. Insert one ``Contract`` per distinct ``instrument_guid_long`` that is
       not already present in the ``contracts`` table (matched on ``symbol``).
    3. Insert a ``Trade`` for each of the first ``max_trades`` rows whose
       generated ``trade_id`` is not already present in the ``trades`` table.

    Each insert runs inside its own SAVEPOINT, so a row that the database
    rejects is rolled back on its own and logged; the remaining rows are still
    processed and committed.

    Args:
        csv_file: Path of the CSV file to read. It must provide the columns
            ``instrument_guid_long``, ``instrument_long_name``, ``exec_ts``,
            ``trade_date``, ``count_clr_trade_id``, ``clr_px`` and ``qty``.
        max_trades: Upper bound on the number of CSV rows considered for
            trade creation (rows are taken from the top of the file).
            ``None`` processes every row.

    Returns:
        None. If the CSV file cannot be read, the error is logged and the
        function returns without touching contracts or trades.
    """
    # Initialize database
    await init_db()
    logger.info("Loading CME sample data...")

    # Read CSV file
    try:
        # newline="" is what the csv module expects from a file object, and
        # utf-8-sig drops a leading BOM that would otherwise become part of
        # the first header name.
        with open(csv_file, "r", newline="", encoding="utf-8-sig") as f:
            rows = list(csv.DictReader(f))
        logger.info("csv_loaded", rows=len(rows))
    except Exception as e:
        logger.error("csv_load_error", error=str(e))
        return

    # Distinct contracts keyed by GUID; the first name seen for a GUID wins.
    unique_contracts = {}
    for row in rows:
        unique_contracts.setdefault(
            row['instrument_guid_long'], row['instrument_long_name']
        )
    logger.info("unique_contracts_found", count=len(unique_contracts))

    contracts_created = 0
    trades_created = 0
    # guid -> contracts.id, filled while creating contracts so that the trade
    # pass does not have to query the contracts table once per CSV row.
    contract_ids = {}

    async with get_db() as session:
        # Create contracts
        for guid, name in unique_contracts.items():
            try:
                # Check if contract already exists
                result = await session.execute(
                    text("SELECT id FROM contracts WHERE symbol = :symbol"),
                    {"symbol": guid},
                )
                existing = result.first()
                if existing:
                    contract_ids[guid] = existing[0]
                    logger.info("contract_exists", symbol=guid)
                    continue

                contract = Contract(
                    symbol=guid,
                    description=name,
                    contract_type=ContractType.EVENT,
                    settlement_date=datetime(2025, 12, 22),
                    is_active=True,
                    is_settled=False,
                    instrument_guid_long=guid,
                    instrument_long_name=name,
                )
                # SAVEPOINT: a failed flush only undoes this contract instead
                # of invalidating the whole session transaction.
                async with session.begin_nested():
                    session.add(contract)
                    await session.flush()
                    # Read the generated key while the row is freshly flushed.
                    new_id = contract.id

                contract_ids[guid] = new_id
                contracts_created += 1
                logger.info("contract_created", symbol=guid, id=new_id)
            except Exception as e:
                logger.error("contract_creation_error", error=str(e), symbol=guid)
        await session.commit()

        # Create trades (limited to the first ``max_trades`` rows for performance)
        for row in rows[:max_trades]:
            try:
                # Rows whose contract is unknown (e.g. its creation failed
                # above) are skipped.
                contract_id = contract_ids.get(row['instrument_guid_long'])
                if contract_id is None:
                    continue

                # Parse timestamps
                exec_ts = _parse_exec_ts(row['exec_ts'])
                trade_date = datetime.strptime(row['trade_date'], '%Y-%m-%d')

                # Create unique trade ID
                trade_id = _build_trade_id(row, exec_ts)

                # Check if trade exists
                result = await session.execute(
                    text("SELECT id FROM trades WHERE trade_id = :trade_id"),
                    {"trade_id": trade_id},
                )
                if result.first():
                    continue

                trade = Trade(
                    trade_id=trade_id,
                    contract_id=contract_id,
                    instrument_guid_long=row['instrument_guid_long'],
                    count_clr_trade_id=int(row['count_clr_trade_id']),
                    trade_date=trade_date,
                    timestamp=exec_ts,
                    price=float(row['clr_px']),
                    volume=int(row['qty']),
                    side="BUY",
                    trade_type="MARKET",
                )
                # Flush inside a SAVEPOINT so that the counter below only
                # includes rows the database actually accepted.
                async with session.begin_nested():
                    session.add(trade)
                    await session.flush()
                trades_created += 1
            except Exception as e:
                logger.error("trade_creation_error", error=str(e), row=row)
        await session.commit()

    logger.info("sample_data_loaded", contracts=contracts_created, trades=trades_created)
    print(f"✅ Loaded {contracts_created} contracts and {trades_created} trades from CME data")
# Script entry point: run the loader once and report the outcome on stdout.
if __name__ == "__main__":
    print("🚀 Starting CME data loading...")
    try:
        asyncio.run(load_sample_data())
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        import traceback
        traceback.print_exc()
        # Exit non-zero so shells, CI jobs and schedulers can see the failure.
        raise SystemExit(1) from e
    else:
        print("✅ Data loading completed successfully!")