get_option_chain
Retrieve option chain data for a specified underlying symbol to analyze available contracts, strike prices, and expiration dates for trading strategies.
Instructions
Get option chain for an underlying.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| symbol | Yes | Underlying symbol | |
| exchange | No | | |
| underlying_sec_type | No | | STK |
Implementation Reference
- src/ib_async_mcp/server.py:719-728 (handler) Implementation of the get_option_chain tool handler, which fetches option chains for a given symbol.
if name == "get_option_chain": contract = Stock(args["symbol"], "SMART", "USD") await ib.qualifyContractsAsync(contract) chains = await ib.reqSecDefOptParamsAsync( args["symbol"], args.get("exchange", ""), args.get("underlying_sec_type", "STK"), contract.conId, ) return [serialize_object(c) for c in chains]
- src/ib_async_mcp/server.py:327-728 (registration) Registration of the get_option_chain tool within the MCP server.
Tool( name="get_option_chain", description="Get option chain for an underlying.", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string", "description": "Underlying symbol"}, "exchange": {"type": "string", "default": ""}, "underlying_sec_type": {"type": "string", "default": "STK"}, }, "required": ["symbol"], }, ), Tool( name="calculate_implied_volatility", description="Calculate implied volatility from option price.", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string"}, "expiry": {"type": "string"}, "strike": {"type": "number"}, "right": {"type": "string"}, "option_price": {"type": "number"}, "underlying_price": {"type": "number"}, }, "required": ["symbol", "expiry", "strike", "right", "option_price", "underlying_price"], }, ), Tool( name="calculate_option_price", description="Calculate option price from volatility.", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string"}, "expiry": {"type": "string"}, "strike": {"type": "number"}, "right": {"type": "string"}, "volatility": {"type": "number"}, "underlying_price": {"type": "number"}, }, "required": ["symbol", "expiry", "strike", "right", "volatility", "underlying_price"], }, ), # Scanner tools Tool( name="get_scanner_parameters", description="Get available scanner parameters.", inputSchema={"type": "object", "properties": {}}, ), Tool( name="run_scanner", description="Run a market scanner.", inputSchema={ "type": "object", "properties": { "scan_code": {"type": "string", "description": "Scanner code (e.g., TOP_PERC_GAIN, MOST_ACTIVE)"}, "instrument": {"type": "string", "default": "STK", "description": "Instrument type"}, "location_code": {"type": "string", "default": "STK.US.MAJOR", "description": "Location code"}, "num_rows": {"type": "integer", "default": 10, "description": "Number of results"}, }, "required": ["scan_code"], }, ), # News tools Tool( name="get_news_providers", description="Get available news providers.", inputSchema={"type": 
"object", "properties": {}}, ), Tool( name="get_news_article", description="Get a news article.", inputSchema={ "type": "object", "properties": { "provider_code": {"type": "string"}, "article_id": {"type": "string"}, }, "required": ["provider_code", "article_id"], }, ), Tool( name="get_historical_news", description="Get historical news headlines.", inputSchema={ "type": "object", "properties": { "con_id": {"type": "integer", "description": "Contract ID"}, "provider_codes": {"type": "string", "description": "Comma-separated provider codes"}, "total_results": {"type": "integer", "default": 10}, }, "required": ["con_id", "provider_codes"], }, ), # Utility tools Tool( name="get_current_time", description="Get current TWS server time.", inputSchema={"type": "object", "properties": {}}, ), ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls.""" global ib try: result = await _handle_tool(name, arguments) return [TextContent(type="text", text=json.dumps(serialize_object(result), indent=2))] except Exception as e: return [TextContent(type="text", text=json.dumps({"error": str(e)}))] async def _handle_tool(name: str, args: dict) -> Any: """Route tool calls to appropriate handlers.""" global ib # Connection tools if name == "connect": if ib is not None and ib.isConnected(): return {"status": "already_connected"} ib = IB() await ib.connectAsync( host=args.get("host", "127.0.0.1"), port=args.get("port", 7496), clientId=args.get("client_id", 1), readonly=args.get("readonly", True), ) return {"status": "connected", "accounts": ib.managedAccounts()} if name == "disconnect": if ib is not None: ib.disconnect() ib = None return {"status": "disconnected"} if name == "is_connected": return {"connected": ib is not None and ib.isConnected()} # Require connection for other tools if ib is None or not ib.isConnected(): raise RuntimeError("Not connected. 
Call 'connect' first.") # Account tools if name == "get_accounts": return {"accounts": ib.managedAccounts()} if name == "get_account_values": values = ib.accountValues(args.get("account", "")) return [{"account": v.account, "tag": v.tag, "value": v.value, "currency": v.currency} for v in values] if name == "get_account_summary": summary = ib.accountSummary(args.get("account", "")) return [{"account": v.account, "tag": v.tag, "value": v.value, "currency": v.currency} for v in summary] if name == "get_portfolio": items = ib.portfolio(args.get("account", "")) return [{ "symbol": p.contract.symbol, "sec_type": p.contract.secType, "position": p.position, "market_price": p.marketPrice, "market_value": p.marketValue, "average_cost": p.averageCost, "unrealized_pnl": p.unrealizedPNL, "realized_pnl": p.realizedPNL, "account": p.account, } for p in items] if name == "get_positions": positions = ib.positions(args.get("account", "")) return [{ "account": p.account, "symbol": p.contract.symbol, "sec_type": p.contract.secType, "position": p.position, "avg_cost": p.avgCost, } for p in positions] if name == "get_pnl": pnl_list = ib.pnl(args.get("account", "")) return [{ "account": p.account, "daily_pnl": p.dailyPnL, "unrealized_pnl": p.unrealizedPnL, "realized_pnl": p.realizedPnL, } for p in pnl_list] # Contract tools if name == "create_contract": contract_type = args.pop("contract_type") kwargs = {k: v for k, v in args.items() if v is not None} if "expiry" in kwargs: kwargs["lastTradeDateOrContractMonth"] = kwargs.pop("expiry") contract = create_contract(contract_type, **kwargs) return serialize_object(contract) if name == "qualify_contracts": contract = create_contract( args["contract_type"], symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) qualified = await ib.qualifyContractsAsync(contract) return [serialize_object(c) for c in qualified] if name == "get_contract_details": contract = create_contract( args["contract_type"], 
symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) details = await ib.reqContractDetailsAsync(contract) return [serialize_object(d) for d in details] if name == "search_symbols": results = await ib.reqMatchingSymbolsAsync(args["pattern"]) return [serialize_object(r) for r in (results or [])] # Market data tools if name == "get_market_data": contract = create_contract( args["contract_type"], symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) await ib.qualifyContractsAsync(contract) tickers = await ib.reqTickersAsync(contract) if tickers: t = tickers[0] return { "symbol": contract.symbol, "bid": t.bid, "ask": t.ask, "last": t.last, "volume": t.volume, "open": t.open, "high": t.high, "low": t.low, "close": t.close, } return {"error": "No data available"} if name == "get_historical_data": contract = create_contract( args["contract_type"], symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) await ib.qualifyContractsAsync(contract) bars = await ib.reqHistoricalDataAsync( contract, endDateTime="", durationStr=args.get("duration", "1 D"), barSizeSetting=args.get("bar_size", "1 hour"), whatToShow=args.get("what_to_show", "TRADES"), useRTH=args.get("use_rth", True), ) return [{ "date": b.date.isoformat() if hasattr(b.date, 'isoformat') else str(b.date), "open": b.open, "high": b.high, "low": b.low, "close": b.close, "volume": b.volume, } for b in bars] if name == "get_head_timestamp": contract = create_contract( args["contract_type"], symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) await ib.qualifyContractsAsync(contract) ts = await ib.reqHeadTimeStampAsync( contract, whatToShow=args.get("what_to_show", "TRADES"), useRTH=True, formatDate=1, ) return {"head_timestamp": ts.isoformat() if ts else None} # Order tools if name == "place_order": contract = create_contract( 
args["contract_type"], symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) await ib.qualifyContractsAsync(contract) order_type = args["order_type"].lower() action = args["action"].upper() quantity = args["quantity"] if order_type == "market": order = MarketOrder(action, quantity) elif order_type == "limit": order = LimitOrder(action, quantity, args["limit_price"]) elif order_type == "stop": order = StopOrder(action, quantity, args["stop_price"]) elif order_type == "stop_limit": order = StopLimitOrder(action, quantity, args["limit_price"], args["stop_price"]) else: raise ValueError(f"Unknown order type: {order_type}") trade = ib.placeOrder(contract, order) return { "order_id": trade.order.orderId, "status": trade.orderStatus.status, "filled": trade.orderStatus.filled, "remaining": trade.orderStatus.remaining, } if name == "cancel_order": orders = ib.orders() for order in orders: if order.orderId == args["order_id"]: ib.cancelOrder(order) return {"status": "cancel_requested", "order_id": args["order_id"]} return {"error": "Order not found"} if name == "cancel_all_orders": ib.reqGlobalCancel() return {"status": "cancel_all_requested"} if name == "get_open_orders": orders = ib.openOrders() return [serialize_object(o) for o in orders] if name == "get_open_trades": trades = ib.openTrades() return [{ "order_id": t.order.orderId, "symbol": t.contract.symbol, "action": t.order.action, "quantity": t.order.totalQuantity, "order_type": t.order.orderType, "status": t.orderStatus.status, "filled": t.orderStatus.filled, "remaining": t.orderStatus.remaining, } for t in trades] if name == "get_executions": executions = ib.executions() return [serialize_object(e) for e in executions] if name == "get_fills": fills = ib.fills() return [{ "symbol": f.contract.symbol, "exec_id": f.execution.execId, "side": f.execution.side, "shares": f.execution.shares, "price": f.execution.price, "time": f.time.isoformat() if f.time else None, 
"commission": f.commissionReport.commission if f.commissionReport else None, } for f in fills] if name == "what_if_order": contract = create_contract( args["contract_type"], symbol=args["symbol"], exchange=args.get("exchange", "SMART"), currency=args.get("currency", "USD"), ) await ib.qualifyContractsAsync(contract) order_type = args.get("order_type", "market").lower() if order_type == "limit": order = LimitOrder(args["action"], args["quantity"], args["limit_price"]) else: order = MarketOrder(args["action"], args["quantity"]) state = await ib.whatIfOrderAsync(contract, order) return serialize_object(state) # Options tools if name == "get_option_chain": contract = Stock(args["symbol"], "SMART", "USD") await ib.qualifyContractsAsync(contract) chains = await ib.reqSecDefOptParamsAsync( args["symbol"], args.get("exchange", ""), args.get("underlying_sec_type", "STK"), contract.conId, ) return [serialize_object(c) for c in chains]