import pandas as pd
import io
from datetime import datetime
from typing import List, Dict, Optional
import structlog
from src.data.models import Trade, Contract
from src.config import get_settings
logger = structlog.get_logger()
settings = get_settings()


class CMEDataParser:
    """Parses and validates CME CSV data."""

    REQUIRED_COLUMNS = [
        'trade_date', 'exec_ts', 'instrument_guid_long',
        'instrument_long_name', 'count_clr_trade_id', 'qty', 'clr_px',
    ]

    def __init__(self):
        self.chunk_size = 10000  # rows per pd.read_csv chunk
        self.min_price = 0.01    # accepted clearing-price bounds (inclusive)
        self.max_price = 0.99
    def parse_csv(self, csv_data: bytes) -> pd.DataFrame:
        """Parse CSV data with chunked processing."""
        try:
            # Read in chunks of self.chunk_size rows so large files are not
            # parsed in a single pass.
            chunks = pd.read_csv(
                io.BytesIO(csv_data),
                parse_dates=['trade_date', 'exec_ts'],
                dtype={
                    'instrument_guid_long': str,
                    'instrument_long_name': str,
                    'clr_px': float,
                    # Nullable integer dtypes so rows with missing values reach
                    # validation (and dropna) instead of failing at parse time.
                    'qty': 'Int64',
                    'count_clr_trade_id': 'Int64',
                },
                chunksize=self.chunk_size,
            )
            df = pd.concat(chunks, ignore_index=True)
            logger.info("csv_parsed", rows=len(df))
            return df
        except Exception as e:
            logger.error("csv_parse_error", error=str(e))
            raise
    def transform_cme_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform the CME column layout into the internal format."""
        try:
            df_transformed = df.copy()
            # Map CME columns onto the internal schema.
            df_transformed['timestamp'] = df['exec_ts']  # execution timestamp is primary
            df_transformed['contract_symbol'] = df['instrument_guid_long']  # GUID doubles as symbol
            df_transformed['price'] = df['clr_px']  # clearing price
            df_transformed['volume'] = df['qty']    # quantity traded
            # Build a synthetic trade id from GUID + execution time. A cumcount
            # suffix disambiguates trades sharing the same second, so the later
            # drop_duplicates() only removes true re-reads, not distinct trades.
            seq = df.groupby(['instrument_guid_long', 'exec_ts']).cumcount()
            df_transformed['trade_id'] = (
                df['instrument_guid_long'].astype(str)
                + '_' + df['exec_ts'].dt.strftime('%Y%m%d_%H%M%S')
                + '_' + seq.astype(str)
            )
            # Keep CME-specific fields for downstream reference.
            df_transformed['trade_date'] = df['trade_date']
            df_transformed['instrument_long_name'] = df['instrument_long_name']
            df_transformed['count_clr_trade_id'] = df['count_clr_trade_id']
            logger.info("cme_data_transformed", rows=len(df_transformed))
            return df_transformed
        except Exception as e:
            logger.error("cme_transform_error", error=str(e))
            raise
    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate data integrity.

        Expects a frame already passed through transform_cme_data(), since it
        filters on the derived 'price', 'trade_id', and 'timestamp' columns.
        """
        initial_rows = len(df)
        # Check required columns
        missing_cols = set(self.REQUIRED_COLUMNS) - set(df.columns)
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")
        # Remove rows with missing data
        df = df.dropna(subset=self.REQUIRED_COLUMNS)
        # Keep only prices inside the accepted band
        df = df[
            (df['price'] >= self.min_price) &
            (df['price'] <= self.max_price)
        ]
        # Remove duplicate trade_ids
        df = df.drop_duplicates(subset=['trade_id'], keep='first')
        # Validate timestamps
        df = df[df['timestamp'].notna()]
        final_rows = len(df)
        dropped = initial_rows - final_rows
        if dropped > 0:
            logger.warning("data_validation_dropped_rows", dropped=dropped)
        logger.info("data_validated", valid_rows=final_rows)
        return df
    def parse_incremental(
        self,
        csv_data: bytes,
        last_timestamp: Optional[datetime] = None,
    ) -> pd.DataFrame:
        """Parse, transform, and validate only data newer than last fetch."""
        df = self.parse_csv(csv_data)
        # Transform before validating: validate_data() filters on the derived
        # 'price', 'trade_id', and 'timestamp' columns created there.
        df = self.transform_cme_data(df)
        df = self.validate_data(df)
        if last_timestamp is not None:
            df = df[df['timestamp'] > last_timestamp]
        logger.info("incremental_parse", new_rows=len(df))
        return df
    def to_trade_models(
        self, df: pd.DataFrame, contract_mapping: Dict[str, int]
    ) -> List[Trade]:
        """Convert DataFrame rows to Trade model instances."""
        trades = []
        for _, row in df.iterrows():
            contract_id = contract_mapping.get(row['contract_symbol'])
            if contract_id is None:  # 'is None' so a valid id of 0 is not skipped
                logger.warning("unknown_contract", symbol=row['contract_symbol'])
                continue
            trade = Trade(
                trade_id=row['trade_id'],
                contract_id=contract_id,
                timestamp=row['timestamp'],
                price=row['price'],
                volume=row['volume'],
                side=row.get('side'),  # optional columns; None when absent
                trade_type=row.get('trade_type'),
            )
            trades.append(trade)
        return trades