Skip to main content
Glama
jjsteffen23

CME Prediction Markets MCP Server

by jjsteffen23
cme_routes.py18.7 kB
""" CME data fetching API routes. """ import asyncio from datetime import datetime from typing import Optional, Dict, Any from fastapi import APIRouter, HTTPException, BackgroundTasks import structlog from src.data.ingestion.fetcher import CMEDataFetcher from src.data.ingestion.parser import CMEDataParser from src.data.ingestion.cme_api_client import EnhancedCMEDataFetcher, CMEEventContractsAPI from src.data.storage.repository import ContractRepository, TradeRepository from src.data.models import get_db, Contract, ContractType from src.data.storage.cache import cache logger = structlog.get_logger() router = APIRouter() # Global state for tracking fetch operations fetch_status = { "is_fetching": False, "last_fetch": None, "last_error": None, "records_processed": 0 } @router.get("/cme/status") async def get_cme_status(): """Get current CME data fetch status.""" try: # Return basic status without database queries to avoid greenlet issues return { "status": "success", "fetch_status": fetch_status, "database_stats": { "contracts": "N/A - Database connection issue", "trades": "N/A - Database connection issue" }, "message": "Status available, database stats disabled due to configuration", "timestamp": datetime.now().isoformat() } except Exception as e: logger.error("cme_status_error", error=str(e)) return { "status": "error", "fetch_status": fetch_status, "database_stats": { "contracts": 0, "trades": 0 }, "error": str(e), "timestamp": datetime.now().isoformat() } @router.post("/cme/fetch") async def fetch_cme_data(background_tasks: BackgroundTasks, force_refresh: bool = False, use_api: bool = True): """Initiate CME data fetch operation with API optimization.""" if fetch_status["is_fetching"]: return { "status": "already_running", "message": "CME data fetch is already in progress", "fetch_status": fetch_status } # Check cache for recent fetch (unless force refresh) if not force_refresh: last_fetch_key = "cme_last_fetch" last_fetch = await cache.get(last_fetch_key) if last_fetch: last_fetch_time = datetime.fromisoformat(last_fetch) minutes_since = (datetime.now() - last_fetch_time).total_seconds() / 60 if minutes_since < 15: # Don't fetch more than once per 15 minutes return { "status": "recent_fetch", "message": f"Data was fetched {minutes_since:.1f} minutes ago", "last_fetch": last_fetch } # Start background fetch with API optimization if use_api: background_tasks.add_task(perform_enhanced_cme_fetch) method = "Enhanced API + CSV" else: background_tasks.add_task(perform_cme_fetch) method = "Legacy CSV only" return { "status": "initiated", "message": f"CME data fetch started using {method}", "estimated_duration": "30-60 seconds", "method": method } async def perform_cme_fetch(): """Perform the actual CME data fetch and processing.""" global fetch_status fetch_status["is_fetching"] = True fetch_status["last_error"] = None fetch_status["records_processed"] = 0 try: logger.info("cme_fetch_started") # Step 1: Fetch data from CME fetcher = CMEDataFetcher() csv_data = await fetcher.fetch_csv() if not csv_data: raise Exception("Failed to fetch data from CME") logger.info("cme_data_fetched", size=len(csv_data)) # Step 2: Parse and validate data parser = CMEDataParser() df = parser.parse_csv(csv_data) df_transformed = parser.transform_cme_data(df) df_validated = parser.validate_data(df_transformed) logger.info("cme_data_processed", rows=len(df_validated)) # Step 3: Store in database async with get_db() as session: # Process contracts first contract_symbols = df_validated['contract_symbol'].unique() contract_mapping = {} for symbol in contract_symbols: # Check if contract exists contract = await ContractRepository.get_by_symbol(session, symbol) if not contract: # Create new contract instrument_name = df_validated[df_validated['contract_symbol'] == symbol].iloc[0].get('instrument_long_name', symbol) contract = Contract( symbol=symbol, description=instrument_name, contract_type=ContractType.PREDICTION_MARKET, is_active=True, metadata={ "source": "cme_fetch", "created_at": datetime.now().isoformat() } ) contract = await ContractRepository.create(session, contract) logger.info("contract_created", symbol=symbol) contract_mapping[symbol] = contract.id # Process trades trades = parser.to_trade_models(df_validated, contract_mapping) # Batch insert trades (with duplicate handling) new_trades = 0 for trade in trades: existing = await TradeRepository.get_by_trade_id(session, trade.trade_id) if not existing: await TradeRepository.create(session, trade) new_trades += 1 fetch_status["records_processed"] = new_trades await session.commit() logger.info("cme_data_stored", new_trades=new_trades, total_trades=len(trades)) # Update cache await cache.set("cme_last_fetch", datetime.now().isoformat(), expire=3600) fetch_status["last_fetch"] = datetime.now().isoformat() logger.info("cme_fetch_completed", records=fetch_status["records_processed"]) except Exception as e: fetch_status["last_error"] = str(e) logger.error("cme_fetch_error", error=str(e)) finally: fetch_status["is_fetching"] = False async def perform_enhanced_cme_fetch(): """Perform enhanced CME data fetch using Reference Data API + CSV.""" global fetch_status fetch_status["is_fetching"] = True fetch_status["last_error"] = None fetch_status["records_processed"] = 0 try: logger.info("enhanced_cme_fetch_started") # Use enhanced fetcher with API optimization enhanced_fetcher = EnhancedCMEDataFetcher() combined_data = await enhanced_fetcher.fetch_optimized_data() # Check if fetch returned valid data if not combined_data: raise Exception("Enhanced fetch returned no data") if "error" in combined_data: raise Exception(f"Enhanced fetch failed: {combined_data['error']}") # Safely get data with defaults api_data = combined_data.get("api_data") or {} csv_data = combined_data.get("csv_data") or b"" # Log with safe access api_contracts_count = len(api_data.get("contracts", [])) if api_data else 0 csv_size = len(csv_data) if csv_data else 0 logger.info("enhanced_cme_data_fetched", api_contracts=api_contracts_count, csv_size=csv_size) # Process API data first (contracts and instruments) async with get_db() as session: new_contracts = 0 new_instruments = 0 if api_data and "contracts" in api_data and api_data["contracts"]: # Process event contracts from API for contract_info in api_data["contracts"]: existing = await ContractRepository.get_by_symbol(session, contract_info["guid"]) if not existing: contract = Contract( symbol=contract_info["guid"], description=contract_info.get("name", contract_info.get("description", "")), contract_type=ContractType.PREDICTION_MARKET, is_active=contract_info.get("status") == "ACTIVE", metadata={ "source": "cme_api", "subtype": contract_info.get("subtype"), "fixPayout": contract_info.get("fixPayout"), "currency": contract_info.get("currency"), "exchange": contract_info.get("exchange"), "created_at": datetime.now().isoformat() } ) await ContractRepository.create(session, contract) new_contracts += 1 logger.info("api_contract_created", guid=contract_info["guid"]) # Process CSV trade data (if available) new_trades = 0 if csv_data and len(csv_data) > 0: try: parser = CMEDataParser() df = parser.parse_csv(csv_data) if df is not None and not df.empty: df_transformed = parser.transform_cme_data(df) df_validated = parser.validate_data(df_transformed) if df_validated is not None and not df_validated.empty: # Get contract mapping for trade insertion contract_symbols = df_validated['contract_symbol'].unique() contract_mapping = {} for symbol in contract_symbols: contract = await ContractRepository.get_by_symbol(session, symbol) if not contract: # Create contract from CSV data instrument_name = df_validated[df_validated['contract_symbol'] == symbol].iloc[0].get('instrument_long_name', symbol) contract = Contract( symbol=symbol, description=instrument_name, contract_type=ContractType.PREDICTION_MARKET, is_active=True, metadata={ "source": "cme_csv", "created_at": datetime.now().isoformat() } ) contract = await ContractRepository.create(session, contract) new_contracts += 1 contract_mapping[symbol] = contract.id # Insert trades trades = parser.to_trade_models(df_validated, contract_mapping) for trade in trades: existing = await TradeRepository.get_by_trade_id(session, trade.trade_id) if not existing: await TradeRepository.create(session, trade) new_trades += 1 else: logger.warning("CSV data validation failed or empty") else: logger.warning("CSV parsing failed or empty") except Exception as csv_error: logger.error("csv_processing_error", error=str(csv_error)) # Continue without failing the entire operation else: logger.info("No CSV data to process") await session.commit() fetch_status["records_processed"] = new_contracts + new_trades logger.info("enhanced_cme_data_stored", new_contracts=new_contracts, new_trades=new_trades, api_instruments=len(api_data.get("instruments", []))) # Update cache with enhanced metadata await cache.set("cme_last_fetch", datetime.now().isoformat(), ttl=3600) await cache.set("cme_last_method", "enhanced_api_csv", ttl=3600) fetch_status["last_fetch"] = datetime.now().isoformat() logger.info("enhanced_cme_fetch_completed", records=fetch_status["records_processed"]) except Exception as e: fetch_status["last_error"] = str(e) logger.error("enhanced_cme_fetch_error", error=str(e)) finally: fetch_status["is_fetching"] = False @router.get("/cme/sample-data") async def get_sample_data(limit: int = 10): """Get sample of recent CME data.""" async with get_db() as session: # Get recent contracts contracts = await ContractRepository.get_recent(session, limit=limit) # Get recent trades trades = await TradeRepository.get_recent(session, limit=limit) return { "status": "success", "sample_contracts": [ { "symbol": c.symbol, "description": c.description, "type": c.contract_type.value, "active": c.is_active } for c in contracts ], "sample_trades": [ { "trade_id": t.trade_id, "timestamp": t.timestamp.isoformat(), "price": float(t.price), "volume": t.volume, "contract_symbol": next((c.symbol for c in contracts if c.id == t.contract_id), "unknown") } for t in trades ] } @router.delete("/cme/clear-cache") async def clear_cme_cache(): """Clear CME-related cache entries.""" try: # Clear relevant cache keys cache_keys = [ "cme_last_fetch", "trading_data:*", # Trading data cache "contract:*", # Contract cache ] cleared = 0 for key_pattern in cache_keys: if "*" in key_pattern: # Pattern matching keys (Redis scan) keys = await cache.scan(key_pattern) for key in keys: await cache.delete(key) cleared += 1 else: await cache.delete(key_pattern) cleared += 1 return { "status": "success", "message": f"Cleared {cleared} cache entries" } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}") @router.get("/cme/api-preview") async def preview_api_data(limit: int = 5): """Preview CME Reference Data API results.""" try: api_client = CMEEventContractsAPI() # Get a small sample of event contracts contracts = await api_client.fetch_event_contracts() if not contracts: return { "status": "error", "message": "Failed to fetch from CME Reference Data API" } # Get detailed info for first few contracts preview_data = { "total_contracts": len(contracts), "sample_contracts": [], "api_features": [ "Real-time contract metadata", "Event contract classification (subtype=EVENT)", "Fixed payout information", "Tradeable reference product links", "Comprehensive instrument details" ], "optimization_benefits": [ "Faster contract discovery", "More accurate metadata", "Real-time status updates", "Enhanced market structure data", "Better integration with CME ecosystem" ] } # Sample first few contracts with enhanced details for contract in contracts[:limit]: contract_preview = { "guid": contract.get("guid"), "name": contract.get("name"), "description": contract.get("description"), "status": contract.get("status"), "subtype": contract.get("subtype"), "fixPayout": contract.get("fixPayout"), "currency": contract.get("currency"), "exchange": contract.get("exchange") } # Try to get instruments for this contract instruments = await api_client.fetch_contract_instruments(contract.get("guid", "")) if instruments: contract_preview["instruments_count"] = len(instruments) contract_preview["sample_instruments"] = [ { "name": inst.get("name"), "strike_price": inst.get("strikePrice"), "type": inst.get("type") } for inst in instruments[:3] ] preview_data["sample_contracts"].append(contract_preview) return { "status": "success", "data": preview_data, "timestamp": datetime.now().isoformat() } except Exception as e: logger.error("cme_api_preview_error", error=str(e)) return { "status": "error", "message": f"API preview failed: {str(e)}" }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jjsteffen23/dk_mcp_2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server