Skip to main content
Glama

Interactive Brokers MCP Server

by atilcan
ib.py (20.1 kB)
"""Convenience layer over ib_insync for best-effort quotes and option-chain lookups."""

import math
import os
import time
from datetime import date, datetime, timedelta, timezone
from typing import Optional, List, Dict, Any, Tuple

from ib_insync import IB, Stock, Option, Contract


# Quote fields that count as "market data"; while every one of them is missing,
# req_best_effort keeps moving on to the next fallback stage.
_QUOTE_FIELDS = ("bid", "ask", "last", "close", "volume")

# Placeholder returned for a contract whose quote lookup raised.
_EMPTY_QUOTE: Dict[str, Any] = {
    "bid": None,
    "ask": None,
    "last": None,
    "close": None,
    "volume": None,
    "timestamp": None,
}


def _env_int(name: str, default: int) -> int:
    """Read environment variable *name* as an int, falling back to *default*."""
    try:
        return int(os.getenv(name, str(default)).strip())
    except (TypeError, ValueError):
        return default


def _finite_or_none(value: Any) -> Optional[float]:
    """Return *value* as a finite float, or None if missing, non-numeric, NaN or infinite."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


def _first_not_none(*candidates: Any) -> Any:
    """Return the first candidate that is not None (0 and 0.0 are valid values)."""
    for candidate in candidates:
        if candidate is not None:
            return candidate
    return None


def _to_epoch(value: Any) -> Optional[int]:
    """Convert a datetime, date, number or IB-style date string to epoch seconds.

    Returns None when the value is missing or cannot be interpreted.
    """
    if value is None:
        return None
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(value, datetime):
        return int(value.timestamp())
    if isinstance(value, date):
        # A bare date has no time of day; midnight UTC is used as its instant.
        midnight = datetime(value.year, value.month, value.day, tzinfo=timezone.utc)
        return int(midnight.timestamp())
    if isinstance(value, (int, float)):
        return int(value) if math.isfinite(value) else None
    if isinstance(value, str):
        text = value.strip()
        # NOTE(review): string dates are interpreted as UTC here — confirm
        # against the time zone the gateway actually reports in.
        for fmt in ("%Y%m%d %H:%M:%S", "%Y%m%d"):
            try:
                parsed = datetime.strptime(text, fmt)
            except ValueError:
                continue
            return int(parsed.replace(tzinfo=timezone.utc).timestamp())
        if text.isdigit():
            return int(text)
    return None


def _ticker_has_data(ticker: Any) -> bool:
    """Return True once *ticker* has a timestamp or a finite bid/ask/last.

    ib_insync initialises unset Ticker price fields to NaN rather than None,
    so an ``is not None`` test is always true; finiteness must be checked.
    """
    if getattr(ticker, "time", None):
        return True
    return any(
        _finite_or_none(getattr(ticker, field, None)) is not None
        for field in ("bid", "ask", "last")
    )


def _parse_expiry(text: str) -> Optional[datetime]:
    """Parse an expiry given as YYYYMMDD or YYYYMM into an aware UTC datetime.

    A YYYYMM value is mapped to the last day of that month. Returns None for
    any other length or an unparseable value.
    """
    try:
        if len(text) == 8:
            return datetime.strptime(text, "%Y%m%d").replace(tzinfo=timezone.utc)
        if len(text) == 6:
            first = datetime.strptime(text + "01", "%Y%m%d").replace(tzinfo=timezone.utc)
            # First day of the following month, minus one day.
            if first.month == 12:
                next_month = first.replace(year=first.year + 1, month=1, day=1)
            else:
                next_month = first.replace(month=first.month + 1, day=1)
            return next_month - timedelta(days=1)
    except Exception:
        return None
    return None


def _is_third_friday(moment: datetime) -> bool:
    """Return True if *moment* falls on the third Friday of its month."""
    first = moment.replace(day=1)
    first_friday = first + timedelta(days=(4 - first.weekday()) % 7)
    third_friday = first_friday + timedelta(weeks=2)
    return moment.date() == third_friday.date()


class IBManager:
    """Holds one lazily created IB connection and offers quote/option helpers."""

    def __init__(self) -> None:
        self._ib: Optional[IB] = None

    def connect(self) -> IB:
        """Return a connected IB client, connecting on first use or after a drop.

        Connection settings are read from the environment at call time:
        IB_HOST (default "127.0.0.1"), IB_PORT (4001), IB_CLIENT_ID (19) and
        IB_MARKET_DATA_TYPE (1). Unparseable integers fall back to the default.
        The connection is opened with ``readonly=True``.

        Raises:
            RuntimeError: if the connection attempt fails.
        """
        if self._ib is not None and self._ib.isConnected():
            return self._ib

        host = os.getenv("IB_HOST", "127.0.0.1").strip()
        port = _env_int("IB_PORT", 4001)
        client_id = _env_int("IB_CLIENT_ID", 19)
        mdt = _env_int("IB_MARKET_DATA_TYPE", 1)

        ib = IB()
        try:
            ib.connect(host, port, clientId=client_id, readonly=True)
        except Exception as exc:
            # Provide a clearer message for host/port issues
            raise RuntimeError(
                f"Failed to connect to IB Gateway at {host}:{port} (clientId {client_id}). "
                "Ensure IB Gateway is running, logged in, API enabled, and host/port are correct."
            ) from exc
        try:
            ib.reqMarketDataType(mdt)
        except Exception:
            # Best effort: the connection is still usable without this setting.
            pass
        self._ib = ib
        return ib

    def stock(self, symbol: str, exchange: str = "SMART", currency: str = "USD") -> Contract:
        """Build a stock contract for *symbol*."""
        return Stock(symbol, exchange, currency)

    def option(
        self,
        symbol: str,
        expiry: str,
        strike: float,
        right: str,
        exchange: str = "SMART",
        currency: str = "USD",
    ) -> Contract:
        """Build an option contract; *right* is upper-cased and *strike* coerced to float."""
        return Option(symbol, expiry, float(strike), right.upper(), exchange=exchange, currency=currency)

    def req_snapshot(self, contract: Contract, timeout_sec: float = 6.0):
        """Request a market-data snapshot and wait briefly for the first data.

        The contract is qualified first (failures are ignored). The method
        then polls until the ticker has a timestamp or a finite bid/ask/last,
        or until *timeout_sec* elapses, and returns the ticker either way.
        """
        ib = self.connect()
        try:
            ib.qualifyContracts(contract)
        except Exception:
            pass
        ticker = ib.reqMktData(contract, genericTickList="", snapshot=True)
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline and not _ticker_has_data(ticker):
            ib.waitOnUpdate(timeout=0.2)
        return ticker

    def req_best_effort(self, contract: Contract, timeout_sec: float = 6.0, allow_delayed_fallback: bool = True):
        """Return the best quote obtainable for *contract*.

        Stages, each tried only while bid, ask, last, close and volume are all
        still missing:
          1. snapshot (plus marketPrice / midpoint to fill a missing last),
          2. a short streaming subscription,
          3. a delayed-data snapshot (only if *allow_delayed_fallback*),
          4. the latest daily historical bar, then historical ticks for options.

        Returns:
            dict with keys "bid", "ask", "last", "close", "volume" and
            "timestamp". "last" falls back to "close" when no last price was
            found; "volume" is an int or None; "timestamp" is epoch seconds
            or None.
        """
        ib = self.connect()
        ticker = self.req_snapshot(contract, timeout_sec=timeout_sec)

        quote: Dict[str, Any] = {
            field: _finite_or_none(getattr(ticker, field, None)) for field in _QUOTE_FIELDS
        }
        quote["timestamp"] = _to_epoch(getattr(ticker, "time", None))

        # Prefer live marketPrice if last is missing
        if quote["last"] is None:
            try:
                quote["last"] = _finite_or_none(ticker.marketPrice())
            except Exception:
                pass

        # Use midpoint (or whichever side exists) when last/close are missing
        if quote["last"] is None and quote["close"] is None:
            bid, ask = quote["bid"], quote["ask"]
            if bid is not None and ask is not None:
                quote["last"] = _finite_or_none((bid + ask) / 2.0)
            elif bid is not None:
                quote["last"] = bid
            elif ask is not None:
                quote["last"] = ask

        if self._quote_is_empty(quote):
            self._fill_from_stream(ib, contract, quote)

        if allow_delayed_fallback and self._quote_is_empty(quote):
            self._fill_from_delayed(ib, contract, quote, timeout_sec)

        if self._quote_is_empty(quote):
            self._fill_from_history(contract, quote)

        last, close, volume = quote["last"], quote["close"], quote["volume"]
        return {
            "bid": quote["bid"],
            "ask": quote["ask"],
            "last": last if last is not None else close,
            "close": close,
            "volume": int(volume) if volume is not None else None,
            "timestamp": quote["timestamp"],
        }

    @staticmethod
    def _quote_is_empty(quote: Dict[str, Any]) -> bool:
        """Return True while none of the market-data fields has a value."""
        return all(quote[field] is None for field in _QUOTE_FIELDS)

    def _fill_from_stream(self, ib: IB, contract: Contract, quote: Dict[str, Any], wait_sec: float = 2.0) -> None:
        """Stage 2: briefly stream *contract* and merge any bid/ask/last into *quote*."""
        try:
            stream_ticker = ib.reqMktData(contract, genericTickList="", snapshot=False)
        except Exception:
            return
        try:
            deadline = time.monotonic() + wait_sec
            while time.monotonic() < deadline and not _ticker_has_data(stream_ticker):
                ib.waitOnUpdate(timeout=0.2)
            for field in ("bid", "ask", "last"):
                quote[field] = _first_not_none(
                    quote[field], _finite_or_none(getattr(stream_ticker, field, None))
                )
            quote["timestamp"] = _first_not_none(
                quote["timestamp"], _to_epoch(getattr(stream_ticker, "time", None))
            )
        except Exception:
            pass
        finally:
            # Always cancel, even if waiting failed, so the subscription does not leak.
            try:
                ib.cancelMktData(stream_ticker.contract)
            except Exception:
                pass

    def _fill_from_delayed(self, ib: IB, contract: Contract, quote: Dict[str, Any], timeout_sec: float) -> None:
        """Stage 3: take a snapshot under market data type 3, then restore the configured type.

        Exceptions from the request itself propagate to the caller; the
        market data type is restored regardless.
        """
        original_mdt = _env_int("IB_MARKET_DATA_TYPE", 1)
        try:
            ib.reqMarketDataType(3)
            delayed_ticker = self.req_snapshot(contract, timeout_sec=timeout_sec)
            for field in _QUOTE_FIELDS:
                # None-aware merge: a genuine 0.0 (e.g. zero bid) must not be dropped.
                quote[field] = _first_not_none(
                    _finite_or_none(getattr(delayed_ticker, field, None)), quote[field]
                )
            quote["timestamp"] = _first_not_none(
                quote["timestamp"], _to_epoch(getattr(delayed_ticker, "time", None))
            )
        finally:
            try:
                ib.reqMarketDataType(original_mdt)
            except Exception:
                pass

    def _fill_from_history(self, contract: Contract, quote: Dict[str, Any]) -> None:
        """Stage 4: fill *quote* from the latest daily bar, then from historical ticks for options."""
        ib = self.connect()

        # Try historical TRADES first, then fall back to MIDPOINT
        bars: List[Any] = []
        for what_to_show in ("TRADES", "MIDPOINT"):
            try:
                bars = ib.reqHistoricalData(
                    contract,
                    endDateTime="",
                    durationStr="1 D",
                    barSizeSetting="1 day",
                    whatToShow=what_to_show,
                    useRTH=True,
                    formatDate=1,
                    keepUpToDate=False,
                ) or []
            except Exception:
                bars = []
            if bars:
                break

        if bars:
            bar = bars[-1]
            bar_close = _finite_or_none(getattr(bar, "close", None))
            quote["last"] = _first_not_none(quote["last"], bar_close)
            quote["close"] = _first_not_none(quote["close"], bar_close)
            quote["volume"] = _first_not_none(
                quote["volume"], _finite_or_none(getattr(bar, "volume", None))
            )
            # bar.date may be a date, datetime or string depending on bar size
            # and ib_insync version; keep the previous timestamp if unusable.
            quote["timestamp"] = _first_not_none(
                _to_epoch(getattr(bar, "date", None)), quote["timestamp"]
            )

        # For options, historical daily bars may be unavailable; try historical ticks
        sec_type = (getattr(contract, "secType", "") or "").upper()
        if quote["last"] is None and sec_type in ("OPT", "FOP", "BAG"):
            try:
                end = datetime.now(timezone.utc)
                ticks: List[Any] = []
                # Widen the look-back window until a trade is found.
                # NOTE(review): ib_insync documents that only one of
                # startDateTime/endDateTime should be set — confirm this
                # request shape returns the intended (most recent) tick.
                for days_back in (7, 14, 30):
                    ticks = ib.reqHistoricalTicks(
                        contract,
                        startDateTime=end - timedelta(days=days_back),
                        endDateTime=end,
                        numberOfTicks=1,
                        whatToShow="TRADES",
                        useRth=False,
                        ignoreSize=False,
                        miscOptions=[],
                    ) or []
                    if ticks:
                        break
                if ticks:
                    tick = ticks[-1]
                    quote["last"] = _finite_or_none(getattr(tick, "price", None))
                    # tick.time may be a datetime or an epoch number.
                    quote["timestamp"] = _first_not_none(
                        _to_epoch(getattr(tick, "time", None)), quote["timestamp"]
                    )
            except Exception:
                pass

    def _quote_or_empty(self, contract: Contract, timeout_sec: float = 6.0) -> Dict[str, Any]:
        """Return req_best_effort's result, or an all-None quote if it raises."""
        try:
            return self.req_best_effort(contract, timeout_sec=timeout_sec)
        except Exception:
            return dict(_EMPTY_QUOTE)

    def req_snapshots_batch(self, contracts: List[Contract], timeout_sec: float = 6.0, chunk_size: int = 80) -> List[Dict[str, Any]]:
        """Quote each contract in turn; a failed contract yields an all-None quote.

        *chunk_size* is accepted for interface compatibility but is not used:
        contracts are processed one at a time with a short pause between them.
        """
        results: List[Dict[str, Any]] = []
        for contract in contracts:
            results.append(self._quote_or_empty(contract, timeout_sec=timeout_sec))
            time.sleep(0.02)
        return results

    # -------- Bulk helpers for options --------

    def _qualified_stock(self, symbol: str) -> Contract:
        """Build the stock contract for *symbol* and try to qualify it (errors ignored)."""
        ib = self.connect()
        stk = self.stock(symbol)
        try:
            ib.qualifyContracts(stk)
        except Exception:
            pass
        return stk

    def _get_underlying_conid_and_price(self, symbol: str) -> Tuple[int, Optional[float]]:
        """Return (conId, last price) for the underlying stock; conId is 0 if unresolved."""
        stk = self._qualified_stock(symbol)
        price_info = self.req_best_effort(stk)
        return getattr(stk, "conId", 0), price_info.get("last")

    def list_option_contracts(
        self,
        symbol: str,
        months_ahead: int,
        right: str = "BOTH",
        strikes_around: int = 10,
        include_weeklies: bool = True,
        exchange: str = "SMART",
        currency: str = "USD",
    ) -> List[Dict[str, Any]]:
        """Enumerate expiry/strike/right combinations for *symbol*.

        Uses the option parameters reported for *exchange* (or the first
        reported set if that exchange is absent). Expirations are limited to
        roughly ``30 * max(1, months_ahead)`` days ahead, optionally reduced
        to third-Friday expiries, and strikes are limited to *strikes_around*
        on each side of the spot price (or of the list centre when no spot is
        available). The combinations are a cross product and are not checked
        for existence.

        Returns:
            list of dicts with keys "symbol", "expiry", "strike", "right",
            "exchange" and "currency"; empty if no parameters are reported.
        """
        ib = self.connect()
        underlying_conid, spot = self._get_underlying_conid_and_price(symbol)
        params = ib.reqSecDefOptParams(symbol, "", "STK", underlying_conid) or []

        # Prefer the requested exchange; the last matching entry wins.
        chosen = None
        for candidate in params:
            if candidate.exchange == exchange:
                chosen = candidate
        if chosen is None:
            chosen = params[0] if params else None
        if not chosen:
            return []

        # Filter expirations to next N months
        cutoff = datetime.now(timezone.utc) + timedelta(days=30 * max(1, months_ahead))
        dated = []
        for expiry in set(chosen.expirations):
            moment = _parse_expiry(expiry)
            if moment and moment <= cutoff:
                dated.append((expiry, moment))
        dated.sort(key=lambda pair: pair[0])

        # Filter weeklies if requested (keep only monthly: 3rd Friday heuristic)
        if not include_weeklies:
            dated = [pair for pair in dated if _is_third_friday(pair[1])]
        exps = [expiry for expiry, _ in dated]

        strikes = sorted({float(s) for s in chosen.strikes})
        # If we have a spot, choose strikes around it; else take center of strikes list
        if strikes and strikes_around > 0:
            if spot is not None:
                target = float(spot)
                idx = min(enumerate(strikes), key=lambda pair: abs(pair[1] - target))[0]
            else:
                idx = len(strikes) // 2
            lo = max(0, idx - strikes_around)
            hi = min(len(strikes), idx + strikes_around + 1)
            strikes = strikes[lo:hi]

        rights: List[str] = [right.upper()] if right.upper() in ("C", "P") else ["C", "P"]

        return [
            {
                "symbol": symbol.upper(),
                "expiry": expiry,
                "strike": float(strike),
                "right": side,
                "exchange": exchange,
                "currency": currency,
            }
            for expiry in exps
            for strike in strikes
            for side in rights
        ]

    def get_option_quotes_bulk(
        self,
        symbol: str,
        months_ahead: int,
        right: str = "BOTH",
        strikes_around: int = 10,
        include_weeklies: bool = True,
        exchange: str = "SMART",
        currency: str = "USD",
        max_contracts: int = 200,
    ) -> List[Dict[str, Any]]:
        """List option contracts for *symbol* and quote up to *max_contracts* of them.

        Each result is the contract description plus "bid", "ask", "last" and
        "timestamp". A contract whose quote lookup fails gets None values
        instead of aborting the whole batch.
        """
        items = self.list_option_contracts(
            symbol=symbol,
            months_ahead=months_ahead,
            right=right,
            strikes_around=strikes_around,
            include_weeklies=include_weeklies,
            exchange=exchange,
            currency=currency,
        )
        if max_contracts and len(items) > max_contracts:
            items = items[:max_contracts]

        results: List[Dict[str, Any]] = []
        for it in items:
            contract = self.option(
                it["symbol"], it["expiry"], it["strike"], it["right"], it["exchange"], it["currency"]
            )
            data = self._quote_or_empty(contract)
            results.append({
                **it,
                "bid": data.get("bid"),
                "ask": data.get("ask"),
                "last": data.get("last"),
                "timestamp": data.get("timestamp"),
            })
            # Small pacing to be nice to rate limits
            time.sleep(0.05)
        return results

    # -------- Contract-details-based enumeration (valid combos only) --------

    def get_option_expirations(
        self,
        symbol: str,
        exchange: str = "SMART",
    ) -> List[str]:
        """Return the sorted expirations reported for *symbol*.

        The underlying is qualified first so its conId can be passed with the
        request; if qualification fails the conId falls back to 0. Parameters
        for *exchange* are preferred, otherwise the first reported set is used.
        """
        ib = self.connect()
        underlying_conid = getattr(self._qualified_stock(symbol), "conId", 0) or 0
        params = ib.reqSecDefOptParams(symbol, "", "STK", underlying_conid) or []
        # Prefer the requested exchange if present
        use = None
        for p in params:
            if p.exchange == exchange:
                use = p
                break
        if not use and params:
            use = params[0]
        return sorted(use.expirations) if use else []

    def list_option_details_filtered(
        self,
        symbol: str,
        expirations: List[str],
        right: str = "C",
        exchange: str = "SMART",
        strike_min: Optional[float] = None,
        strike_max: Optional[float] = None,
        max_per_expiry: int = 80,
    ) -> List[Dict[str, Any]]:
        """List option contracts returned by contract-details lookups, per expiry.

        For each expiry a template option with strike 0.0 is submitted to
        ``reqContractDetails``; returned contracts are kept when their strike
        lies within [*strike_min*, *strike_max*] (bounds optional), up to
        *max_per_expiry* per expiry. A failed lookup yields no rows for that
        expiry.
        """
        ib = self.connect()
        results: List[Dict[str, Any]] = []
        for expiry in expirations:
            template = Option(symbol, expiry, 0.0, right.upper(), exchange=exchange)
            try:
                details = ib.reqContractDetails(template) or []
            except Exception:
                details = []
            count = 0
            for detail in details:
                c = detail.contract
                try:
                    strike = float(getattr(c, "strike", 0.0))
                except Exception:
                    continue
                if strike_min is not None and strike < strike_min:
                    continue
                if strike_max is not None and strike > strike_max:
                    continue
                results.append({
                    "symbol": symbol.upper(),
                    "expiry": expiry,
                    "strike": strike,
                    "right": right.upper(),
                    "conId": getattr(c, "conId", None),
                    "exchange": exchange,
                    "currency": getattr(c, "currency", "USD"),
                    "localSymbol": getattr(c, "localSymbol", None),
                    "tradingClass": getattr(c, "tradingClass", None),
                    "multiplier": getattr(c, "multiplier", None),
                })
                count += 1
                if max_per_expiry and count >= max_per_expiry:
                    break
            # small pacing between expiries
            time.sleep(0.05)
        return results

    def get_quotes_for_conids(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Quote option descriptions such as those from list_option_details_filtered.

        Items that cannot be turned into an option contract are skipped. Each
        result is the input item plus "bid", "ask", "last" and "timestamp"
        (None when the quote lookup fails).
        """
        results: List[Dict[str, Any]] = []
        for it in items:
            # Build a full Option contract for robustness
            try:
                c = Option(
                    it["symbol"],
                    it["expiry"],
                    float(it["strike"]),
                    it["right"],
                    exchange=it.get("exchange", "SMART"),
                    currency=it.get("currency", "USD"),
                )
            except Exception:
                continue
            con_id = it.get("conId")
            if con_id:
                try:
                    c.conId = int(con_id)
                except Exception:
                    pass
            if it.get("localSymbol"):
                c.localSymbol = it.get("localSymbol")
            if it.get("tradingClass"):
                c.tradingClass = it.get("tradingClass")
            data = self._quote_or_empty(c)
            results.append({
                **it,
                "bid": data.get("bid"),
                "ask": data.get("ask"),
                "last": data.get("last"),
                "timestamp": data.get("timestamp"),
            })
            time.sleep(0.03)
        return results


ib_manager = IBManager()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/atilcan/ib-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.