Skip to main content
Glama
jjsteffen23

CME Prediction Markets MCP Server

by jjsteffen23
load_sample_data.py (5.5 kB)
#!/usr/bin/env python3
"""
Load sample data from CME CSV into the database.
"""

import asyncio
import csv
import traceback
from datetime import datetime
from typing import Optional, Tuple

import structlog
from sqlalchemy import text

from src.config import get_settings
from src.data.models import init_db, get_db
from src.data.storage.repository import ContractRepository, TradeRepository
from src.data.models import Contract, Trade, ContractType

logger = structlog.get_logger()

# Defaults reproduce the values that used to be hard-coded in the loader.
DEFAULT_CSV_FILE = "Event_Contract_Swaps_TS.csv"
DEFAULT_MAX_TRADES = 100
DEFAULT_SETTLEMENT_DATE = datetime(2025, 12, 22)

# Errors that parsing a single CSV row can raise: a missing column (KeyError),
# an unparsable number/date (ValueError), or a ``None`` cell, which
# ``csv.DictReader`` produces for short rows (TypeError / AttributeError).
_ROW_PARSE_ERRORS = (KeyError, ValueError, TypeError, AttributeError)


def _read_rows(csv_file):
    """Return every row of *csv_file* as a list of dicts keyed by header name."""
    # newline="" is what the csv module requires so that newlines embedded in
    # quoted fields are handled correctly.
    # NOTE(review): assumes the export is UTF-8 encoded -- confirm.
    with open(csv_file, "r", newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


def _parse_exec_ts(raw):
    """Parse an ``exec_ts`` cell: drop a ``' UTC'`` marker, then ISO-parse it.

    The timezone marker is removed before parsing, so for inputs without any
    other offset the returned ``datetime`` is naive.
    """
    return datetime.fromisoformat(raw.replace(" UTC", "").replace(" ", "T"))


def _parse_trade_fields(row):
    """Convert one CSV row into the keyword arguments for ``Trade``.

    ``contract_id`` is not included; the caller supplies it.  Raises one of
    ``_ROW_PARSE_ERRORS`` when the row is malformed.
    """
    guid = row["instrument_guid_long"]
    clr_trade_id = row["count_clr_trade_id"]
    exec_ts = _parse_exec_ts(row["exec_ts"])
    return {
        # Synthetic key: instrument + clearing trade id + execution second.
        "trade_id": f"{guid}_{clr_trade_id}_{exec_ts.strftime('%Y%m%d_%H%M%S')}",
        "instrument_guid_long": guid,
        "count_clr_trade_id": int(clr_trade_id),
        "trade_date": datetime.strptime(row["trade_date"], "%Y-%m-%d"),
        "timestamp": exec_ts,
        "price": float(row["clr_px"]),
        "volume": int(row["qty"]),
        # The loader records every row as a market buy.
        "side": "BUY",
        "trade_type": "MARKET",
    }


async def _ensure_contracts(session, rows, settlement_date):
    """Create a contract for every distinct instrument GUID found in *rows*.

    Returns ``(contract_ids, created)``: a dict mapping each GUID whose
    contract exists (found or newly inserted) to its database id, and the
    number of contracts inserted by this call.
    """
    # First occurrence of a GUID decides the contract's long name.
    names = {}
    for row in rows:
        guid = row["instrument_guid_long"]
        if guid not in names:
            names[guid] = row["instrument_long_name"]

    logger.info("unique_contracts_found", count=len(names))

    contract_ids = {}
    created = 0
    for guid, name in names.items():
        try:
            result = await session.execute(
                text("SELECT id FROM contracts WHERE symbol = :symbol"),
                {"symbol": guid},
            )
            existing = result.first()
            if existing:
                contract_ids[guid] = existing[0]
                logger.info("contract_exists", symbol=guid)
                continue

            contract = Contract(
                symbol=guid,
                description=name,
                contract_type=ContractType.EVENT,
                settlement_date=settlement_date,
                is_active=True,
                is_settled=False,
                instrument_guid_long=guid,
                instrument_long_name=name,
            )
            session.add(contract)
            await session.flush()
            # Read the generated id before committing: a commit may expire the
            # object's attributes.
            contract_id = contract.id
            # Commit per contract so a later failure cannot undo this insert.
            await session.commit()
        except Exception as e:
            # Per-item boundary: log, then roll back so the session is usable
            # for the next contract instead of failing every later statement.
            logger.error("contract_creation_error", error=str(e), symbol=guid)
            await session.rollback()
            continue

        contract_ids[guid] = contract_id
        created += 1
        logger.info("contract_created", symbol=guid, id=contract_id)

    return contract_ids, created


async def _insert_trades(session, rows, contract_ids):
    """Insert one ``Trade`` per row in *rows* and return how many were stored.

    Rows whose contract is unknown, or whose ``trade_id`` already exists in
    the ``trades`` table, are skipped silently; malformed rows and database
    failures are logged as ``trade_creation_error`` and skipped.
    """
    created = 0
    for row in rows:
        # Pure parsing is kept apart from database work so that a bad cell can
        # never leave the session in a failed state.
        try:
            contract_id = contract_ids.get(row["instrument_guid_long"])
            if contract_id is None:
                continue
            fields = _parse_trade_fields(row)
        except _ROW_PARSE_ERRORS as e:
            logger.error("trade_creation_error", error=str(e), row=row)
            continue

        try:
            result = await session.execute(
                text("SELECT id FROM trades WHERE trade_id = :trade_id"),
                {"trade_id": fields["trade_id"]},
            )
            if result.first():
                continue

            session.add(Trade(contract_id=contract_id, **fields))
            # Commit per trade: each row is atomic, so one rejected row does
            # not discard the rows stored before it.
            await session.commit()
        except Exception as e:
            logger.error("trade_creation_error", error=str(e), row=row)
            await session.rollback()
            continue

        created += 1

    return created


async def load_sample_data(
    csv_file: str = DEFAULT_CSV_FILE,
    max_trades: Optional[int] = DEFAULT_MAX_TRADES,
    settlement_date: datetime = DEFAULT_SETTLEMENT_DATE,
) -> Optional[Tuple[int, int]]:
    """Load sample data from CME CSV file.

    Initialises the database, reads *csv_file*, creates one contract per
    distinct ``instrument_guid_long`` and then one trade per CSV row.

    Args:
        csv_file: Path of the CSV export to read.
        max_trades: Only the first ``max_trades`` rows are turned into trades
            (must be non-negative); ``None`` processes every row.
        settlement_date: Settlement date assigned to every created contract.

    Returns:
        ``(contracts_created, trades_created)`` on success, or ``None`` when
        the CSV file could not be read (the error is logged).
    """
    # Initialize database
    await init_db()

    logger.info("Loading CME sample data...")

    # Read CSV file
    try:
        rows = _read_rows(csv_file)
    except (OSError, ValueError, csv.Error) as e:
        logger.error("csv_load_error", error=str(e))
        return None
    logger.info("csv_loaded", rows=len(rows))

    # NOTE(review): the session is used as an SQLAlchemy AsyncSession
    # (awaitable execute/flush/commit/rollback) -- confirm against get_db().
    async with get_db() as session:
        contract_ids, contracts_created = await _ensure_contracts(
            session, rows, settlement_date
        )
        trade_rows = rows if max_trades is None else rows[:max_trades]
        trades_created = await _insert_trades(session, trade_rows, contract_ids)

    logger.info(
        "sample_data_loaded",
        contracts=contracts_created,
        trades=trades_created,
    )
    print(f"✅ Loaded {contracts_created} contracts and {trades_created} trades from CME data")
    return contracts_created, trades_created


if __name__ == "__main__":
    print("🚀 Starting CME data loading...")
    try:
        outcome = asyncio.run(load_sample_data())
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        traceback.print_exc()
        # Signal the failure to the calling shell instead of exiting 0.
        raise SystemExit(1) from None

    if outcome is None:
        # The loader already logged why the CSV could not be read.
        print("❌ Error loading data: CSV file could not be read")
        raise SystemExit(1)

    print("✅ Data loading completed successfully!")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jjsteffen23/dk_mcp_2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.