Skip to main content
Glama
jjsteffen23

CME Prediction Markets MCP Server

by jjsteffen23
cme_api_client.py (11.5 kB)
"""Enhanced CME API client for Event Contracts using Reference Data API."""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import aiohttp
import structlog

from src.config import get_settings

logger = structlog.get_logger()
settings = get_settings()


class CMEEventContractsAPI:
    """CME Reference Data API client for Event Contracts.

    All fetch methods are best-effort: failures are logged and reported
    through the return value (``None``, ``[]`` or an ``{"error": ...}`` dict)
    instead of being raised to the caller.
    """

    def __init__(self):
        self.base_url = "https://api.refdata.cmegroup.com/v3"
        self.timeout = aiohttp.ClientTimeout(total=60)
        # Read from settings (default 3); currently not used by the methods
        # of this class.
        self.retry_attempts = getattr(settings, 'CME_RETRY_ATTEMPTS', 3)

    async def fetch_event_contracts(self) -> Optional[List[Dict[str, Any]]]:
        """Fetch all event contracts using Reference Data API.

        Returns:
            The value of the ``"data"`` key of the JSON response (an empty
            list when the key is absent), or ``None`` on a non-200 status or
            any exception.
        """
        try:
            url = f"{self.base_url}/products"
            params = {
                "subtype": "EVENT",  # Filter for Event Contracts only
                "limit": 1000,  # Increase limit for comprehensive data
                "includeExpired": "false",  # Only active contracts
            }

            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url, params=params) as response:
                    if response.status != 200:
                        logger.error("cme_api_error", status=response.status, url=url)
                        return None

                    data = await response.json()
                    contracts = data.get("data", [])
                    logger.info("cme_event_contracts_fetched", count=len(contracts))
                    return contracts

        except Exception as e:
            logger.error("cme_api_fetch_error", error=str(e))
            return None

    async def fetch_contract_instruments(self, product_guid: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch instruments (options) for a specific event contract.

        Args:
            product_guid: GUID of the product whose instruments are requested.

        Returns:
            The value of the ``"data"`` key of the JSON response (an empty
            list when the key is absent), or ``None`` on a non-200 status or
            any exception.
        """
        try:
            url = f"{self.base_url}/products/{product_guid}/instruments"

            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        logger.error("cme_instruments_error",
                                     status=response.status,
                                     product_guid=product_guid)
                        return None

                    data = await response.json()
                    instruments = data.get("data", [])
                    logger.info("cme_instruments_fetched",
                                product_guid=product_guid,
                                count=len(instruments))
                    return instruments

        except Exception as e:
            logger.error("cme_instruments_fetch_error",
                         product_guid=product_guid,
                         error=str(e))
            return None

    async def fetch_reference_product(self, product_guid: str) -> Optional[Dict[str, Any]]:
        """Fetch the tradeable reference product for an event contract.

        The product is fetched first; its ``_links.referenceProduct.href`` is
        then followed by taking the last path segment as the reference GUID.

        Args:
            product_guid: GUID of the event contract product.

        Returns:
            The JSON body of the reference product, or ``None`` when the
            product has no reference link, either request returns a non-200
            status, or any exception occurs.
        """
        try:
            # First get the product to find the referenceProduct link
            url = f"{self.base_url}/products/{product_guid}"

            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        logger.error("cme_reference_product_http_error",
                                     status=response.status,
                                     product_guid=product_guid)
                        return None
                    data = await response.json()

                # ``or {}`` guards against keys that are present but null,
                # which would otherwise raise AttributeError on ``.get``.
                links = data.get("_links") or {}
                ref_product_link = (links.get("referenceProduct") or {}).get("href")
                if not ref_product_link:
                    return None

                # Extract GUID from the link; a trailing slash must not yield
                # an empty GUID (that would request the product list instead).
                ref_guid = ref_product_link.rstrip("/").rsplit("/", 1)[-1]
                if not ref_guid:
                    return None

                # Fetch the reference product details
                async with session.get(f"{self.base_url}/products/{ref_guid}") as ref_response:
                    if ref_response.status != 200:
                        logger.error("cme_reference_product_http_error",
                                     status=ref_response.status,
                                     product_guid=product_guid,
                                     ref_guid=ref_guid)
                        return None

                    ref_data = await ref_response.json()
                    logger.info("cme_reference_product_fetched",
                                product_guid=product_guid,
                                ref_guid=ref_guid)
                    return ref_data

        except Exception as e:
            logger.error("cme_reference_product_error",
                         product_guid=product_guid,
                         error=str(e))
            return None

    async def fetch_comprehensive_event_data(self) -> Dict[str, Any]:
        """Fetch comprehensive event contract data including instruments and references.

        Returns:
            A dict with ``contracts``, ``instruments``, ``reference_products``,
            ``fetch_timestamp`` and ``total_contracts`` keys, or an
            ``{"error": ...}`` dict on failure. ``total_contracts`` counts all
            fetched contracts even though only the first 50 are expanded.
        """
        try:
            logger.info("cme_comprehensive_fetch_started")

            # Step 1: Get all event contracts
            event_contracts = await self.fetch_event_contracts()
            # NOTE: an empty list is reported the same way as a failed fetch.
            if not event_contracts:
                return {"error": "Failed to fetch event contracts"}

            comprehensive_data = {
                "contracts": [],
                "instruments": [],
                "reference_products": [],
                "fetch_timestamp": datetime.now().isoformat(),
                "total_contracts": len(event_contracts),
            }

            # Step 2: For each contract, get instruments and reference products
            for contract in event_contracts[:50]:  # Limit to 50 for initial optimization
                product_guid = contract.get("guid")
                if not product_guid:
                    continue

                # Add contract info
                comprehensive_data["contracts"].append({
                    "guid": product_guid,
                    "name": contract.get("name"),
                    "description": contract.get("description"),
                    "status": contract.get("status"),
                    "subtype": contract.get("subtype"),
                    "fixPayout": contract.get("fixPayout"),
                    "currency": contract.get("currency"),
                    "exchange": contract.get("exchange"),
                })

                # Get instruments for this contract
                instruments = await self.fetch_contract_instruments(product_guid)
                if instruments:
                    comprehensive_data["instruments"].extend(
                        {
                            "product_guid": product_guid,
                            "instrument_guid": instrument.get("guid"),
                            "name": instrument.get("name"),
                            "strike_price": instrument.get("strikePrice"),
                            "expiry": instrument.get("expiry"),
                            "type": instrument.get("type"),
                            "status": instrument.get("status"),
                        }
                        for instrument in instruments
                    )

                # Get reference product (tradeable futures)
                reference_product = await self.fetch_reference_product(product_guid)
                if reference_product:
                    comprehensive_data["reference_products"].append({
                        "event_contract_guid": product_guid,
                        "reference_guid": reference_product.get("guid"),
                        "name": reference_product.get("name"),
                        "description": reference_product.get("description"),
                        "exchange": reference_product.get("exchange"),
                    })

                # Add small delay to avoid rate limiting
                await asyncio.sleep(0.1)

            logger.info("cme_comprehensive_fetch_completed",
                        contracts=len(comprehensive_data["contracts"]),
                        instruments=len(comprehensive_data["instruments"]),
                        references=len(comprehensive_data["reference_products"]))

            return comprehensive_data

        except Exception as e:
            logger.error("cme_comprehensive_fetch_error", error=str(e))
            return {"error": str(e)}

    async def fetch_market_data_symbols(self) -> List[str]:
        """Get list of symbols for market data queries.

        A symbol is the contract name with spaces replaced by underscores and
        upper-cased. Contracts whose name is missing, null, empty or not a
        string are skipped.

        Returns:
            At most 100 symbols, in the order the API returned the contracts;
            an empty list when no contracts are available or on any exception.
        """
        try:
            contracts = await self.fetch_event_contracts()
            if not contracts:
                return []

            # Derive the symbol from the contract name. A null or non-string
            # name is skipped instead of raising and discarding every symbol.
            symbols = [
                name.replace(" ", "_").upper()
                for name in (contract.get("name") for contract in contracts)
                if isinstance(name, str) and name
            ]

            return symbols[:100]  # Limit to the first 100 symbols

        except Exception as e:
            logger.error("cme_symbols_fetch_error", error=str(e))
            return []


class EnhancedCMEDataFetcher:
    """Enhanced fetcher that combines API and CSV approaches."""

    def __init__(self):
        self.api_client = CMEEventContractsAPI()
        self.csv_url = getattr(settings, 'CME_DATA_URL',
                               "https://www.cmegroup.com/market-data/files/Event_Contract_Swaps_TS.csv")
        # Read from settings (default 3); currently not used by the methods
        # of this class.
        self.retry_attempts = getattr(settings, 'CME_RETRY_ATTEMPTS', 3)

    async def fetch_optimized_data(self) -> Dict[str, Any]:
        """Fetch data using optimized approach - API for metadata, CSV for trades.

        Returns:
            A dict with ``fetch_timestamp``, ``api_data``, ``csv_data`` and
            ``optimization`` keys. An exception from either fetch is replaced
            by an ``{"error": ...}`` dict in its slot; an unexpected failure of
            this method itself yields a top-level ``{"error": ...}`` dict.
        """
        try:
            logger.info("enhanced_cme_fetch_started")

            # Parallel fetch: API for contract metadata, CSV for trade data
            api_data, csv_data = await asyncio.gather(
                self.api_client.fetch_comprehensive_event_data(),
                self.fetch_csv_data(),
                return_exceptions=True,
            )

            result = {
                "fetch_timestamp": datetime.now().isoformat(),
                "api_data": api_data if not isinstance(api_data, Exception) else {"error": str(api_data)},
                "csv_data": csv_data if not isinstance(csv_data, Exception) else {"error": str(csv_data)},
                "optimization": "combined_api_csv",
            }

            logger.info("enhanced_cme_fetch_completed")
            return result

        except Exception as e:
            logger.error("enhanced_cme_fetch_error", error=str(e))
            return {"error": str(e)}

    async def fetch_csv_data(self) -> Optional[bytes]:
        """Fallback CSV fetch for trade data.

        Returns:
            The raw response body, or ``None`` on a non-200 status or any
            exception.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(self.csv_url) as response:
                    if response.status != 200:
                        # Log the failure instead of returning None silently.
                        logger.error("cme_csv_error",
                                     status=response.status,
                                     url=self.csv_url)
                        return None

                    data = await response.read()
                    logger.info("cme_csv_fetched", size=len(data))
                    return data

        except Exception as e:
            logger.error("cme_csv_fetch_error", error=str(e))
            return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jjsteffen23/dk_mcp_2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.