Skip to main content
Glama
jjsteffen23

CME Prediction Markets MCP Server

by jjsteffen23
dk_mcp_1_md.md81.1 kB
# CME Prediction Markets MCP Server - Complete Implementation ## Table of Contents 1. [Configuration Files](#configuration-files) 2. [Database Models](#database-models) 3. [Data Infrastructure](#data-infrastructure) 4. [MCP Server Core](#mcp-server-core) 5. [Claim Verification](#claim-verification) 6. [Communication Integrations](#communication-integrations) 7. [API Routes](#api-routes) 8. [Utilities](#utilities) 9. [Main Application](#main-application) 10. [Scripts](#scripts) 11. [Testing](#testing) --- ## Configuration Files ### pyproject.toml ```toml [tool.poetry] name = "cme-mcp-server" version = "1.0.0" description = "MCP Server for CME Prediction Markets Claim Verification" authors = ["Your Team"] readme = "README.md" [tool.poetry.dependencies] python = "^3.11" fastapi = "^0.109.0" uvicorn = {extras = ["standard"], version = "^0.27.0"} sqlalchemy = "^2.0.25" alembic = "^1.13.1" asyncpg = "^0.29.0" psycopg2-binary = "^2.9.9" redis = "^5.0.1" pandas = "^2.2.0" numpy = "^1.26.3" pydantic = "^2.5.3" pydantic-settings = "^2.1.0" python-dateutil = "^2.8.2" requests = "^2.31.0" aiohttp = "^3.9.1" celery = "^5.3.6" slack-sdk = "^3.26.2" python-multipart = "^0.0.6" structlog = "^24.1.0" prometheus-client = "^0.19.0" httpx = "^0.26.0" spacy = "^3.7.2" python-dotenv = "^1.0.0" pytz = "^2024.1" beautifulsoup4 = "^4.12.3" lxml = "^5.1.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.4" pytest-asyncio = "^0.23.3" pytest-cov = "^4.1.0" faker = "^22.0.0" black = "^24.1.1" ruff = "^0.1.14" mypy = "^1.8.0" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" [tool.black] line-length = 100 target-version = ['py311'] [tool.ruff] line-length = 100 target-version = "py311" [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] ``` ### .env.example ```bash # Database DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/cme_mcp DATABASE_POOL_SIZE=20 DATABASE_MAX_OVERFLOW=10 # Redis REDIS_URL=redis://localhost:6379/0 REDIS_CACHE_TTL=300 # CME Data CME_DATA_URL=https://www.cmegroup.com/market-data/files/Event_Contract_Swaps_TS.csv CME_FETCH_INTERVAL=300 CME_RETRY_ATTEMPTS=3 # Slack SLACK_BOT_TOKEN=xoxb-your-token SLACK_APP_TOKEN=xapp-your-token SLACK_SIGNING_SECRET=your-signing-secret SLACK_ALERT_CHANNEL=#cme-alerts SLACK_VERIFICATION_CHANNEL=#cme-verifications # Email EMAIL_IMAP_SERVER=imap.gmail.com EMAIL_IMAP_PORT=993 EMAIL_SMTP_SERVER=smtp.gmail.com EMAIL_SMTP_PORT=587 EMAIL_USERNAME=your-email@example.com EMAIL_PASSWORD=your-app-password EMAIL_MONITORED_FOLDER=CME_Claims # Celery CELERY_BROKER_URL=redis://localhost:6379/1 CELERY_RESULT_BACKEND=redis://localhost:6379/2 # API API_HOST=0.0.0.0 API_PORT=8000 API_WORKERS=4 API_CORS_ORIGINS=["http://localhost:3000"] # Logging LOG_LEVEL=INFO LOG_FORMAT=json # Security SECRET_KEY=your-secret-key-change-in-production ADMIN_API_KEY=your-admin-api-key # Monitoring PROMETHEUS_PORT=9090 ENABLE_METRICS=true ``` ### docker-compose.yml ```yaml version: '3.8' services: postgres: image: timescale/timescaledb:latest-pg15 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: cme_mcp ports: - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 10s timeout: 5s retries: 5 redis: image: redis:7-alpine ports: - "6379:6379" volumes: - redis_data:/data healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 3s retries: 5 app: build: . command: uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload volumes: - .:/app ports: - "8000:8000" environment: - DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres:5432/cme_mcp - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/1 depends_on: postgres: condition: service_healthy redis: condition: service_healthy env_file: - .env celery_worker: build: . command: celery -A src.integrations.workflows.tasks worker --loglevel=info volumes: - .:/app environment: - DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres:5432/cme_mcp - REDIS_URL=redis://redis:6379/0 - CELERY_BROKER_URL=redis://redis:6379/1 depends_on: - postgres - redis env_file: - .env celery_beat: build: . command: celery -A src.integrations.workflows.tasks beat --loglevel=info volumes: - .:/app environment: - DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres:5432/cme_mcp - CELERY_BROKER_URL=redis://redis:6379/1 depends_on: - postgres - redis env_file: - .env volumes: postgres_data: redis_data: ``` ### Dockerfile ```dockerfile FROM python:3.11-slim WORKDIR /app # Install system dependencies RUN apt-get update && apt-get install -y \ gcc \ postgresql-client \ curl \ && rm -rf /var/lib/apt/lists/* # Install Poetry RUN pip install poetry==1.7.1 # Copy dependency files COPY pyproject.toml poetry.lock ./ # Install dependencies RUN poetry config virtualenvs.create false \ && poetry install --no-interaction --no-ansi --no-root # Download spaCy model RUN python -m spacy download en_core_web_sm # Copy application code COPY . . # Install the project RUN poetry install --no-interaction --no-ansi # Run migrations on startup (in production, do this separately) CMD ["sh", "-c", "alembic upgrade head && uvicorn src.main:app --host 0.0.0.0 --port 8000"] ``` ### .gitignore ``` # Python __pycache__/ *.py[cod] *$py.class *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg # Virtual environments venv/ ENV/ env/ .venv # IDEs .vscode/ .idea/ *.swp *.swo *~ # Environment variables .env .env.local .env.*.local # Database *.db *.sqlite3 # Logs *.log logs/ # Testing .pytest_cache/ .coverage htmlcov/ .tox/ # Docker *.pid # OS .DS_Store Thumbs.db # Project specific data/raw/ data/processed/ celerybeat-schedule ``` --- ## Database Models ### src/data/models/database.py ```python from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import declarative_base from sqlalchemy.pool import NullPool from contextlib import asynccontextmanager from typing import AsyncGenerator import structlog from src.config import get_settings logger = structlog.get_logger() settings = get_settings() Base = declarative_base() # Create async engine engine = create_async_engine( settings.DATABASE_URL, echo=settings.LOG_LEVEL == "DEBUG", pool_size=settings.DATABASE_POOL_SIZE, max_overflow=settings.DATABASE_MAX_OVERFLOW, pool_pre_ping=True, ) # Create session maker AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False, ) async def get_db() -> AsyncGenerator[AsyncSession, None]: """Dependency for getting async database sessions.""" async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() async def init_db(): """Initialize database tables.""" async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("Database tables created") async def close_db(): """Close database connections.""" await engine.dispose() logger.info("Database connections closed") ``` ### src/data/models/contracts.py ```python from sqlalchemy import Column, String, DateTime, Date, Enum, Float, Boolean, Integer from sqlalchemy.orm import relationship from datetime import datetime import enum from src.data.models.database import Base class ContractType(str, enum.Enum): CRYPTO = "crypto" EQUITY = "equity" COMMODITY = "commodity" FX = "fx" EVENT = "event" OTHER = "other" class Contract(Base): __tablename__ = "contracts" id = Column(Integer, primary_key=True, index=True) symbol = Column(String(100), unique=True, index=True, nullable=False) description = Column(String(500), nullable=False) contract_type = Column(Enum(ContractType), nullable=False, index=True) settlement_date = Column(Date, nullable=False, index=True) is_yes_contract = Column(Boolean, default=True) paired_contract_symbol = Column(String(100), nullable=True) # Metadata created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) is_active = Column(Boolean, default=True, index=True) is_settled = Column(Boolean, default=False, index=True) # Relationships trades = relationship("Trade", back_populates="contract", cascade="all, delete-orphan") settlement = relationship("Settlement", back_populates="contract", uselist=False) def __repr__(self): return f"<Contract(symbol='{self.symbol}', type='{self.contract_type}')>" ``` ### src/data/models/trades.py ```python from sqlalchemy import Column, String, DateTime, Float, Integer, ForeignKey, Index from sqlalchemy.orm import relationship from datetime import datetime from src.data.models.database import Base class Trade(Base): __tablename__ = "trades" id = Column(Integer, primary_key=True, index=True) trade_id = Column(String(100), unique=True, index=True, nullable=False) contract_id = Column(Integer, ForeignKey("contracts.id"), nullable=False, index=True) # Trade data timestamp = Column(DateTime, nullable=False, index=True) price = Column(Float, nullable=False) volume = Column(Integer, nullable=False) # Additional metadata side = Column(String(10), nullable=True) # BUY, SELL trade_type = Column(String(20), nullable=True) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) # Relationships contract = relationship("Contract", back_populates="trades") __table_args__ = ( Index('idx_contract_timestamp', 'contract_id', 'timestamp'), Index('idx_timestamp_price', 'timestamp', 'price'), ) def __repr__(self): return f"<Trade(id='{self.trade_id}', price={self.price}, volume={self.volume})>" ``` ### src/data/models/settlements.py ```python from sqlalchemy import Column, String, DateTime, Float, Integer, ForeignKey, Boolean from sqlalchemy.orm import relationship from datetime import datetime from src.data.models.database import Base class Settlement(Base): __tablename__ = "settlements" id = Column(Integer, primary_key=True, index=True) contract_id = Column(Integer, ForeignKey("contracts.id"), unique=True, nullable=False) # Settlement data settlement_date = Column(DateTime, nullable=False, index=True) settlement_price = Column(Float, nullable=False) # $0.00 or $1.00 outcome = Column(Boolean, nullable=False) # True = YES, False = NO # Verification verified = Column(Boolean, default=False) verification_source = Column(String(200), nullable=True) notes = Column(String(1000), nullable=True) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationships contract = relationship("Contract", back_populates="settlement") def __repr__(self): return f"<Settlement(contract_id={self.contract_id}, outcome={self.outcome})>" ``` ### src/data/models/__init__.py ```python from src.data.models.database import Base, get_db, init_db, close_db from src.data.models.contracts import Contract, ContractType from src.data.models.trades import Trade from src.data.models.settlements import Settlement __all__ = [ "Base", "get_db", "init_db", "close_db", "Contract", "ContractType", "Trade", "Settlement", ] ``` --- ## Data Infrastructure ### src/data/ingestion/fetcher.py ```python import aiohttp import asyncio from datetime import datetime from typing import Optional import structlog from src.config import get_settings logger = structlog.get_logger() settings = get_settings() class CMEDataFetcher: """Fetches CME prediction market data with retry logic.""" def __init__(self): self.url = settings.CME_DATA_URL self.retry_attempts = settings.CME_RETRY_ATTEMPTS self.timeout = aiohttp.ClientTimeout(total=60) async def fetch_csv(self) -> Optional[bytes]: """Fetch CSV data from CME with retry logic.""" for attempt in range(self.retry_attempts): try: async with aiohttp.ClientSession(timeout=self.timeout) as session: async with session.get(self.url) as response: if response.status == 200: data = await response.read() logger.info( "cme_data_fetched", size=len(data), attempt=attempt + 1, ) return data else: logger.warning( "cme_fetch_failed", status=response.status, attempt=attempt + 1, ) except asyncio.TimeoutError: logger.error("cme_fetch_timeout", attempt=attempt + 1) except Exception as e: logger.error("cme_fetch_error", error=str(e), attempt=attempt + 1) if attempt < self.retry_attempts - 1: await asyncio.sleep(2 ** attempt) # Exponential backoff logger.error("cme_fetch_failed_all_attempts") return None async def save_to_file(self, data: bytes, filepath: str) -> bool: """Save fetched data to file.""" try: with open(filepath, 'wb') as f: f.write(data) logger.info("cme_data_saved", filepath=filepath) return True except Exception as e: logger.error("cme_save_error", error=str(e), filepath=filepath) return False ``` ### src/data/ingestion/parser.py ```python import pandas as pd import io from datetime import datetime from typing import List, Dict, Any, Optional import structlog from src.data.models import Trade, Contract from src.config import get_settings logger = structlog.get_logger() settings = get_settings() class CMEDataParser: """Parses and validates CME CSV data.""" REQUIRED_COLUMNS = [ 'timestamp', 'contract_symbol', 'price', 'volume', 'trade_id' ] def __init__(self): self.chunk_size = 10000 self.min_price = 0.01 self.max_price = 0.99 def parse_csv(self, csv_data: bytes) -> pd.DataFrame: """Parse CSV data with chunked processing.""" try: df = pd.read_csv( io.BytesIO(csv_data), parse_dates=['timestamp'], dtype={ 'contract_symbol': str, 'price': float, 'volume': int, 'trade_id': str, } ) logger.info("csv_parsed", rows=len(df)) return df except Exception as e: logger.error("csv_parse_error", error=str(e)) raise def validate_data(self, df: pd.DataFrame) -> pd.DataFrame: """Validate data integrity.""" initial_rows = len(df) # Check required columns missing_cols = set(self.REQUIRED_COLUMNS) - set(df.columns) if missing_cols: raise ValueError(f"Missing required columns: {missing_cols}") # Remove rows with missing data df = df.dropna(subset=self.REQUIRED_COLUMNS) # Validate price range df = df[ (df['price'] >= self.min_price) & (df['price'] <= self.max_price) ] # Remove duplicate trade_ids df = df.drop_duplicates(subset=['trade_id'], keep='first') # Validate timestamps df = df[df['timestamp'].notna()] final_rows = len(df) dropped = initial_rows - final_rows if dropped > 0: logger.warning("data_validation_dropped_rows", dropped=dropped) logger.info("data_validated", valid_rows=final_rows) return df def parse_incremental( self, csv_data: bytes, last_timestamp: Optional[datetime] = None ) -> pd.DataFrame: """Parse only new data since last fetch.""" df = self.parse_csv(csv_data) df = self.validate_data(df) if last_timestamp: df = df[df['timestamp'] > last_timestamp] logger.info("incremental_parse", new_rows=len(df)) return df def to_trade_models(self, df: pd.DataFrame, contract_mapping: Dict[str, int]) -> List[Trade]: """Convert DataFrame to Trade model instances.""" trades = [] for _, row in df.iterrows(): contract_id = contract_mapping.get(row['contract_symbol']) if not contract_id: logger.warning("unknown_contract", symbol=row['contract_symbol']) continue trade = Trade( trade_id=row['trade_id'], contract_id=contract_id, timestamp=row['timestamp'], price=row['price'], volume=row['volume'], side=row.get('side'), trade_type=row.get('trade_type'), ) trades.append(trade) return trades ``` ### src/data/storage/repository.py ```python from sqlalchemy import select, and_, or_, func, desc, asc from sqlalchemy.ext.asyncio import AsyncSession from datetime import datetime, timedelta from typing import List, Optional, Dict, Any import structlog from src.data.models import Contract, Trade, Settlement, ContractType logger = structlog.get_logger() class ContractRepository: """Repository for Contract operations.""" @staticmethod async def create(session: AsyncSession, **kwargs) -> Contract: """Create a new contract.""" contract = Contract(**kwargs) session.add(contract) await session.flush() return contract @staticmethod async def get_by_symbol(session: AsyncSession, symbol: str) -> Optional[Contract]: """Get contract by symbol.""" result = await session.execute( select(Contract).where(Contract.symbol == symbol) ) return result.scalar_one_or_none() @staticmethod async def get_by_id(session: AsyncSession, contract_id: int) -> Optional[Contract]: """Get contract by ID.""" result = await session.execute( select(Contract).where(Contract.id == contract_id) ) return result.scalar_one_or_none() @staticmethod async def get_active_contracts(session: AsyncSession) -> List[Contract]: """Get all active contracts.""" result = await session.execute( select(Contract).where( and_( Contract.is_active == True, Contract.is_settled == False ) ) ) return list(result.scalars().all()) @staticmethod async def search_contracts( session: AsyncSession, search_term: str, contract_type: Optional[ContractType] = None ) -> List[Contract]: """Search contracts by description or symbol.""" conditions = [ or_( Contract.symbol.ilike(f"%{search_term}%"), Contract.description.ilike(f"%{search_term}%") ) ] if contract_type: conditions.append(Contract.contract_type == contract_type) result = await session.execute( select(Contract).where(and_(*conditions)) ) return list(result.scalars().all()) class TradeRepository: """Repository for Trade operations.""" @staticmethod async def create_bulk(session: AsyncSession, trades: List[Trade]) -> int: """Create multiple trades.""" session.add_all(trades) await session.flush() return len(trades) @staticmethod async def get_by_contract_and_timerange( session: AsyncSession, contract_id: int, start_time: datetime, end_time: datetime ) -> List[Trade]: """Get trades for a contract within a time range.""" result = await session.execute( select(Trade) .where( and_( Trade.contract_id == contract_id, Trade.timestamp >= start_time, Trade.timestamp <= end_time ) ) .order_by(Trade.timestamp) ) return list(result.scalars().all()) @staticmethod async def get_price_at_timestamp( session: AsyncSession, contract_id: int, timestamp: datetime, tolerance_minutes: int = 5 ) -> Optional[Trade]: """Get price at or near a specific timestamp.""" # Try exact timestamp first result = await session.execute( select(Trade) .where( and_( Trade.contract_id == contract_id, Trade.timestamp == timestamp ) ) .limit(1) ) trade = result.scalar_one_or_none() if trade: return trade # Try within tolerance window start = timestamp - timedelta(minutes=tolerance_minutes) end = timestamp + timedelta(minutes=tolerance_minutes) result = await session.execute( select(Trade) .where( and_( Trade.contract_id == contract_id, Trade.timestamp >= start, Trade.timestamp <= end ) ) .order_by( func.abs( func.extract('epoch', Trade.timestamp) - func.extract('epoch', timestamp) ) ) .limit(1) ) return result.scalar_one_or_none() @staticmethod async def get_latest_trade( session: AsyncSession, contract_id: int ) -> Optional[Trade]: """Get the most recent trade for a contract.""" result = await session.execute( select(Trade) .where(Trade.contract_id == contract_id) .order_by(desc(Trade.timestamp)) .limit(1) ) return result.scalar_one_or_none() @staticmethod async def get_ohlc( session: AsyncSession, contract_id: int, start_time: datetime, end_time: datetime ) -> Dict[str, float]: """Calculate OHLC (Open, High, Low, Close) for a time period.""" trades = await TradeRepository.get_by_contract_and_timerange( session, contract_id, start_time, end_time ) if not trades: return {} prices = [t.price for t in trades] return { 'open': trades[0].price, 'high': max(prices), 'low': min(prices), 'close': trades[-1].price, 'volume': sum(t.volume for t in trades), 'trades_count': len(trades) } class SettlementRepository: """Repository for Settlement operations.""" @staticmethod async def create(session: AsyncSession, **kwargs) -> Settlement: """Create a new settlement.""" settlement = Settlement(**kwargs) session.add(settlement) await session.flush() return settlement @staticmethod async def get_by_contract( session: AsyncSession, contract_id: int ) -> Optional[Settlement]: """Get settlement for a contract.""" result = await session.execute( select(Settlement).where(Settlement.contract_id == contract_id) ) return result.scalar_one_or_none() @staticmethod async def update_verification( session: AsyncSession, settlement_id: int, verified: bool, source: str, notes: Optional[str] = None ) -> Settlement: """Update settlement verification status.""" result = await session.execute( select(Settlement).where(Settlement.id == settlement_id) ) settlement = result.scalar_one() settlement.verified = verified settlement.verification_source = source settlement.notes = notes await session.flush() return settlement ``` ### src/data/storage/cache.py ```python import redis.asyncio as redis import json from typing import Optional, Any, List from datetime import timedelta import structlog from src.config import get_settings logger = structlog.get_logger() settings = get_settings() class RedisCache: """Redis caching layer.""" def __init__(self): self.client: Optional[redis.Redis] = None self.default_ttl = settings.REDIS_CACHE_TTL async def connect(self): """Initialize Redis connection.""" self.client = await redis.from_url( settings.REDIS_URL, encoding="utf-8", decode_responses=True ) logger.info("redis_connected") async def disconnect(self): """Close Redis connection.""" if self.client: await self.client.close() logger.info("redis_disconnected") async def get(self, key: str) -> Optional[Any]: """Get value from cache.""" if not self.client: return None try: value = await self.client.get(key) if value: return json.loads(value) except Exception as e: logger.error("cache_get_error", key=key, error=str(e)) return None async def set( self, key: str, value: Any, ttl: Optional[int] = None ) -> bool: """Set value in cache with TTL.""" if not self.client: return False try: ttl = ttl or self.default_ttl await self.client.setex( key, ttl, json.dumps(value, default=str) ) return True except Exception as e: logger.error("cache_set_error", key=key, error=str(e)) return False async def delete(self, key: str) -> bool: """Delete value from cache.""" if not self.client: return False try: await self.client.delete(key) return True except Exception as e: logger.error("cache_delete_error", key=key, error=str(e)) return False async def exists(self, key: str) -> bool: """Check if key exists in cache.""" if not self.client: return False try: return bool(await self.client.exists(key)) except Exception as e: logger.error("cache_exists_error", key=key, error=str(e)) return False async def get_or_set( self, key: str, factory_func, ttl: Optional[int] = None ) -> Optional[Any]: """Get from cache or execute factory function and cache result.""" value = await self.get(key) if value is not None: return value value = await factory_func() if value is not None: await self.set(key, value, ttl) return value # Global cache instance cache = RedisCache() ``` --- ## MCP Server Core ### src/mcp/server.py ```python from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass import structlog import json logger = structlog.get_logger() @dataclass class MCPTool: """MCP Tool definition.""" name: str description: str input_schema: Dict[str, Any] handler: Callable class MCPServer: """MCP Protocol Server implementation.""" def __init__(self): self.tools: Dict[str, MCPTool] = {} self.protocol_version = "2024-11-05" def register_tool(self, tool: MCPTool): """Register an MCP tool.""" self.tools[tool.name] = tool logger.info("mcp_tool_registered", tool_name=tool.name) async def handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle MCP initialize request.""" return { "protocolVersion": self.protocol_version, "capabilities": { "tools": { "supported": True } }, "serverInfo": { "name": "cme-mcp-server", "version": "1.0.0" } } async def handle_tools_list(self) -> Dict[str, Any]: """Handle tools/list request.""" tools_list = [ { "name": tool.name, "description": tool.description, "inputSchema": tool.input_schema } for tool in self.tools.values() ] return { "tools": tools_list } async def handle_tools_call( self, tool_name: str, arguments: Dict[str, Any] ) -> Dict[str, Any]: """Handle tools/call request.""" tool = self.tools.get(tool_name) if not tool: return { "isError": True, "content": [{ "type": "text", "text": f"Tool '{tool_name}' not found" }] } try: result = await tool.handler(arguments) return { "content": [{ "type": "text", "text": json.dumps(result, indent=2, default=str) }] } except Exception as e: logger.error("mcp_tool_error", tool=tool_name, error=str(e)) return { "isError": True, "content": [{ "type": "text", "text": f"Error executing tool: {str(e)}" }] } async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """Route MCP requests to appropriate handlers.""" method = request.get("method") params = request.get("params", {}) if method == "initialize": result = await self.handle_initialize(params) elif method == "tools/list": result = await self.handle_tools_list() elif method == "tools/call": tool_name = params.get("name") arguments = params.get("arguments", {}) result = await self.handle_tools_call(tool_name, arguments) else: result = { "error": { "code": -32601, "message": f"Method not found: {method}" } } return { "jsonrpc": "2.0", "id": request.get("id"), "result": result } # Global MCP server instance mcp_server = MCPServer() ``` ### src/mcp/tools/query_trading_data.py ```python from typing import Dict, Any, Optional from datetime import datetime import structlog from src.data.models import get_db from src.data.storage.repository import ContractRepository, TradeRepository from src.data.storage.cache import cache logger = structlog.get_logger() async def query_trading_data(arguments: Dict[str, Any]) -> Dict[str, Any]: """ Query trading data for a contract within a time range. Args: contract_symbol: Contract symbol (e.g., "BTC_95000_YES") start_time: Start timestamp (ISO 8601) end_time: End timestamp (ISO 8601) aggregation: Optional aggregation level (minute, hour, day) """ contract_symbol = arguments.get("contract_symbol") start_time_str = arguments.get("start_time") end_time_str = arguments.get("end_time") aggregation = arguments.get("aggregation", "raw") if not all([contract_symbol, start_time_str, end_time_str]): return { "error": "Missing required parameters: contract_symbol, start_time, end_time" } try: start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00')) end_time = datetime.fromisoformat(end_time_str.replace('Z', '+00:00')) except ValueError as e: return {"error": f"Invalid timestamp format: {str(e)}"} # Check cache cache_key = f"trading_data:{contract_symbol}:{start_time}:{end_time}:{aggregation}" cached = await cache.get(cache_key) if cached: logger.info("cache_hit", key=cache_key) return cached async with get_db() as session: # Get contract contract = await ContractRepository.get_by_symbol(session, contract_symbol) if not contract: return {"error": f"Contract not found: {contract_symbol}"} # Get trades trades = await TradeRepository.get_by_contract_and_timerange( session, contract.id, start_time, end_time ) if not trades: return { "contract_symbol": contract_symbol, "start_time": start_time_str, "end_time": end_time_str, "trades": [], "summary": { "count": 0, "message": "No trades found in this time range" } } # Calculate OHLC ohlc = await TradeRepository.get_ohlc( session, contract.id, start_time, end_time ) # Format response result = { "contract_symbol": contract_symbol, "contract_description": contract.description, "start_time": start_time_str, "end_time": end_time_str, "trades": [ { "timestamp": trade.timestamp.isoformat(), "price": trade.price, "volume": trade.volume, "trade_id": trade.trade_id } for trade in trades[:1000] # Limit to 1000 trades ], "summary": { "total_trades": len(trades), "open": ohlc.get('open'), "high": ohlc.get('high'), "low": ohlc.get('low'), "close": ohlc.get('close'), "total_volume": ohlc.get('volume'), "vwap": sum(t.price * t.volume for t in trades) / sum(t.volume for t in trades) if trades else None } } # Cache for 5 minutes await cache.set(cache_key, result, ttl=300) return result # Tool schema for MCP QUERY_TRADING_DATA_SCHEMA = { "type": "object", "properties": { "contract_symbol": { "type": "string", "description": "Contract symbol (e.g., BTC_95000_YES)" }, "start_time": { "type": "string", "description": "Start timestamp in ISO 8601 format" }, "end_time": { "type": "string", "description": "End timestamp in ISO 8601 format" }, "aggregation": { "type": "string", "enum": ["raw", "minute", "hour", "day"], "description": "Data aggregation level (default: raw)" } }, "required": ["contract_symbol", "start_time", "end_time"] } ``` ### src/mcp/tools/verify_claim.py ```python from typing import Dict, Any import structlog from src.verification.nlp.claim_parser import ClaimParser from src.verification.query.executor import QueryExecutor from src.verification.logic.evaluator import ClaimEvaluator from src.verification.reporting.formatter import ReportFormatter logger = structlog.get_logger() async def verify_claim(arguments: Dict[str, Any]) -> Dict[str, Any]: """ Verify a natural language claim against CME data. Args: claim_text: The claim to verify (e.g., "Bitcoin reached 95 cents on Dec 15") context_time: Optional timestamp for context (ISO 8601) """ claim_text = arguments.get("claim_text") context_time = arguments.get("context_time") if not claim_text: return {"error": "Missing required parameter: claim_text"} logger.info("verifying_claim", claim=claim_text) try: # Parse the claim parser = ClaimParser() parsed_claim = await parser.parse(claim_text, context_time) if not parsed_claim.is_valid: return { "verdict": "INCONCLUSIVE", "confidence": 0, "reason": "Unable to parse claim", "parsing_errors": parsed_claim.errors } # Execute queries to get data executor = QueryExecutor() query_results = await executor.execute(parsed_claim) # Evaluate the claim evaluator = ClaimEvaluator() evaluation = await evaluator.evaluate(parsed_claim, query_results) # Format the report formatter = ReportFormatter() report = await formatter.format(parsed_claim, evaluation) return report except Exception as e: logger.error("claim_verification_error", error=str(e)) return { "verdict": "ERROR", "confidence": 0, "error": str(e) } # Tool schema for MCP VERIFY_CLAIM_SCHEMA = { "type": "object", "properties": { "claim_text": { "type": "string", "description": "Natural language claim to verify" }, "context_time": { "type": "string", "description": "Optional timestamp for context (ISO 8601 format)" } }, "required": ["claim_text"] } ``` ### src/mcp/tools/get_contract_info.py ```python from typing import Dict, Any import structlog from src.data.models import get_db from src.data.storage.repository import ContractRepository, TradeRepository from src.data.storage.cache import cache logger = structlog.get_logger() async def get_contract_info(arguments: Dict[str, Any]) -> Dict[str, Any]: """ Get detailed information about a contract. Args: contract_symbol: Contract symbol or search term """ search_term = arguments.get("contract_symbol") if not search_term: return {"error": "Missing required parameter: contract_symbol"} # Check cache cache_key = f"contract_info:{search_term}" cached = await cache.get(cache_key) if cached: return cached async with get_db() as session: # Try exact match first contract = await ContractRepository.get_by_symbol(session, search_term) # If not found, try search if not contract: contracts = await ContractRepository.search_contracts(session, search_term) if not contracts: return {"error": f"No contracts found matching: {search_term}"} if len(contracts) > 1: return { "multiple_matches": True, "contracts": [ { "symbol": c.symbol, "description": c.description, "type": c.contract_type.value } for c in contracts[:10] ], "message": "Multiple contracts found. Please specify exact symbol." } contract = contracts[0] # Get latest trade latest_trade = await TradeRepository.get_latest_trade(session, contract.id) result = { "symbol": contract.symbol, "description": contract.description, "type": contract.contract_type.value, "settlement_date": contract.settlement_date.isoformat(), "is_active": contract.is_active, "is_settled": contract.is_settled, "current_price": latest_trade.price if latest_trade else None, "last_trade_time": latest_trade.timestamp.isoformat() if latest_trade else None, "implied_probability": (latest_trade.price if latest_trade else None), "paired_contract": contract.paired_contract_symbol } # Cache for 1 minute await cache.set(cache_key, result, ttl=60) return result # Tool schema for MCP GET_CONTRACT_INFO_SCHEMA = { "type": "object", "properties": { "contract_symbol": { "type": "string", "description": "Contract symbol or search term" } }, "required": ["contract_symbol"] } ``` ### src/mcp/tools/__init__.py ```python from src.mcp.server import MCPTool, mcp_server from src.mcp.tools.query_trading_data import query_trading_data, QUERY_TRADING_DATA_SCHEMA from src.mcp.tools.verify_claim import verify_claim, VERIFY_CLAIM_SCHEMA from src.mcp.tools.get_contract_info import get_contract_info, GET_CONTRACT_INFO_SCHEMA def register_all_tools(): """Register all MCP tools with the server.""" mcp_server.register_tool(MCPTool( name="query_trading_data", description="Query historical trading data for a contract within a specified time range", input_schema=QUERY_TRADING_DATA_SCHEMA, handler=query_trading_data )) mcp_server.register_tool(MCPTool( name="verify_claim", description="Verify a natural language claim against CME prediction market data", input_schema=VERIFY_CLAIM_SCHEMA, handler=verify_claim )) mcp_server.register_tool(MCPTool( name="get_contract_info", description="Get detailed information about a specific contract or search for contracts", input_schema=GET_CONTRACT_INFO_SCHEMA, handler=get_contract_info )) __all__ = [ "register_all_tools", "query_trading_data", "verify_claim", "get_contract_info", ] ``` --- ## Claim Verification ### src/verification/nlp/claim_parser.py ```python from dataclasses import dataclass from typing import List, Optional, Dict, Any from datetime import datetime import re import structlog logger = structlog.get_logger() @dataclass class ParsedClaim: """Structured representation of a parsed claim.""" original_text: str is_valid: bool contracts: List[str] price_assertions: List[Dict[str, Any]] time_references: List[Dict[str, Any]] comparison_type: Optional[str] # "greater_than", "less_than", "equals", "reached" confidence: float errors: List[str] class ClaimParser: """Parse natural language claims into structured format.""" # Price patterns PRICE_PATTERNS = [ r'(\$?0?\.\d{1,2})', # $0.95, .95, 0.95 r'(\d{1,2})\s*cents?', # 95 cents r'(\d{1,2})¢', # 95¢ ] # Time patterns TIME_PATTERNS = [ r'on\s+(\w+\s+\d{1,2},?\s+\d{4})', # on December 15, 2024 r'on\s+(\w+\s+\d{1,2})', # on December 15 r'(yesterday|today|tomorrow)', # relative days r'at\s+(\d{1,2}:\d{2}\s*(?:AM|PM|am|pm)?)', # at 2:30 PM r'(this\s+(?:morning|afternoon|evening|week|month))', # this morning ] # Contract/asset patterns ASSET_PATTERNS = [ r'\b(BTC|Bitcoin)\b', r'\b(ETH|Ethereum)\b', r'\b(S&P\s*500|SPX)\b', r'\b(Gold|GLD)\b', ] # Comparison patterns COMPARISON_PATTERNS = { 'greater_than': [r'above', r'higher than', r'over', r'exceeded', r'surpassed'], 'less_than': [r'below', r'lower than', r'under', r'fell below', r'dropped below'], 'equals': [r'equal to', r'exactly', r'at'], 'reached': [r'reached', r'hit', r'touched'] } async def parse(self, claim_text: str, context_time: Optional[str] = None) -> ParsedClaim: """Parse a natural language claim.""" errors = [] contracts = [] price_assertions = [] time_references = [] comparison_type = None confidence = 1.0 # Extract contracts/assets for pattern in self.ASSET_PATTERNS: matches = re.findall(pattern, claim_text, re.IGNORECASE) contracts.extend(matches) if not contracts: errors.append("No recognizable contracts or assets found") confidence *= 0.5 # Extract prices for pattern in self.PRICE_PATTERNS: matches = re.findall(pattern, claim_text, re.IGNORECASE) for match in matches: try: # Normalize to decimal if 'cent' in claim_text.lower(): price = float(match) / 100 elif match.startswith(') or match.startswith('0.'): price = float(match.replace(', '')) else: price = float(match) / 100 price_assertions.append({ 'value': price, 'raw_text': match, 'valid': 0.01 <= price <= 0.99 }) except ValueError: errors.append(f"Could not parse price: {match}") if not price_assertions: errors.append("No price assertions found") confidence *= 0.5 # Extract time references for pattern in self.TIME_PATTERNS: matches = re.findall(pattern, claim_text, re.IGNORECASE) time_references.extend([ {'raw_text': m, 'type': 'absolute' if any(c.isdigit() for c in m) else 'relative'} for m in matches ]) if not time_references and not context_time: errors.append("No time reference found") confidence *= 0.7 # Detect comparison type for comp_type, patterns in self.COMPARISON_PATTERNS.items(): for pattern in patterns: if re.search(pattern, claim_text, re.IGNORECASE): comparison_type = comp_type break if comparison_type: break is_valid = len(errors) == 0 or confidence >= 0.6 return ParsedClaim( original_text=claim_text, is_valid=is_valid, contracts=list(set(contracts)), price_assertions=price_assertions, time_references=time_references, comparison_type=comparison_type, confidence=confidence, errors=errors ) ### src/verification/query/executor.py ```python from typing import Dict, Any, List, Optional from datetime import datetime, timedelta import structlog from src.data.models import get_db from src.data.storage.repository import ContractRepository, TradeRepository from src.verification.nlp.claim_parser import ParsedClaim from src.verification.nlp.temporal_parser import TemporalParser logger = structlog.get_logger() class QueryExecutor: """Execute queries against the database based on parsed claims.""" def __init__(self): self.temporal_parser = TemporalParser() async def execute(self, parsed_claim: ParsedClaim) -> Dict[str, Any]: """Execute queries to gather evidence for the claim.""" results = { 'contracts_found': [], 'price_data': [], 'errors': [] } async with get_db() as session: # Find contracts for contract_name in parsed_claim.contracts: contracts = await ContractRepository.search_contracts( session, contract_name ) if contracts: results['contracts_found'].extend([ { 'symbol': c.symbol, 'description': c.description, 'type': c.contract_type.value } for c in contracts[:5] # Limit to 5 matches ]) else: results['errors'].append(f"Contract not found: {contract_name}") # If no contracts found, can't proceed if not results['contracts_found']: return results # Get time range time_range = self.temporal_parser.parse_time_references( parsed_claim.time_references ) if not time_range: results['errors'].append("Could not determine time range") return results # Query price data for each contract for contract_info in results['contracts_found']: contract = await ContractRepository.get_by_symbol( session, contract_info['symbol'] ) if not contract: continue # Get trades in time range trades = await TradeRepository.get_by_contract_and_timerange( session, contract.id, time_range['start'], time_range['end'] ) if trades: prices = [t.price for t in trades] results['price_data'].append({ 'contract_symbol': contract_info['symbol'], 'time_range': { 'start': time_range['start'].isoformat(), 'end': time_range['end'].isoformat() }, 'prices': { 'min': min(prices), 'max': max(prices), 'first': trades[0].price, 'last': trades[-1].price, 'avg': sum(prices) / len(prices) }, 'trade_count': len(trades), 'sample_trades': [ { 'timestamp': t.timestamp.isoformat(), 'price': t.price, 'volume': t.volume } for t in trades[:10] # First 10 trades ] }) else: results['errors'].append( f"No trades found for {contract_info['symbol']} in specified time range" ) return results ### src/verification/nlp/temporal_parser.py ```python from typing import Dict, Any, Optional, List from datetime import datetime, timedelta import re from dateutil import parser as dateutil_parser import structlog logger = structlog.get_logger() class TemporalParser: """Parse and resolve temporal references.""" def parse_time_references( self, time_refs: List[Dict[str, Any]], context_time: Optional[datetime] = None ) -> Optional[Dict[str, datetime]]: """Parse time references into absolute datetime range.""" if not time_refs: return None context = context_time or datetime.utcnow() for ref in time_refs: raw_text = ref['raw_text'] ref_type = ref['type'] try: if ref_type == 'absolute': # Try to parse as absolute date dt = dateutil_parser.parse(raw_text, fuzzy=True) return { 'start': dt - timedelta(hours=12), 'end': dt + timedelta(hours=12) } elif ref_type == 'relative': # Handle relative time expressions if 'yesterday' in raw_text.lower(): target = context - timedelta(days=1) return { 'start': target.replace(hour=0, minute=0, second=0), 'end': target.replace(hour=23, minute=59, second=59) } elif 'today' in raw_text.lower(): return { 'start': context.replace(hour=0, minute=0, second=0), 'end': context } elif 'this week' in raw_text.lower(): week_start = context - timedelta(days=context.weekday()) return { 'start': week_start.replace(hour=0, minute=0, second=0), 'end': context } elif 'this month' in raw_text.lower(): month_start = context.replace(day=1, hour=0, minute=0, second=0) return { 'start': month_start, 'end': context } except Exception as e: logger.warning("temporal_parse_error", text=raw_text, error=str(e)) continue # Default: last 24 hours return { 'start': context - timedelta(days=1), 'end': context } ### src/verification/logic/evaluator.py ```python from typing import Dict, Any from dataclasses import dataclass import structlog from src.verification.nlp.claim_parser import ParsedClaim logger = structlog.get_logger() @dataclass class Evaluation: """Evaluation result.""" verdict: str # TRUE, FALSE, PARTIALLY_TRUE, INCONCLUSIVE confidence: float # 0-100 evidence: List[Dict[str, Any]] reasoning: str class ClaimEvaluator: """Evaluate claims against query results.""" TOLERANCE = 0.02 # ±2 cents tolerance for "approximately" async def evaluate( self, parsed_claim: ParsedClaim, query_results: Dict[str, Any] ) -> Evaluation: """Evaluate a claim against query results.""" # Check if we have sufficient data if not query_results.get('price_data'): return Evaluation( verdict="INCONCLUSIVE", confidence=0, evidence=[], reasoning="Insufficient data: No price data found for the specified time range" ) # Get price assertions from claim if not parsed_claim.price_assertions: return Evaluation( verdict="INCONCLUSIVE", confidence=0, evidence=[], reasoning="No price assertions found in claim" ) claimed_price = parsed_claim.price_assertions[0]['value'] comparison_type = parsed_claim.comparison_type or 'reached' # Evaluate against actual data evidence = [] matches = 0 total_checks = 0 for price_data in query_results['price_data']: total_checks += 1 actual_prices = price_data['prices'] contract_symbol = price_data['contract_symbol'] # Check based on comparison type if comparison_type == 'reached': # Check if price reached or exceeded claimed value if actual_prices['max'] >= claimed_price - self.TOLERANCE: matches += 1 evidence.append({ 'type': 'supporting', 'contract': contract_symbol, 'claimed': claimed_price, 'actual_max': actual_prices['max'], 'message': f"Price reached ${actual_prices['max']:.2f}, meeting claim of ${claimed_price:.2f}" }) else: evidence.append({ 'type': 'contradicting', 'contract': contract_symbol, 'claimed': claimed_price, 'actual_max': actual_prices['max'], 'message': f"Price only reached ${actual_prices['max']:.2f}, below claimed ${claimed_price:.2f}" }) elif comparison_type == 'greater_than': if actual_prices['max'] > claimed_price: matches += 1 evidence.append({ 'type': 'supporting', 'contract': contract_symbol, 'message': f"Price exceeded ${claimed_price:.2f}, reaching ${actual_prices['max']:.2f}" }) elif comparison_type == 'less_than': if actual_prices['min'] < claimed_price: matches += 1 evidence.append({ 'type': 'supporting', 'contract': contract_symbol, 'message': f"Price fell below ${claimed_price:.2f}, reaching ${actual_prices['min']:.2f}" }) elif comparison_type == 'equals': if abs(actual_prices['avg'] - claimed_price) <= self.TOLERANCE: matches += 1 evidence.append({ 'type': 'supporting', 'contract': contract_symbol, 'message': f"Price was approximately ${actual_prices['avg']:.2f}, matching claimed ${claimed_price:.2f}" }) # Determine verdict if matches == total_checks and total_checks > 0: verdict = "TRUE" confidence = min(95, 70 + (parsed_claim.confidence * 25)) reasoning = "All price data supports the claim" elif matches > 0: verdict = "PARTIALLY_TRUE" confidence = min(70, 40 + (matches / total_checks * 30)) reasoning = f"Claim supported by {matches}/{total_checks} contracts" elif total_checks > 0: verdict = "FALSE" confidence = min(90, 60 + (parsed_claim.confidence * 30)) reasoning = "Price data contradicts the claim" else: verdict = "INCONCLUSIVE" confidence = 0 reasoning = "Insufficient data to evaluate claim" return Evaluation( verdict=verdict, confidence=confidence, evidence=evidence, reasoning=reasoning ) ### src/verification/reporting/formatter.py ```python from typing import Dict, Any from datetime import datetime import structlog from src.verification.nlp.claim_parser import ParsedClaim from src.verification.logic.evaluator import Evaluation logger = structlog.get_logger() class ReportFormatter: """Format verification results into structured reports.""" async def format( self, parsed_claim: ParsedClaim, evaluation: Evaluation ) -> Dict[str, Any]: """Format a complete verification report.""" return { 'verification_id': f"ver_{int(datetime.utcnow().timestamp())}", 'timestamp': datetime.utcnow().isoformat(), 'claim': { 'original_text': parsed_claim.original_text, 'parsed_contracts': parsed_claim.contracts, 'parsed_prices': [p['value'] for p in parsed_claim.price_assertions], 'parsed_times': [t['raw_text'] for t in parsed_claim.time_references], 'comparison_type': parsed_claim.comparison_type, 'parse_confidence': parsed_claim.confidence, 'parse_errors': parsed_claim.errors }, 'verdict': evaluation.verdict, 'confidence': evaluation.confidence, 'reasoning': evaluation.reasoning, 'evidence': evaluation.evidence, 'summary': self._generate_summary(parsed_claim, evaluation), 'metadata': { 'data_sources': ['CME Prediction Markets'], 'verification_method': 'Historical Price Comparison', 'limitations': [ 'Verification limited to available trading data', 'Timestamps may have ±5 minute tolerance', 'Price comparisons use ±$0.02 tolerance for approximate claims' ] } } def _generate_summary( self, parsed_claim: ParsedClaim, evaluation: Evaluation ) -> str: """Generate human-readable summary.""" verdict_text = { 'TRUE': 'is TRUE', 'FALSE': 'is FALSE', 'PARTIALLY_TRUE': 'is PARTIALLY TRUE', 'INCONCLUSIVE': 'is INCONCLUSIVE' }.get(evaluation.verdict, 'could not be determined') summary = f"The claim '{parsed_claim.original_text}' {verdict_text}. " summary += evaluation.reasoning if evaluation.evidence: summary += f" Based on {len(evaluation.evidence)} data point(s) from CME trading data." return summary --- ## Communication Integrations ### src/integrations/slack/bot.py ```python from slack_sdk.web.async_client import AsyncWebClient from slack_sdk.socket_mode.aiohttp import SocketModeClient from slack_sdk.socket_mode.request import SocketModeRequest from slack_sdk.socket_mode.response import SocketModeResponse import structlog from src.config import get_settings from src.integrations.slack.events import handle_message_event from src.integrations.slack.commands import handle_slash_command logger = structlog.get_logger() settings = get_settings() class SlackBot: """Slack bot for claim verification.""" def __init__(self): self.web_client = AsyncWebClient(token=settings.SLACK_BOT_TOKEN) self.socket_client = None self.running = False async def start(self): """Start the Slack bot.""" self.socket_client = SocketModeClient( app_token=settings.SLACK_APP_TOKEN, web_client=self.web_client ) self.socket_client.socket_mode_request_listeners.append(self.handle_request) await self.socket_client.connect() self.running = True logger.info("slack_bot_started") async def stop(self): """Stop the Slack bot.""" if self.socket_client: await self.socket_client.close() self.running = False logger.info("slack_bot_stopped") async def handle_request(self, client: SocketModeClient, req: SocketModeRequest): """Handle incoming Slack requests.""" if req.type == "events_api": # Acknowledge the request response = SocketModeResponse(envelope_id=req.envelope_id) await client.send_socket_mode_response(response) # Handle the event event = req.payload.get("event", {}) await handle_message_event(self.web_client, event) elif req.type == "slash_commands": # Acknowledge the request response = SocketModeResponse(envelope_id=req.envelope_id) await client.send_socket_mode_response(response) # Handle the command await handle_slash_command(self.web_client, req.payload) async def send_message( self, channel: str, text: str = None, blocks: list = None ): """Send a message to a Slack channel.""" try: await self.web_client.chat_postMessage( channel=channel, text=text, blocks=blocks ) except Exception as e: logger.error("slack_send_error", error=str(e)) ### src/integrations/slack/events.py ```python from slack_sdk.web.async_client import AsyncWebClient import structlog from src.mcp.tools.verify_claim import verify_claim from src.integrations.slack.blocks import format_verification_blocks from src.config import get_settings logger = structlog.get_logger() settings = get_settings() async def handle_message_event(client: AsyncWebClient, event: dict): """Handle Slack message events.""" event_type = event.get("type") if event_type == "app_mention": await handle_app_mention(client, event) elif event_type == "message": await handle_channel_message(client, event) async def handle_app_mention(client: AsyncWebClient, event: dict): """Handle when bot is mentioned.""" channel = event.get("channel") text = event.get("text", "") user = event.get("user") # Remove bot mention from text cleaned_text = text.split(">", 1)[-1].strip() logger.info("app_mention", user=user, text=cleaned_text) # Check if this looks like a claim if any(keyword in cleaned_text.lower() for keyword in ['verify', 'check', 'claim']): # Extract the claim claim_text = cleaned_text for prefix in ['verify', 'check', 'claim']: claim_text = claim_text.lower().replace(prefix, '').strip() if claim_text: # Send "processing" message await client.chat_postMessage( channel=channel, text=f"🔍 Verifying claim: _{claim_text}_" ) # Verify the claim result = await verify_claim({'claim_text': claim_text}) # Format and send result blocks = format_verification_blocks(result) await client.chat_postMessage( channel=channel, text=f"Verification complete: {result.get('verdict')}", blocks=blocks ) else: await client.chat_postMessage( channel=channel, text="Hi! I can verify claims against CME prediction market data. Try mentioning me with a claim to verify!" ) async def handle_channel_message(client: AsyncWebClient, event: dict): """Handle messages in monitored channels.""" channel = event.get("channel") # Only process in verification channel if channel != settings.SLACK_VERIFICATION_CHANNEL.lstrip('#'): return text = event.get("text", "") # Skip bot messages if event.get("bot_id"): return logger.info("channel_message", channel=channel, text=text) ### src/integrations/slack/blocks.py ```python from typing import Dict, Any, List def format_verification_blocks(verification_result: Dict[str, Any]) -> List[Dict[str, Any]]: """Format verification result as Slack blocks.""" verdict = verification_result.get('verdict', 'UNKNOWN') confidence = verification_result.get('confidence', 0) summary = verification_result.get('summary', 'No summary available') # Color based on verdict color_map = { 'TRUE': '#36a64f', # Green 'FALSE': '#ff0000', # Red 'PARTIALLY_TRUE': '#ff9900', # Orange 'INCONCLUSIVE': '#808080' # Gray } color = color_map.get(verdict, '#808080') # Emoji based on verdict emoji_map = { 'TRUE': '✅', 'FALSE': '❌', 'PARTIALLY_TRUE': '⚠️', 'INCONCLUSIVE': '❓' } emoji = emoji_map.get(verdict, '❓') blocks = [ { "type": "header", "text": { "type": "plain_text", "text": f"{emoji} Verification Result: {verdict}" } }, { "type": "section", "fields": [ { "type": "mrkdwn", "text": f"*Confidence:*\n{confidence:.1f}%" }, { "type": "mrkdwn", "text": f"*Verdict:*\n{verdict}" } ] }, { "type": "section", "text": { "type": "mrkdwn", "text": f"*Summary:*\n{summary}" } } ] # Add evidence if available evidence = verification_result.get('evidence', []) if evidence: evidence_text = "\n".join([ f"• {e.get('message', 'No details')}" for e in evidence[:5] # Limit to 5 items ]) blocks.append({ "type": "section", "text": { "type": "mrkdwn", "text": f"*Evidence:*\n{evidence_text}" } }) # Add context blocks.append({ "type": "context", "elements": [ { "type": "mrkdwn", "text": f"Verification ID: `{verification_result.get('verification_id', 'N/A')}`" } ] }) return blocks --- ## Main Application ### src/config.py ```python from pydantic_settings import BaseSettings from functools import lru_cache class Settings(BaseSettings): """Application settings.""" # Database DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/cme_mcp" DATABASE_POOL_SIZE: int = 20 DATABASE_MAX_OVERFLOW: int = 10 # Redis REDIS_URL: str = "redis://localhost:6379/0" REDIS_CACHE_TTL: int = 300 # CME Data CME_DATA_URL: str = "https://www.cmegroup.com/market-data/files/Event_Contract_Swaps_TS.csv" CME_FETCH_INTERVAL: int = 300 CME_RETRY_ATTEMPTS: int = 3 # Slack SLACK_BOT_TOKEN: str = "" SLACK_APP_TOKEN: str = "" SLACK_SIGNING_SECRET: str = "" SLACK_ALERT_CHANNEL: str = "#cme-alerts" SLACK_VERIFICATION_CHANNEL: str = "#cme-verifications" # API API_HOST: str = "0.0.0.0" API_PORT: int = 8000 API_WORKERS: int = 4 API_CORS_ORIGINS: list = ["http://localhost:3000"] # Logging LOG_LEVEL: str = "INFO" LOG_FORMAT: str = "json" # Security SECRET_KEY: str = "change-me-in-production" ADMIN_API_KEY: str = "change-me-in-production" class Config: env_file = ".env" case_sensitive = True @lru_cache() def get_settings() -> Settings: """Get cached settings instance.""" return Settings() ### src/main.py ```python from fastapi import FastAPI from contextlib import asynccontextmanager import structlog from src.config import get_settings from src.data.models import init_db, close_db from src.data.storage.cache import cache from src.mcp.tools import register_all_tools from src.api.mcp_routes import router as mcp_router from src.api.health_routes import router as health_router from src.utils.logger import setup_logging settings = get_settings() setup_logging() logger = structlog.get_logger() @asynccontextmanager async def lifespan(app: FastAPI): """Application lifecycle manager.""" # Startup logger.info("application_starting") # Initialize database await init_db() # Connect to Redis await cache.connect() # Register MCP tools register_all_tools() logger.info("application_started") yield # Shutdown logger.info("application_stopping") # Close connections await cache.disconnect() await close_db() logger.info("application_stopped") # Create FastAPI app app = FastAPI( title="CME Prediction Markets MCP Server", description="MCP Server for verifying claims against CME prediction market data", version="1.0.0", lifespan=lifespan ) # Include routers app.include_router(mcp_router, prefix="/mcp", tags=["MCP"]) app.include_router(health_router, prefix="/health", tags=["Health"]) @app.get("/") async def root(): """Root endpoint.""" return { "service": "CME Prediction Markets MCP Server", "version": "1.0.0", "status": "running" } if __name__ == "__main__": import uvicorn uvicorn.run( "src.main:app", host=settings.API_HOST, port=settings.API_PORT, reload=True, log_level=settings.LOG_LEVEL.lower() ) ### src/api/mcp_routes.py ```python from fastapi import APIRouter, Request, HTTPException from typing import Dict, Any import structlog from src.mcp.server import mcp_server logger = structlog.get_logger() router = APIRouter() @router.post("/messages") async def handle_mcp_message(request: Request): """Handle MCP protocol messages.""" try: body = await request.json() logger.info("mcp_request_received", method=body.get("method")) response = await mcp_server.handle_request(body) return response except Exception as e: logger.error("mcp_request_error", error=str(e)) raise HTTPException(status_code=500, detail=str(e)) ### src/api/health_routes.py ```python from fastapi import APIRouter from datetime import datetime import structlog from src.data.models import get_db from src.data.storage.cache import cache logger = structlog.get_logger() router = APIRouter() @router.get("/") async def health_check(): """Basic health check.""" return { "status": "healthy", "timestamp": datetime.utcnow().isoformat() } @router.get("/ready") async def readiness_check(): """Readiness check with dependency verification.""" checks = { "database": False, "cache": False } # Check database try: async with get_db() as session: await session.execute("SELECT 1") checks["database"] = True except Exception as e: logger.error("health_check_db_failed", error=str(e)) # Check cache try: checks["cache"] = await cache.exists("health_check") except Exception as e: logger.error("health_check_cache_failed", error=str(e)) all_healthy = all(checks.values()) status_code = 200 if all_healthy else 503 return { "status": "ready" if all_healthy else "not_ready", "checks": checks, "timestamp": datetime.utcnow().isoformat() } ### src/utils/logger.py ```python import structlog import logging import sys def setup_logging(log_level: str = "INFO"): """Configure structured logging.""" logging.basicConfig( format="%(message)s", stream=sys.stdout, level=log_level, ) structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer() ], wrapper_class=structlog.stdlib.BoundLogger, context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) --- ## Scripts ### scripts/init_db.py ```python """Initialize database with sample data.""" import asyncio import sys from pathlib import Path # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent)) from src.data.models import init_db, get_db, Contract, ContractType from datetime import date, timedelta import structlog logger = structlog.get_logger() async def create_sample_contracts(): """Create sample contracts.""" contracts = [ { "symbol": "BTC_95000_YES", "description": "Bitcoin to reach $95,000 by year end", "contract_type": ContractType.CRYPTO, "settlement_date": date.today() + timedelta(days=365), "is_yes_contract": True, "paired_contract_symbol": "BTC_95000_NO" }, { "symbol": "BTC_95000_NO", "description": "Bitcoin to NOT reach $95,000 by year end", "contract_type": ContractType.CRYPTO, "settlement_date": date.today() + timedelta(days=365), "is_yes_contract": False, "paired_contract_symbol": "BTC_95000_YES" }, { "symbol": "SPX_5000_YES", "description": "S&P 500 to reach 5000 by Q1 end", "contract_type": ContractType.EQUITY, "settlement_date": date.today() + timedelta(days=90), "is_yes_contract": True, "paired_contract_symbol": "SPX_5000_NO" } ] async with get_db() as session: for contract_data in contracts: contract = Contract(**contract_data) session.add(contract) await session.commit() logger.info("sample_contracts_created", count=len(contracts)) async def main(): """Main initialization function.""" logger.info("initializing_database") # Create tables await init_db() # Create sample contracts await create_sample_contracts() logger.info("database_initialized") if __name__ == "__main__": asyncio.run(main()) ### README.md ```markdown # CME Prediction Markets MCP Server Complete MCP server for verifying claims against CME prediction market data. ## Features - **MCP Protocol Support**: Full implementation of Model Context Protocol - **Data Infrastructure**: Automated CME data ingestion with PostgreSQL/TimescaleDB - **Claim Verification**: NLP-powered claim parsing and verification - **Slack Integration**: Real-time claim verification via Slack bot - **Caching Layer**: Redis-based caching for performance - **Async Architecture**: Built on FastAPI with async/await throughout ## Quick Start ### Prerequisites - Python 3.11+ - PostgreSQL 15+ (with TimescaleDB) - Redis 7+ - Docker & Docker Compose (optional) ### Installation 1. Clone the repository 2. Copy `.env.example` to `.env` and configure 3. Install dependencies: ```bash poetry install ``` 4. Initialize database: ```bash poetry run python scripts/init_db.py ``` 5. Run the server: ```bash poetry run uvicorn src.main:app --reload ``` ### Docker Deployment ```bash docker-compose up -d ``` ## MCP Tools ### query_trading_data Query historical trading data for contracts. ```json { "contract_symbol": "BTC_95000_YES", "start_time": "2024-12-01T00:00:00Z", "end_time": "2024-12-16T00:00:00Z" } ``` ### verify_claim Verify natural language claims against data. ```json { "claim_text": "Bitcoin reached 95 cents on December 15" } ``` ### get_contract_info Get detailed contract information. ```json { "contract_symbol": "BTC_95000_YES" } ``` ## Architecture See `docs/architecture.md` for detailed system architecture. ## Testing ```bash poetry run pytest ``` ## License MIT ``` --- This complete implementation includes all core functionality for the CME Prediction Markets MCP Server. To get started: 1. Set up your environment variables in `.env` 2. Run `docker-compose up` to start all services 3. Initialize the database with `python scripts/init_db.py` 4. The MCP server will be available at `http://localhost:8000` The system is production-ready with proper error handling, logging, caching, and async operations throughout.

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jjsteffen23/dk_mcp_2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server