"""
CME data fetching API routes.
"""
import asyncio
from datetime import datetime
from typing import Optional, Dict, Any
from fastapi import APIRouter, HTTPException, BackgroundTasks
import structlog
from src.data.ingestion.fetcher import CMEDataFetcher
from src.data.ingestion.parser import CMEDataParser
from src.data.ingestion.cme_api_client import EnhancedCMEDataFetcher, CMEEventContractsAPI
from src.data.storage.repository import ContractRepository, TradeRepository
from src.data.models import get_db, Contract, ContractType
from src.data.storage.cache import cache
logger = structlog.get_logger()
router = APIRouter()
# Global state for tracking fetch operations.
# This is per-process module state: each worker process has its own copy,
# so the "is_fetching" guard only serializes fetches within one process.
fetch_status = {
    "is_fetching": False,    # True while a background fetch task is running
    "last_fetch": None,      # ISO-8601 timestamp of the last completed fetch
    "last_error": None,      # str(exception) from the most recent failed fetch
    "records_processed": 0   # records written during the most recent fetch
}
@router.get("/cme/status")
async def get_cme_status():
    """Get current CME data fetch status.

    Returns the in-memory fetch-state dict plus placeholder database stats;
    real DB queries are deliberately skipped here (they previously triggered
    greenlet/session issues).
    """
    try:
        payload = {
            "status": "success",
            "fetch_status": fetch_status,
            "database_stats": {
                "contracts": "N/A - Database connection issue",
                "trades": "N/A - Database connection issue",
            },
            "message": "Status available, database stats disabled due to configuration",
            "timestamp": datetime.now().isoformat(),
        }
        return payload
    except Exception as e:
        # Defensive fallback: report the error in-band rather than raising.
        logger.error("cme_status_error", error=str(e))
        return {
            "status": "error",
            "fetch_status": fetch_status,
            "database_stats": {"contracts": 0, "trades": 0},
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        }
@router.post("/cme/fetch")
async def fetch_cme_data(background_tasks: BackgroundTasks, force_refresh: bool = False, use_api: bool = True):
    """Initiate CME data fetch operation with API optimization.

    Args:
        background_tasks: FastAPI task queue the fetch is scheduled on.
        force_refresh: bypass the 15-minute throttle when True.
        use_api: choose the enhanced API+CSV path (True) or legacy CSV (False).
    """
    # Refuse to start a second fetch while one is in flight.
    if fetch_status["is_fetching"]:
        return {
            "status": "already_running",
            "message": "CME data fetch is already in progress",
            "fetch_status": fetch_status,
        }

    # Throttle: skip if a fetch completed within the last 15 minutes,
    # unless the caller explicitly forces a refresh.
    if not force_refresh:
        last_fetch = await cache.get("cme_last_fetch")
        if last_fetch:
            elapsed = datetime.now() - datetime.fromisoformat(last_fetch)
            minutes_since = elapsed.total_seconds() / 60
            if minutes_since < 15:
                return {
                    "status": "recent_fetch",
                    "message": f"Data was fetched {minutes_since:.1f} minutes ago",
                    "last_fetch": last_fetch,
                }

    # Schedule the fetch in the background; enhanced API path is the default.
    if use_api:
        task, method = perform_enhanced_cme_fetch, "Enhanced API + CSV"
    else:
        task, method = perform_cme_fetch, "Legacy CSV only"
    background_tasks.add_task(task)

    return {
        "status": "initiated",
        "message": f"CME data fetch started using {method}",
        "estimated_duration": "30-60 seconds",
        "method": method,
    }
async def perform_cme_fetch():
    """Perform the actual CME data fetch and processing.

    Background task (legacy path): downloads the CME CSV feed, parses and
    validates it, upserts contracts and inserts new trades, then records
    progress in the module-level ``fetch_status`` dict. Never raises —
    failures are captured in ``fetch_status["last_error"]``.
    """
    global fetch_status
    # Mark the run started and reset per-run counters/state.
    fetch_status["is_fetching"] = True
    fetch_status["last_error"] = None
    fetch_status["records_processed"] = 0
    try:
        logger.info("cme_fetch_started")
        # Step 1: Fetch data from CME
        fetcher = CMEDataFetcher()
        csv_data = await fetcher.fetch_csv()
        if not csv_data:
            raise Exception("Failed to fetch data from CME")
        logger.info("cme_data_fetched", size=len(csv_data))
        # Step 2: Parse and validate data
        parser = CMEDataParser()
        df = parser.parse_csv(csv_data)
        df_transformed = parser.transform_cme_data(df)
        df_validated = parser.validate_data(df_transformed)
        logger.info("cme_data_processed", rows=len(df_validated))
        # Step 3: Store in database
        async with get_db() as session:
            # Process contracts first: ensure every symbol in the feed has a
            # Contract row, collecting a symbol -> contract.id mapping for
            # the trade insert below.
            contract_symbols = df_validated['contract_symbol'].unique()
            contract_mapping = {}
            for symbol in contract_symbols:
                # Check if contract exists
                contract = await ContractRepository.get_by_symbol(session, symbol)
                if not contract:
                    # Create new contract, preferring the feed's long name
                    # for the description (falls back to the symbol).
                    instrument_name = df_validated[df_validated['contract_symbol'] == symbol].iloc[0].get('instrument_long_name', symbol)
                    contract = Contract(
                        symbol=symbol,
                        description=instrument_name,
                        contract_type=ContractType.PREDICTION_MARKET,
                        is_active=True,
                        metadata={
                            "source": "cme_fetch",
                            "created_at": datetime.now().isoformat()
                        }
                    )
                    contract = await ContractRepository.create(session, contract)
                    logger.info("contract_created", symbol=symbol)
                contract_mapping[symbol] = contract.id
            # Process trades
            trades = parser.to_trade_models(df_validated, contract_mapping)
            # Batch insert trades (with duplicate handling: one existence
            # check per trade_id before insert).
            new_trades = 0
            for trade in trades:
                existing = await TradeRepository.get_by_trade_id(session, trade.trade_id)
                if not existing:
                    await TradeRepository.create(session, trade)
                    new_trades += 1
            fetch_status["records_processed"] = new_trades
            await session.commit()
            logger.info("cme_data_stored", new_trades=new_trades, total_trades=len(trades))
        # Update cache so the /cme/fetch throttle can see this run.
        # NOTE(review): this call uses expire=3600 while
        # perform_enhanced_cme_fetch uses ttl=3600 on the same cache object —
        # one of the keyword names is likely wrong; verify against the cache
        # implementation in src.data.storage.cache.
        await cache.set("cme_last_fetch", datetime.now().isoformat(), expire=3600)
        fetch_status["last_fetch"] = datetime.now().isoformat()
        logger.info("cme_fetch_completed", records=fetch_status["records_processed"])
    except Exception as e:
        # Background tasks must not propagate; surface the error via status.
        fetch_status["last_error"] = str(e)
        logger.error("cme_fetch_error", error=str(e))
    finally:
        fetch_status["is_fetching"] = False
async def perform_enhanced_cme_fetch():
    """Perform enhanced CME data fetch using Reference Data API + CSV.

    Background task: pulls combined data from the enhanced fetcher (API
    contract metadata plus the CSV trade feed), writes new contracts and
    trades, and tracks progress in the module-level ``fetch_status`` dict.
    CSV processing failures are logged and swallowed so the API-sourced
    contracts still get committed. Never raises.
    """
    global fetch_status
    # Mark the run started and reset per-run counters/state.
    fetch_status["is_fetching"] = True
    fetch_status["last_error"] = None
    fetch_status["records_processed"] = 0
    try:
        logger.info("enhanced_cme_fetch_started")
        # Use enhanced fetcher with API optimization
        enhanced_fetcher = EnhancedCMEDataFetcher()
        combined_data = await enhanced_fetcher.fetch_optimized_data()
        # Check if fetch returned valid data
        if not combined_data:
            raise Exception("Enhanced fetch returned no data")
        if "error" in combined_data:
            raise Exception(f"Enhanced fetch failed: {combined_data['error']}")
        # Safely get data with defaults (either half may be missing/None).
        api_data = combined_data.get("api_data") or {}
        csv_data = combined_data.get("csv_data") or b""
        # Log with safe access
        api_contracts_count = len(api_data.get("contracts", [])) if api_data else 0
        csv_size = len(csv_data) if csv_data else 0
        logger.info("enhanced_cme_data_fetched",
                    api_contracts=api_contracts_count,
                    csv_size=csv_size)
        # Process API data first (contracts and instruments)
        async with get_db() as session:
            new_contracts = 0
            new_instruments = 0  # NOTE(review): never incremented below — dead counter
            if api_data and "contracts" in api_data and api_data["contracts"]:
                # Process event contracts from API; the API GUID is stored
                # as the contract symbol.
                for contract_info in api_data["contracts"]:
                    existing = await ContractRepository.get_by_symbol(session, contract_info["guid"])
                    if not existing:
                        contract = Contract(
                            symbol=contract_info["guid"],
                            description=contract_info.get("name", contract_info.get("description", "")),
                            contract_type=ContractType.PREDICTION_MARKET,
                            is_active=contract_info.get("status") == "ACTIVE",
                            metadata={
                                "source": "cme_api",
                                "subtype": contract_info.get("subtype"),
                                "fixPayout": contract_info.get("fixPayout"),
                                "currency": contract_info.get("currency"),
                                "exchange": contract_info.get("exchange"),
                                "created_at": datetime.now().isoformat()
                            }
                        )
                        await ContractRepository.create(session, contract)
                        new_contracts += 1
                        logger.info("api_contract_created", guid=contract_info["guid"])
            # Process CSV trade data (if available)
            new_trades = 0
            if csv_data and len(csv_data) > 0:
                try:
                    parser = CMEDataParser()
                    df = parser.parse_csv(csv_data)
                    if df is not None and not df.empty:
                        df_transformed = parser.transform_cme_data(df)
                        df_validated = parser.validate_data(df_transformed)
                        if df_validated is not None and not df_validated.empty:
                            # Get contract mapping for trade insertion
                            contract_symbols = df_validated['contract_symbol'].unique()
                            contract_mapping = {}
                            for symbol in contract_symbols:
                                contract = await ContractRepository.get_by_symbol(session, symbol)
                                if not contract:
                                    # Create contract from CSV data (long name
                                    # preferred, symbol as fallback).
                                    instrument_name = df_validated[df_validated['contract_symbol'] == symbol].iloc[0].get('instrument_long_name', symbol)
                                    contract = Contract(
                                        symbol=symbol,
                                        description=instrument_name,
                                        contract_type=ContractType.PREDICTION_MARKET,
                                        is_active=True,
                                        metadata={
                                            "source": "cme_csv",
                                            "created_at": datetime.now().isoformat()
                                        }
                                    )
                                    contract = await ContractRepository.create(session, contract)
                                    new_contracts += 1
                                contract_mapping[symbol] = contract.id
                            # Insert trades, skipping trade_ids already stored.
                            trades = parser.to_trade_models(df_validated, contract_mapping)
                            for trade in trades:
                                existing = await TradeRepository.get_by_trade_id(session, trade.trade_id)
                                if not existing:
                                    await TradeRepository.create(session, trade)
                                    new_trades += 1
                        else:
                            logger.warning("CSV data validation failed or empty")
                    else:
                        logger.warning("CSV parsing failed or empty")
                except Exception as csv_error:
                    logger.error("csv_processing_error", error=str(csv_error))
                    # Continue without failing the entire operation
            else:
                logger.info("No CSV data to process")
            await session.commit()
            fetch_status["records_processed"] = new_contracts + new_trades
            logger.info("enhanced_cme_data_stored",
                        new_contracts=new_contracts,
                        new_trades=new_trades,
                        api_instruments=len(api_data.get("instruments", [])))
        # Update cache with enhanced metadata.
        # NOTE(review): ttl=3600 here vs expire=3600 in perform_cme_fetch —
        # one keyword is likely wrong; verify against the cache implementation.
        await cache.set("cme_last_fetch", datetime.now().isoformat(), ttl=3600)
        await cache.set("cme_last_method", "enhanced_api_csv", ttl=3600)
        fetch_status["last_fetch"] = datetime.now().isoformat()
        logger.info("enhanced_cme_fetch_completed", records=fetch_status["records_processed"])
    except Exception as e:
        # Background tasks must not propagate; surface the error via status.
        fetch_status["last_error"] = str(e)
        logger.error("enhanced_cme_fetch_error", error=str(e))
    finally:
        fetch_status["is_fetching"] = False
@router.get("/cme/sample-data")
async def get_sample_data(limit: int = 10):
    """Get sample of recent CME data.

    Args:
        limit: maximum number of contracts and trades to return.

    Returns:
        dict with ``sample_contracts`` and ``sample_trades``; each trade is
        labelled with its contract's symbol when that contract appears in the
        sampled contract set, otherwise "unknown".
    """
    async with get_db() as session:
        # Get recent contracts
        contracts = await ContractRepository.get_recent(session, limit=limit)
        # Get recent trades
        trades = await TradeRepository.get_recent(session, limit=limit)
        # Build the id -> symbol lookup once instead of scanning the contract
        # list per trade (previously O(contracts x trades) via next()).
        symbol_by_id = {c.id: c.symbol for c in contracts}
        return {
            "status": "success",
            "sample_contracts": [
                {
                    "symbol": c.symbol,
                    "description": c.description,
                    "type": c.contract_type.value,
                    "active": c.is_active
                } for c in contracts
            ],
            "sample_trades": [
                {
                    "trade_id": t.trade_id,
                    "timestamp": t.timestamp.isoformat(),
                    "price": float(t.price),
                    "volume": t.volume,
                    "contract_symbol": symbol_by_id.get(t.contract_id, "unknown")
                } for t in trades
            ]
        }
@router.delete("/cme/clear-cache")
async def clear_cme_cache():
    """Clear CME-related cache entries.

    Deletes the last-fetch marker plus all trading-data and contract cache
    keys; wildcard patterns are expanded via a Redis scan before deletion.
    """
    try:
        cleared = 0
        for pattern in ("cme_last_fetch", "trading_data:*", "contract:*"):
            if "*" not in pattern:
                # Exact key: delete directly.
                await cache.delete(pattern)
                cleared += 1
                continue
            # Pattern key: expand via scan, then delete each match.
            for key in await cache.scan(pattern):
                await cache.delete(key)
                cleared += 1
        return {
            "status": "success",
            "message": f"Cleared {cleared} cache entries"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")
@router.get("/cme/api-preview")
async def preview_api_data(limit: int = 5):
    """Preview CME Reference Data API results.

    Fetches event contracts from the Reference Data API and returns the
    first ``limit`` of them with per-contract instrument samples.
    """
    try:
        api_client = CMEEventContractsAPI()
        # Get a small sample of event contracts
        contracts = await api_client.fetch_event_contracts()
        if not contracts:
            return {
                "status": "error",
                "message": "Failed to fetch from CME Reference Data API"
            }

        # Build enhanced previews for the first few contracts.
        samples = []
        for contract in contracts[:limit]:
            entry = {
                field: contract.get(field)
                for field in ("guid", "name", "description", "status",
                              "subtype", "fixPayout", "currency", "exchange")
            }
            # Attach a small instrument sample when the API returns any.
            instruments = await api_client.fetch_contract_instruments(contract.get("guid", ""))
            if instruments:
                entry["instruments_count"] = len(instruments)
                entry["sample_instruments"] = [
                    {
                        "name": inst.get("name"),
                        "strike_price": inst.get("strikePrice"),
                        "type": inst.get("type")
                    } for inst in instruments[:3]
                ]
            samples.append(entry)

        preview_data = {
            "total_contracts": len(contracts),
            "sample_contracts": samples,
            "api_features": [
                "Real-time contract metadata",
                "Event contract classification (subtype=EVENT)",
                "Fixed payout information",
                "Tradeable reference product links",
                "Comprehensive instrument details"
            ],
            "optimization_benefits": [
                "Faster contract discovery",
                "More accurate metadata",
                "Real-time status updates",
                "Enhanced market structure data",
                "Better integration with CME ecosystem"
            ]
        }
        return {
            "status": "success",
            "data": preview_data,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error("cme_api_preview_error", error=str(e))
        return {
            "status": "error",
            "message": f"API preview failed: {str(e)}"
        }