"""
SQLAlchemy ORM models for IBKR MCP database.
This module defines the database schema for:
- Option chain snapshots and historical data
- Trade records and execution details
- Greeks history and risk metrics
- Portfolio positions
参考 brokers-mcp 的模型设计,但针对 IBKR 的业务需求进行优化。
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Index,
Integer,
Numeric,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
# SQLAlchemy Base for all ORM models
Base = declarative_base()
# Type alias for numeric columns
# Using Numeric for precise financial calculations
PriceType = Numeric(precision=12, scale=6) # e.g., 123456.123456
QuantityType = Numeric(precision=15, scale=6) # e.g., 1000.123456
PnLType = Numeric(precision=15, scale=2) # e.g., 12345.67
class OptionChainSnapshot(Base):
"""
Stores option chain snapshots for historical analysis.
Each snapshot represents the complete option chain for a symbol
at a specific point in time, including all strikes, expirations,
and market data (bid, ask, volume, open interest, implied volatility).
"""
__tablename__ = "option_chain_snapshots"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Symbol identification
symbol = Column(String(20), nullable=False, index=True, comment="Underlying symbol, e.g., AAPL")
symbol_context = Column(String(100), nullable=True, comment="Additional symbol context if needed")
# Timestamp
timestamp = Column(
DateTime,
nullable=False,
default=datetime.utcnow,
index=True,
comment="Snapshot timestamp in UTC",
)
# Underlying market data
underlying_price = Column(PriceType, nullable=True, comment="Current price of underlying asset")
underlying_bid = Column(PriceType, nullable=True)
underlying_ask = Column(PriceType, nullable=True)
underlying_volume = Column(Integer, nullable=True)
# Market metadata
market_data_type = Column(String(20), default="LIVE", comment="LIVE, DELAYED, etc.")
exchange = Column(String(20), nullable=True, comment="Primary exchange")
currency = Column(String(3), default="USD", comment="ISO currency code")
# Complete option chain data as JSON
# This stores the entire chain for flexible querying later
# Format: [{"strike": 150, "expiry": "2025-06-20", "calls": [...], "puts": [...]}, ...]
options_json = Column(Text, nullable=False, comment="Complete option chain in JSON format")
# Snapshot metadata
total_calls = Column(Integer, default=0, comment="Total number of call options")
total_puts = Column(Integer, default=0, comment="Total number of put options")
total_volume = Column(Integer, default=0, comment="Sum of all option volumes")
total_open_interest = Column(Integer, default=0, comment="Sum of all open interest")
# Snapshot statistics
avg_call_iv = Column(Float, nullable=True, comment="Average implied volatility for calls")
avg_put_iv = Column(Float, nullable=True, comment="Average implied volatility for puts")
iv_skew = Column(Float, nullable=True, comment="IV skew (put IV - call IV)")
# Creation timestamp
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
# Indexes for performance
__table_args__ = (
Index("idx_option_chain_symbol_timestamp", "symbol", "timestamp"),
Index("idx_option_chain_timestamp", "timestamp"),
Index("idx_option_chain_underlying_price", "underlying_price"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"id": self.id,
"symbol": self.symbol,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"underlying_price": float(self.underlying_price) if self.underlying_price else None,
"underlying_bid": float(self.underlying_bid) if self.underlying_bid else None,
"underlying_ask": float(self.underlying_ask) if self.underlying_ask else None,
"options_count": self.total_calls + self.total_puts,
"total_volume": self.total_volume,
"total_open_interest": self.total_open_interest,
"created_at": self.created_at.isoformat() if self.created_at else None,
}
def get_options(self) -> List[Dict[str, Any]]:
"""Parse and return options JSON as Python objects."""
if not self.options_json:
return []
try:
return json.loads(self.options_json)
except json.JSONDecodeError:
logger.error(f"Failed to decode options JSON for snapshot {self.id}")
return []
class TradeRecord(Base):
"""
Records all trading activities including orders, executions, and fills.
This captures the complete trade lifecycle from initial order
through final execution and settlement.
"""
__tablename__ = "trade_records"
# Primary key
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
# Trade identification
symbol = Column(String(20), nullable=False, index=True, comment="Traded symbol")
con_id = Column(Integer, nullable=True, index=True, comment="IB contract ID")
# Account information
account = Column(String(20), nullable=True, index=True, comment="IB account identifier")
# Trade details
action = Column(String(10), nullable=False, comment="BUY or SELL")
quantity = Column(QuantityType, nullable=False, comment="Trade quantity")
price = Column(PriceType, nullable=True, comment="Execution price")
avg_price = Column(PriceType, nullable=True, comment="Average execution price")
filled_quantity = Column(QuantityType, default=0, comment="Quantity filled")
remaining_quantity = Column(QuantityType, default=0, comment="Quantity remaining")
# Order management
order_id = Column(String(50), nullable=True, index=True, comment="IB order ID")
client_order_id = Column(String(50), nullable=True, index=True, comment="Client order ID")
# Trade timing
timestamp = Column(DateTime, default=datetime.utcnow, index=True, comment="Trade timestamp")
execution_time = Column(DateTime, nullable=True, comment="Time of execution")
last_trade_date = Column(String(8), nullable=True, comment="For options: YYYYMMDD")
# Contract details
sec_type = Column(String(10), nullable=True, comment="Security type: STK, OPT, FUT, etc.")
strike = Column(PriceType, nullable=True, comment="For options: strike price")
right = Column(String(2), nullable=True, comment="For options: C or P")
expiry = Column(String(8), nullable=True, comment="For options: YYYYMMDD")
multiplier = Column(Integer, default=1, comment="Contract multiplier")
# Order management
order_type = Column(String(20), nullable=True, comment="MKT, LMT, STP, etc.")
status = Column(String(20), nullable=False, default="PENDING", index=True)
status_msg = Column(String(200), nullable=True, comment="Status message or error")
# Financial details
commission = Column(PnLType, default=0, comment="Commission paid")
commission_currency = Column(String(3), default="USD", comment="Commission currency")
realized_pnl = Column(PnLType, default=0, comment="Realized P&L")
realized_pnl_currency = Column(String(3), default="USD", comment="P&L currency")
# Strategy and tags
strategy = Column(String(50), nullable=True, index=True, comment="Trading strategy name")
tags = Column(String(200), nullable=True, comment="Comma-separated tags")
# Notes and metadata
notes = Column(Text, nullable=True, comment="Trade notes")
metadata_json = Column(Text, nullable=True, comment="Additional metadata in JSON")
# Relationships
fills = relationship("TradeFill", back_populates="trade", cascade="all, delete-orphan")
# Indexes
__table_args__ = (
Index("idx_trade_symbol_timestamp", "symbol", "timestamp"),
Index("idx_trade_account_timestamp", "account", "timestamp"),
Index("idx_trade_order_id", "order_id"),
Index("idx_trade_status", "status"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"id": self.id,
"symbol": self.symbol,
"action": self.action,
"quantity": float(self.quantity) if self.quantity else None,
"price": float(self.price) if self.price else None,
"avg_price": float(self.avg_price) if self.avg_price else None,
"filled_quantity": float(self.filled_quantity) if self.filled_quantity else None,
"status": self.status,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"realized_pnl": float(self.realized_pnl) if self.realized_pnl else None,
"strategy": self.strategy,
}
class TradeFill(Base):
"""
Individual fills/executions for a trade.
A single trade can have multiple fills (partial executions).
"""
__tablename__ = "trade_fills"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Foreign key to trade
trade_id = Column(String(36), ForeignKey("trade_records.id"), nullable=False, index=True)
# Fill details
timestamp = Column(DateTime, default=datetime.utcnow, index=True, comment="Fill timestamp")
price = Column(PriceType, nullable=False, comment="Fill price")
quantity = Column(QuantityType, nullable=False, comment="Fill quantity")
commission = Column(PnLType, default=0, comment="Commission for this fill")
# Exchange information
exchange = Column(String(20), nullable=True, comment="Execution exchange")
liquidity_flag = Column(String(5), nullable=True, comment="P=Public, I=Initiator")
# IB-specific identifiers
ib_exec_id = Column(String(50), nullable=True, unique=True, comment="IB execution ID")
# Relationships
trade = relationship("TradeRecord", back_populates="fills")
# Indexes
__table_args__ = (
Index("idx_fill_trade_id", "trade_id"),
Index("idx_fill_timestamp", "timestamp"),
)
class GreeksHistory(Base):
"""
Historical record of portfolio Greeks and risk metrics.
Stores calculated Greeks for the entire portfolio or per-symbol
at regular intervals for risk monitoring and analysis.
"""
__tablename__ = "greeks_history"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Identification
symbol = Column(String(20), nullable=True, index=True, comment="Specific symbol, null for portfolio-wide")
account = Column(String(20), nullable=True, index=True, comment="IB account, null for all accounts")
# Timestamp
timestamp = Column(DateTime, default=datetime.utcnow, index=True, comment="Calculation timestamp")
# Position quantities
long_positions = Column(Integer, default=0, comment="Number of long positions")
short_positions = Column(Integer, default=0, comment="Number of short positions")
total_contracts = Column(Integer, default=0, comment="Total number of contracts")
# Greeks (normalized to portfolio exposure)
total_delta = Column(Numeric(precision=15, scale=6), default=0, comment="Total delta exposure")
total_gamma = Column(Numeric(precision=15, scale=6), default=0, comment="Total gamma exposure")
total_vega = Column(Numeric(precision=15, scale=6), default=0, comment="Total vega exposure")
total_theta = Column(Numeric(precision=15, scale=6), default=0, comment="Total theta exposure")
total_rho = Column(Numeric(precision=15, scale=6), default=0, comment="Total rho exposure")
# Per-share Greeks (for stocks)
stock_delta = Column(Numeric(precision=15, scale=6), default=0, comment="Stock delta exposure")
# Risk metrics
portfolio_value = Column(PnLType, default=0, comment="Total portfolio market value")
cash_balance = Column(PnLType, default=0, comment="Cash balance")
buying_power = Column(PnLType, default=0, comment="Available buying power")
# Risk assessments
risk_score = Column(Numeric(precision=10, scale=4), nullable=True, comment="Overall risk score 0-1")
concentration_risk = Column(Numeric(precision=10, scale=4), nullable=True, comment="Concentration risk 0-1")
volatility_risk = Column(Numeric(precision=10, scale=4), nullable=True, comment="Volatility risk 0-1")
# Greeks by expiration (for options)
# Format: {"2025-06-20": {"delta": 100, "gamma": 5}, "2025-07-18": {...}}
greeks_by_expiry_json = Column(Text, nullable=True, comment="Greeks breakdown by expiration")
# Greeks by underlying
# Format: {"AAPL": {"delta": 50, "gamma": 2}, "MSFT": {...}}
greeks_by_underlying_json = Column(Text, nullable=True, comment="Greeks breakdown by underlying")
# Creation timestamp
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
# Indexes
__table_args__ = (
Index("idx_greeks_symbol_timestamp", "symbol", "timestamp"),
Index("idx_greeks_account_timestamp", "account", "timestamp"),
Index("idx_greeks_timestamp", "timestamp"),
Index("idx_greeks_risk_score", "risk_score"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"id": self.id,
"symbol": self.symbol,
"account": self.account,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"total_delta": float(self.total_delta) if self.total_delta else None,
"total_gamma": float(self.total_gamma) if self.total_gamma else None,
"total_vega": float(self.total_vega) if self.total_vega else None,
"total_theta": float(self.total_theta) if self.total_theta else None,
"risk_score": float(self.risk_score) if self.risk_score else None,
"portfolio_value": float(self.portfolio_value) if self.portfolio_value else None,
}
def get_greeks_by_expiry(self) -> Dict[str, Any]:
"""Parse and return greeks by expiry JSON."""
if not self.greeks_by_expiry_json:
return {}
try:
return json.loads(self.greeks_by_expiry_json)
except json.JSONDecodeError:
return {}
def get_greeks_by_underlying(self) -> Dict[str, Any]:
"""Parse and return greeks by underlying JSON."""
if not self.greeks_by_underlying_json:
return {}
try:
return json.loads(self.greeks_by_underlying_json)
except json.JSONDecodeError:
return {}
class RiskAlert(Base):
"""
Risk alerts and warnings generated by the system.
Stores automated risk assessments and recommendations.
"""
__tablename__ = "risk_alerts"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Alert identification
alert_type = Column(String(50), nullable=False, index=True, comment="Type of risk alert")
severity = Column(String(20), nullable=False, index=True, comment="LOW, MEDIUM, HIGH, CRITICAL")
# Scope
symbol = Column(String(20), nullable=True, index=True, comment="Affected symbol")
account = Column(String(20), nullable=True, index=True, comment="Affected account")
# Alert details
title = Column(String(200), nullable=False, comment="Alert title")
message = Column(Text, nullable=False, comment="Alert message")
# Risk metrics at time of alert
risk_value = Column(Numeric(precision=15, scale=6), nullable=True, comment="Current risk value")
threshold_value = Column(Numeric(precision=15, scale=6), nullable=True, comment="Risk threshold")
risk_metric = Column(String(50), nullable=True, comment="Which metric triggered alert")
# Status tracking
status = Column(String(20), default="ACTIVE", index=True, comment="ACTIVE, ACKNOWLEDGED, RESOLVED")
timestamp = Column(DateTime, default=datetime.utcnow, index=True, comment="Alert timestamp")
acknowledged_at = Column(DateTime, nullable=True, comment="When alert was acknowledged")
resolved_at = Column(DateTime, nullable=True, comment="When alert was resolved")
# Actions and recommendations
recommended_action = Column(Text, nullable=True, comment="Recommended corrective action")
playbook_name = Column(String(50), nullable=True, comment="Associated playbook")
# Metadata
metadata_json = Column(Text, nullable=True, comment="Additional context in JSON")
# Indexes
__table_args__ = (
Index("idx_risk_alert_timestamp", "timestamp"),
Index("idx_risk_alert_status", "status"),
Index("idx_risk_alert_severity", "severity"),
Index("idx_risk_alert_type", "alert_type"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"id": self.id,
"alert_type": self.alert_type,
"severity": self.severity,
"title": self.title,
"message": self.message,
"status": self.status,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
}
# Additional helper models for flexibility
class MarketDataSnapshot(Base):
"""
General market data snapshots for any instrument.
Provides flexibility to store market data for stocks, futures, etc.
beyond just options.
"""
__tablename__ = "market_data_snapshots"
# Primary key
id = Column(Integer, primary_key=True, autoincrement=True)
# Identification
symbol = Column(String(20), nullable=False, index=True)
con_id = Column(Integer, nullable=True, index=True)
sec_type = Column(String(10), nullable=True, comment="STK, FUT, etc.")
# Timestamp
timestamp = Column(DateTime, default=datetime.utcnow, index=True)
# Market data
bid = Column(PriceType, nullable=True)
ask = Column(PriceType, nullable=True)
last = Column(PriceType, nullable=True)
high = Column(PriceType, nullable=True)
low = Column(PriceType, nullable=True)
open = Column(PriceType, nullable=True)
close = Column(PriceType, nullable=True)
volume = Column(Integer, nullable=True)
# Additional fields
exchange = Column(String(20), nullable=True)
currency = Column(String(3), default="USD")
# Indexes
__table_args__ = (
Index("idx_market_data_symbol_timestamp", "symbol", "timestamp"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"symbol": self.symbol,
"bid": float(self.bid) if self.bid else None,
"ask": float(self.ask) if self.ask else None,
"last": float(self.last) if self.last else None,
"volume": self.volume,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
}
# Configure logging
from loguru import logger
__all__ = [
"Base",
"OptionChainSnapshot",
"TradeRecord",
"TradeFill",
"GreeksHistory",
"RiskAlert",
"MarketDataSnapshot",
"PriceType",
"QuantityType",
"PnLType",
]