# ib.py
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any, Tuple
from ib_insync import IB, Stock, Option, Contract
class IBManager:
    """Manage one lazily-opened, read-only ``ib_insync.IB`` connection.

    On top of the connection the class offers best-effort quote retrieval
    (snapshot -> streaming -> delayed data -> historical bars/ticks) and a few
    helpers for enumerating and quoting option contracts.

    Connection settings are read from the environment whenever a new
    connection is opened: ``IB_HOST`` (default ``127.0.0.1``), ``IB_PORT``
    (``4001``), ``IB_CLIENT_ID`` (``19``) and ``IB_MARKET_DATA_TYPE`` (``1``).
    """

    # Price/size fields copied from a ticker into a quote dict.
    _QUOTE_FIELDS = ("bid", "ask", "last", "close", "volume")
    # Price fields whose presence means "the ticker has received data".
    _PRICE_FIELDS = ("bid", "ask", "last")
    # Oldest historical trade tick (in days) still accepted as a "last" price.
    _MAX_TICK_AGE_DAYS = 30

    def __init__(self) -> None:
        # Created by connect(); stays None until the first successful connect.
        self._ib: Optional["IB"] = None

    # -------- Small value helpers --------
    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Return environment variable *name* as an int, or *default* if unset/invalid."""
        raw = os.getenv(name)
        if raw is None:
            return default
        try:
            return int(raw.strip())
        except ValueError:
            return default

    @staticmethod
    def _as_float(value: Any) -> Optional[float]:
        """Return *value* as a finite float; None for missing, NaN, infinite or unparseable input."""
        if value is None:
            return None
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return None
        # ``number != number`` is the NaN test; infinities are rejected as well.
        if number != number or number in (float("inf"), float("-inf")):
            return None
        return number

    @staticmethod
    def _epoch_seconds(moment: Any) -> Optional[int]:
        """Return ``int(moment.timestamp())`` for datetime-like objects, else None.

        Strings and plain dates (which have no ``timestamp`` method) yield None
        so callers keep whatever timestamp they already had.
        """
        to_epoch = getattr(moment, "timestamp", None)
        if not callable(to_epoch):
            return None
        try:
            return int(to_epoch())
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    @classmethod
    def _has_quote(cls, ticker: Any) -> bool:
        """Return True once *ticker* carries a tick time or a finite bid/ask/last.

        Price fields are checked for finiteness rather than ``is not None``
        because an unfilled ticker may hold NaN placeholders, which would
        otherwise make every wait loop exit before any data arrived.
        """
        if getattr(ticker, "time", None):
            return True
        return any(
            cls._as_float(getattr(ticker, field, None)) is not None
            for field in cls._PRICE_FIELDS
        )

    @classmethod
    def _blank_quote(cls) -> Dict[str, Any]:
        """Return a quote dict with every field (and ``timestamp``) set to None."""
        quote: Dict[str, Any] = {field: None for field in cls._QUOTE_FIELDS}
        quote["timestamp"] = None
        return quote

    @classmethod
    def _quote_is_empty(cls, quote: Dict[str, Any]) -> bool:
        """Return True when none of bid/ask/last/close/volume has a value."""
        return all(quote[field] is None for field in cls._QUOTE_FIELDS)

    @classmethod
    def _fill_from_ticker(cls, quote: Dict[str, Any], ticker: Any, fields: Tuple[str, ...]) -> None:
        """Copy *fields* and the tick time from *ticker* into still-missing slots of *quote*.

        Existing values are tested with ``is None`` so that a legitimate 0.0
        is never overwritten or treated as missing.
        """
        for field in fields:
            if quote[field] is None:
                quote[field] = cls._as_float(getattr(ticker, field, None))
        if quote["timestamp"] is None:
            quote["timestamp"] = cls._epoch_seconds(getattr(ticker, "time", None))

    # -------- Connection and contract factories --------
    def connect(self) -> "IB":
        """Return the shared connection, opening it on first use.

        Raises:
            RuntimeError: if connecting fails; the underlying exception is
                chained as ``__cause__``.
        """
        if self._ib and self._ib.isConnected():
            return self._ib

        # Environment is read at connect time so changes apply to reconnects.
        host = os.getenv("IB_HOST", "127.0.0.1").strip() or "127.0.0.1"
        port = self._env_int("IB_PORT", 4001)
        client_id = self._env_int("IB_CLIENT_ID", 19)
        market_data_type = self._env_int("IB_MARKET_DATA_TYPE", 1)

        ib = IB()
        try:
            ib.connect(host, port, clientId=client_id, readonly=True)
        except Exception as exc:
            # Provide a clearer message for host/port issues
            raise RuntimeError(
                f"Failed to connect to IB Gateway at {host}:{port} (clientId {client_id}). "
                "Ensure IB Gateway is running, logged in, API enabled, and host/port are correct."
            ) from exc
        try:
            ib.reqMarketDataType(market_data_type)
        except Exception:
            # Best effort: a rejected market-data type must not block the connection.
            pass
        self._ib = ib
        return ib

    def stock(self, symbol: str, exchange: str = "SMART", currency: str = "USD") -> "Contract":
        """Build an (unqualified) stock contract."""
        return Stock(symbol, exchange, currency)

    def option(self, symbol: str, expiry: str, strike: float, right: str, exchange: str = "SMART", currency: str = "USD") -> "Contract":
        """Build an (unqualified) option contract; *right* is upper-cased."""
        return Option(symbol, expiry, float(strike), right.upper(), exchange=exchange, currency=currency)

    # -------- Quotes --------
    def req_snapshot(self, contract: "Contract", timeout_sec: float = 6.0):
        """Request a market-data snapshot and wait until data arrives or *timeout_sec* elapses.

        Returns the ticker object from ``reqMktData``; it may still be unfilled
        if the timeout expired first.
        """
        ib = self.connect()
        try:
            ib.qualifyContracts(contract)
        except Exception:
            # Best effort: the request is still attempted with the contract as given.
            pass
        ticker = ib.reqMktData(contract, genericTickList="", snapshot=True)
        # Monotonic clock so a wall-clock adjustment cannot stretch or cut the wait.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline and not self._has_quote(ticker):
            ib.waitOnUpdate(timeout=0.2)
        return ticker

    def _fill_from_stream(self, ib: Any, contract: "Contract", quote: Dict[str, Any]) -> None:
        """Subscribe to streaming data for up to two seconds and merge bid/ask/last into *quote*."""
        try:
            stream_ticker = ib.reqMktData(contract, genericTickList="", snapshot=False)
            try:
                deadline = time.monotonic() + 2.0
                while time.monotonic() < deadline and not self._has_quote(stream_ticker):
                    ib.waitOnUpdate(timeout=0.2)
            finally:
                # Always drop the subscription, even if waiting raised.
                try:
                    ib.cancelMktData(stream_ticker.contract)
                except Exception:
                    pass
            self._fill_from_ticker(quote, stream_ticker, self._PRICE_FIELDS)
        except Exception:
            # Best effort: later fallbacks still get their chance.
            pass

    def _fill_from_delayed(self, ib: Any, contract: "Contract", quote: Dict[str, Any], timeout_sec: float) -> None:
        """Switch to delayed market data (type 3), take a snapshot, then restore the configured type.

        Errors from the switch or the snapshot propagate to the caller; only
        the restore step is best effort.
        """
        configured_type = self._env_int("IB_MARKET_DATA_TYPE", 1)
        try:
            ib.reqMarketDataType(3)
            delayed_ticker = self.req_snapshot(contract, timeout_sec=timeout_sec)
            self._fill_from_ticker(quote, delayed_ticker, self._QUOTE_FIELDS)
        finally:
            try:
                ib.reqMarketDataType(configured_type)
            except Exception:
                pass

    def _fill_from_daily_bar(self, contract: "Contract", quote: Dict[str, Any]) -> None:
        """Fill last/close/volume from the most recent daily bar (TRADES, then MIDPOINT)."""
        ib = self.connect()
        bars: Any = []
        for what_to_show in ("TRADES", "MIDPOINT"):
            try:
                bars = ib.reqHistoricalData(
                    contract,
                    endDateTime="",
                    durationStr="1 D",
                    barSizeSetting="1 day",
                    whatToShow=what_to_show,
                    useRTH=True,
                    formatDate=1,
                    keepUpToDate=False,
                ) or []
            except Exception:
                bars = []
            if bars:
                break
        if not bars:
            return

        bar = bars[-1]
        bar_close = self._as_float(getattr(bar, "close", None))
        if quote["last"] is None:
            quote["last"] = bar_close
        if quote["close"] is None:
            quote["close"] = bar_close
        if quote["volume"] is None:
            quote["volume"] = self._as_float(getattr(bar, "volume", None))
        # bar.date may be a string, date or datetime; only datetime-like values
        # can be converted, otherwise the existing timestamp is kept.
        bar_ts = self._epoch_seconds(getattr(bar, "date", None))
        if bar_ts is not None:
            quote["timestamp"] = bar_ts

    def _fill_from_last_trade_tick(self, ib: Any, contract: "Contract", quote: Dict[str, Any]) -> None:
        """Use the most recent historical trade tick (at most 30 days old) as the last price."""
        try:
            now = datetime.now(timezone.utc)
            # Only the end time is given: the request accepts either a start
            # or an end time, with the other left blank.
            ticks = ib.reqHistoricalTicks(
                contract,
                startDateTime="",
                endDateTime=now,
                numberOfTicks=1,
                whatToShow="TRADES",
                useRth=False,
                ignoreSize=False,
                miscOptions=[],
            ) or []
            if not ticks:
                return
            tick = ticks[-1]
            tick_time = getattr(tick, "time", None)
            tick_ts = self._epoch_seconds(tick_time)
            if tick_ts is None and isinstance(tick_time, (int, float)):
                # Some API layers report the tick time as raw epoch seconds.
                tick_ts = int(tick_time)
            oldest_allowed = (now - timedelta(days=self._MAX_TICK_AGE_DAYS)).timestamp()
            if tick_ts is not None and tick_ts < oldest_allowed:
                return
            quote["last"] = self._as_float(getattr(tick, "price", None))
            if tick_ts is not None:
                quote["timestamp"] = tick_ts
        except Exception:
            # Best effort: an unavailable tick history leaves the quote as it is.
            pass

    def req_best_effort(self, contract: "Contract", timeout_sec: float = 6.0, allow_delayed_fallback: bool = True):
        """Return the best quote obtainable for *contract*.

        Sources are tried in order, each only while bid/ask/last/close/volume
        are all still missing: snapshot, short streaming subscription, delayed
        data (if *allow_delayed_fallback*), latest daily bar. For option-like
        contracts with no last price, the latest historical trade tick is
        tried as well.

        Returns:
            dict with keys ``bid``, ``ask``, ``last``, ``close``, ``volume``
            and ``timestamp`` (epoch seconds); unavailable values are None.
            ``last`` falls back to ``close`` when no last price was found.
        """
        ib = self.connect()
        quote = self._blank_quote()

        ticker = self.req_snapshot(contract, timeout_sec=timeout_sec)
        self._fill_from_ticker(quote, ticker, self._QUOTE_FIELDS)

        # Prefer live marketPrice if last is missing
        if quote["last"] is None:
            try:
                quote["last"] = self._as_float(ticker.marketPrice())
            except Exception:
                pass

        # Use midpoint (or the single available side) when last/close are missing
        if quote["last"] is None and quote["close"] is None:
            bid, ask = quote["bid"], quote["ask"]
            if bid is not None and ask is not None:
                quote["last"] = self._as_float((bid + ask) / 2.0)
            elif bid is not None:
                quote["last"] = bid
            elif ask is not None:
                quote["last"] = ask

        if self._quote_is_empty(quote):
            self._fill_from_stream(ib, contract, quote)
        if allow_delayed_fallback and self._quote_is_empty(quote):
            self._fill_from_delayed(ib, contract, quote, timeout_sec)
        if self._quote_is_empty(quote):
            self._fill_from_daily_bar(contract, quote)

        # For options, historical daily bars may be unavailable; try historical ticks
        sec_type = (getattr(contract, "secType", "") or "").upper()
        if quote["last"] is None and sec_type in ("OPT", "FOP", "BAG"):
            self._fill_from_last_trade_tick(ib, contract, quote)

        volume = quote["volume"]
        return {
            "bid": quote["bid"],
            "ask": quote["ask"],
            "last": quote["last"] if quote["last"] is not None else quote["close"],
            "close": quote["close"],
            "volume": int(volume) if volume is not None else None,
            "timestamp": quote["timestamp"],
        }

    def req_snapshots_batch(self, contracts: List["Contract"], timeout_sec: float = 6.0, chunk_size: int = 80) -> List[Dict[str, Any]]:
        """Quote every contract in turn; a failing contract yields an all-None quote.

        *chunk_size* is accepted for interface compatibility but is not used:
        contracts are processed one at a time.
        """
        results: List[Dict[str, Any]] = []
        for contract in contracts:
            try:
                quote = self.req_best_effort(contract, timeout_sec=timeout_sec)
            except Exception:
                quote = self._blank_quote()
            results.append(quote)
            time.sleep(0.02)  # light pacing between requests
        return results

    # -------- Bulk helpers for options --------
    def _get_underlying_conid_and_price(self, symbol: str) -> Tuple[int, Optional[float]]:
        """Return ``(conId, last price)`` of the stock *symbol*; conId is 0 if qualification failed."""
        ib = self.connect()
        underlying = self.stock(symbol)
        try:
            ib.qualifyContracts(underlying)
        except Exception:
            pass
        spot = self.req_best_effort(underlying).get("last")
        return getattr(underlying, "conId", 0), spot

    @staticmethod
    def _parse_expiry(text: str) -> Optional[datetime]:
        """Parse ``YYYYMMDD`` or ``YYYYMM`` into a UTC datetime, or None if malformed.

        A ``YYYYMM`` value is interpreted as the last day of that month.
        """
        try:
            if len(text) == 8:
                return datetime.strptime(text, "%Y%m%d").replace(tzinfo=timezone.utc)
            if len(text) == 6:
                month_start = datetime.strptime(text + "01", "%Y%m%d").replace(tzinfo=timezone.utc)
                # First day of the following month, minus one day.
                if month_start.month == 12:
                    next_month = month_start.replace(year=month_start.year + 1, month=1)
                else:
                    next_month = month_start.replace(month=month_start.month + 1)
                return next_month - timedelta(days=1)
        except (TypeError, ValueError):
            return None
        return None

    @staticmethod
    def _is_third_friday(moment: datetime) -> bool:
        """Return True if *moment* falls on the third Friday of its month."""
        month_start = moment.replace(day=1)
        first_friday = month_start + timedelta(days=(4 - month_start.weekday()) % 7)
        return moment.date() == (first_friday + timedelta(weeks=2)).date()

    def list_option_contracts(
        self,
        symbol: str,
        months_ahead: int,
        right: str = "BOTH",
        strikes_around: int = 10,
        include_weeklies: bool = True,
        exchange: str = "SMART",
        currency: str = "USD",
    ) -> List[Dict[str, Any]]:
        """Enumerate option contract specs (expiry x strike x right) for *symbol*.

        Args:
            months_ahead: keep expirations up to ``30 * max(1, months_ahead)`` days from now.
            right: ``"C"`` or ``"P"``; anything else selects both.
            strikes_around: number of strikes kept on each side of the strike
                nearest the spot price (or of the middle strike when no spot
                is available); ``<= 0`` keeps all strikes.
            include_weeklies: when False, keep only expirations on the third
                Friday of their month (a heuristic for monthly contracts).

        Returns:
            list of dicts with keys ``symbol``, ``expiry``, ``strike``,
            ``right``, ``exchange``, ``currency``. The combinations are a
            cross product and are not validated against the exchange.
        """
        ib = self.connect()
        underlying_conid, spot = self._get_underlying_conid_and_price(symbol)
        chains = ib.reqSecDefOptParams(symbol, "", "STK", underlying_conid) or []
        if not chains:
            return []

        # Use the last chain listed for the requested exchange, else the first chain.
        chain = chains[0]
        for candidate in chains:
            if candidate.exchange == exchange:
                chain = candidate

        # Filter expirations to the next N months (each expiry parsed once).
        cutoff = datetime.now(timezone.utc) + timedelta(days=30 * max(1, months_ahead))
        parsed = {expiry: self._parse_expiry(expiry) for expiry in set(chain.expirations)}
        expiries = sorted(
            expiry for expiry, when in parsed.items()
            if when is not None and when <= cutoff
        )
        if not include_weeklies:
            expiries = [expiry for expiry in expiries if self._is_third_friday(parsed[expiry])]

        strikes = sorted({float(strike) for strike in chain.strikes})
        if strikes and strikes_around > 0:
            if spot is not None:
                spot_value = float(spot)
                center = min(range(len(strikes)), key=lambda i: abs(strikes[i] - spot_value))
            else:
                center = len(strikes) // 2
            # Slicing clamps the upper bound to the list length.
            strikes = strikes[max(0, center - strikes_around):center + strikes_around + 1]

        wanted = right.upper()
        rights: List[str] = [wanted] if wanted in ("C", "P") else ["C", "P"]
        symbol_upper = symbol.upper()
        return [
            {
                "symbol": symbol_upper,
                "expiry": expiry,
                "strike": float(strike),
                "right": side,
                "exchange": exchange,
                "currency": currency,
            }
            for expiry in expiries
            for strike in strikes
            for side in rights
        ]

    def get_option_quotes_bulk(
        self,
        symbol: str,
        months_ahead: int,
        right: str = "BOTH",
        strikes_around: int = 10,
        include_weeklies: bool = True,
        exchange: str = "SMART",
        currency: str = "USD",
        max_contracts: int = 200,
    ) -> List[Dict[str, Any]]:
        """Quote the contracts produced by ``list_option_contracts``.

        At most *max_contracts* contracts are quoted (0/None disables the cap).
        Each result is the contract spec plus ``bid``, ``ask``, ``last`` and
        ``timestamp``; a contract whose quote request fails gets None values
        instead of aborting the whole run.
        """
        items = self.list_option_contracts(
            symbol=symbol,
            months_ahead=months_ahead,
            right=right,
            strikes_around=strikes_around,
            include_weeklies=include_weeklies,
            exchange=exchange,
            currency=currency,
        )
        if max_contracts and len(items) > max_contracts:
            items = items[:max_contracts]

        results: List[Dict[str, Any]] = []
        for item in items:
            contract = self.option(
                item["symbol"], item["expiry"], item["strike"], item["right"], item["exchange"], item["currency"]
            )
            try:
                quote = self.req_best_effort(contract)
            except Exception:
                quote = self._blank_quote()
            results.append({
                **item,
                "bid": quote.get("bid"),
                "ask": quote.get("ask"),
                "last": quote.get("last"),
                "timestamp": quote.get("timestamp"),
            })
            # Small pacing to be nice to rate limits
            time.sleep(0.05)
        return results

    # -------- Contract-details-based enumeration (valid combos only) --------
    def get_option_expirations(
        self,
        symbol: str,
        exchange: str = "SMART",
    ) -> List[str]:
        """Return the sorted expirations of *symbol*'s option chain.

        The first chain listed for *exchange* is used, else the first chain
        returned; an empty list means no chain was found.
        """
        ib = self.connect()
        # Resolve the underlying's conId for the chain lookup; if qualification
        # fails the request is still attempted with 0.
        underlying = self.stock(symbol)
        try:
            ib.qualifyContracts(underlying)
        except Exception:
            pass
        underlying_conid = getattr(underlying, "conId", 0) or 0

        chains = ib.reqSecDefOptParams(symbol, "", "STK", underlying_conid) or []
        chain = next((c for c in chains if c.exchange == exchange), None)
        if not chain and chains:
            chain = chains[0]
        return sorted(chain.expirations) if chain else []

    def list_option_details_filtered(
        self,
        symbol: str,
        expirations: List[str],
        right: str = "C",
        exchange: str = "SMART",
        strike_min: Optional[float] = None,
        strike_max: Optional[float] = None,
        max_per_expiry: int = 80,
    ) -> List[Dict[str, Any]]:
        """List contracts reported by ``reqContractDetails`` for each expiry.

        Contracts are kept in the order returned, restricted to
        ``strike_min <= strike <= strike_max`` (bounds optional) and capped at
        *max_per_expiry* per expiry (0/None disables the cap). An expiry whose
        lookup fails contributes nothing.

        Returns:
            list of dicts with ``symbol``, ``expiry``, ``strike``, ``right``,
            ``conId``, ``exchange``, ``currency``, ``localSymbol``,
            ``tradingClass`` and ``multiplier``.
        """
        ib = self.connect()
        wanted_right = right.upper()
        symbol_upper = symbol.upper()
        results: List[Dict[str, Any]] = []
        for expiry in expirations:
            # Strike 0.0 leaves the strike unspecified in the lookup template.
            template = Option(symbol, expiry, 0.0, wanted_right, exchange=exchange)
            try:
                details = ib.reqContractDetails(template) or []
            except Exception:
                details = []

            kept = 0
            for detail in details:
                found = detail.contract
                try:
                    strike = float(getattr(found, "strike", 0.0))
                except (TypeError, ValueError):
                    continue
                if strike_min is not None and strike < strike_min:
                    continue
                if strike_max is not None and strike > strike_max:
                    continue
                results.append({
                    "symbol": symbol_upper,
                    "expiry": expiry,
                    "strike": strike,
                    "right": wanted_right,
                    "conId": getattr(found, "conId", None),
                    "exchange": exchange,
                    "currency": getattr(found, "currency", "USD"),
                    "localSymbol": getattr(found, "localSymbol", None),
                    "tradingClass": getattr(found, "tradingClass", None),
                    "multiplier": getattr(found, "multiplier", None),
                })
                kept += 1
                if max_per_expiry and kept >= max_per_expiry:
                    break
            # small pacing between expiries
            time.sleep(0.05)
        return results

    def get_quotes_for_conids(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Quote option specs such as those from ``list_option_details_filtered``.

        Each item needs ``symbol``, ``expiry``, ``strike`` and ``right``;
        ``exchange``, ``currency``, ``conId``, ``localSymbol`` and
        ``tradingClass`` are used when present. Items that cannot be turned
        into a contract are skipped; items whose quote request fails are
        returned with None quote values.
        """
        results: List[Dict[str, Any]] = []
        for item in items:
            # Build a full Option contract for robustness
            try:
                contract = Option(
                    item["symbol"],
                    item["expiry"],
                    float(item["strike"]),
                    item["right"],
                    exchange=item.get("exchange", "SMART"),
                    currency=item.get("currency", "USD"),
                )
            except Exception:
                continue

            con_id = item.get("conId")
            if con_id:
                try:
                    contract.conId = int(con_id)
                except (TypeError, ValueError):
                    pass
            if item.get("localSymbol"):
                contract.localSymbol = item.get("localSymbol")
            if item.get("tradingClass"):
                contract.tradingClass = item.get("tradingClass")

            try:
                quote = self.req_best_effort(contract)
            except Exception:
                quote = self._blank_quote()
            results.append({
                **item,
                "bid": quote.get("bid"),
                "ask": quote.get("ask"),
                "last": quote.get("last"),
                "timestamp": quote.get("timestamp"),
            })
            time.sleep(0.03)
        return results
# Module-level shared IBManager instance. Constructing it only initialises the
# connection slot to None (see IBManager.__init__); nothing is connected here.
ib_manager = IBManager()