
CME Prediction Markets MCP Server

by jjsteffen23
parser.py (4.81 kB)
import pandas as pd
import io
from datetime import datetime
from typing import List, Dict, Optional

import structlog

from src.data.models import Trade
from src.config import get_settings

logger = structlog.get_logger()
settings = get_settings()


class CMEDataParser:
    """Parses and validates CME CSV data."""

    REQUIRED_COLUMNS = [
        'trade_date', 'exec_ts', 'instrument_guid_long',
        'instrument_long_name', 'count_clr_trade_id', 'qty', 'clr_px',
    ]

    def __init__(self):
        self.chunk_size = 10000  # reserved for future chunked reads; not used yet
        self.min_price = 0.01
        self.max_price = 0.99

    def parse_csv(self, csv_data: bytes) -> pd.DataFrame:
        """Parse raw CSV bytes into a DataFrame."""
        try:
            df = pd.read_csv(
                io.BytesIO(csv_data),
                parse_dates=['trade_date', 'exec_ts'],
                dtype={
                    'instrument_guid_long': str,
                    'instrument_long_name': str,
                    'clr_px': float,
                    'qty': int,
                    'count_clr_trade_id': int,
                },
            )
            logger.info("csv_parsed", rows=len(df))
            return df
        except Exception as e:
            logger.error("csv_parse_error", error=str(e))
            raise

    def transform_cme_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform the CME column layout into the internal format."""
        try:
            df_transformed = df.copy()

            # Map CME columns to internal names
            df_transformed['timestamp'] = df['exec_ts']  # execution timestamp is primary
            df_transformed['contract_symbol'] = df['instrument_guid_long']  # GUID doubles as symbol
            df_transformed['price'] = df['clr_px']  # clearing price
            df_transformed['volume'] = df['qty']  # quantity traded

            # Synthetic trade ID; note that second-level granularity can collide
            # when one instrument trades more than once in the same second.
            df_transformed['trade_id'] = (
                df['instrument_guid_long'].astype(str)
                + '_'
                + df['exec_ts'].dt.strftime('%Y%m%d_%H%M%S')
            )

            # Keep CME-specific fields
            df_transformed['trade_date'] = df['trade_date']
            df_transformed['instrument_long_name'] = df['instrument_long_name']
            df_transformed['count_clr_trade_id'] = df['count_clr_trade_id']

            logger.info("cme_data_transformed", rows=len(df_transformed))
            return df_transformed
        except Exception as e:
            logger.error("cme_transform_error", error=str(e))
            raise

    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate data integrity; expects a transformed DataFrame."""
        initial_rows = len(df)

        # Check required columns
        missing_cols = set(self.REQUIRED_COLUMNS) - set(df.columns)
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Remove rows with missing data
        df = df.dropna(subset=self.REQUIRED_COLUMNS)

        # Keep prices inside the valid range
        df = df[
            (df['price'] >= self.min_price) & (df['price'] <= self.max_price)
        ]

        # Remove duplicate trade IDs
        df = df.drop_duplicates(subset=['trade_id'], keep='first')

        # Drop rows without a timestamp
        df = df[df['timestamp'].notna()]

        dropped = initial_rows - len(df)
        if dropped > 0:
            logger.warning("data_validation_dropped_rows", dropped=dropped)
        logger.info("data_validated", valid_rows=len(df))
        return df

    def parse_incremental(
        self,
        csv_data: bytes,
        last_timestamp: Optional[datetime] = None,
    ) -> pd.DataFrame:
        """Parse only new data since the last fetch."""
        df = self.parse_csv(csv_data)
        # Transform before validating: validate_data filters on the internal
        # 'price', 'trade_id', and 'timestamp' columns created by the transform.
        df = self.transform_cme_data(df)
        df = self.validate_data(df)
        if last_timestamp is not None:
            df = df[df['timestamp'] > last_timestamp]
        logger.info("incremental_parse", new_rows=len(df))
        return df

    def to_trade_models(
        self,
        df: pd.DataFrame,
        contract_mapping: Dict[str, int],
    ) -> List[Trade]:
        """Convert DataFrame rows into Trade model instances."""
        trades = []
        for _, row in df.iterrows():
            contract_id = contract_mapping.get(row['contract_symbol'])
            # Compare against None so a legitimate contract_id of 0 is kept
            if contract_id is None:
                logger.warning("unknown_contract", symbol=row['contract_symbol'])
                continue
            trade = Trade(
                trade_id=row['trade_id'],
                contract_id=contract_id,
                timestamp=row['timestamp'],
                price=row['price'],
                volume=row['volume'],
                side=row.get('side'),
                trade_type=row.get('trade_type'),
            )
            trades.append(trade)
        return trades
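
A minimal usage sketch of the pipeline above, assuming the module lives at src.data.parser; the CSV file name and the contract mapping are hypothetical placeholders standing in for a real fetch and a database lookup:

from datetime import datetime

from src.data.parser import CMEDataParser  # assumed module path

parser = CMEDataParser()

# Hypothetical inputs: raw CSV bytes and a GUID -> contract_id mapping.
with open('cme_trades.csv', 'rb') as f:
    csv_bytes = f.read()
contract_mapping = {'EXAMPLE_GUID': 1}

# parse_incremental runs parse -> transform -> validate, then keeps only
# rows newer than last_timestamp.
df = parser.parse_incremental(csv_bytes, last_timestamp=datetime(2024, 1, 1))
trades = parser.to_trade_models(df, contract_mapping)
print(f"loaded {len(trades)} new trades")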

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jjsteffen23/dk_mcp_2'
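
Assuming the endpoint returns JSON, piping the response through jq (if installed) pretty-prints it:

curl -s 'https://glama.ai/api/mcp/v1/servers/jjsteffen23/dk_mcp_2' | jq .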

If you have feedback or need assistance with the MCP directory API, please join our Discord server.