Skip to main content
Glama
nadavgb-atom

ib-async-mcp

by nadavgb-atom

get_option_chain

Retrieve option chain data for a specified underlying symbol to analyze available contracts, strike prices, and expiration dates for trading strategies.

Instructions

Get option chain for an underlying.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
symbolYesUnderlying symbol
exchangeNo
underlying_sec_typeNoSTK

Implementation Reference

  • Implementation of the get_option_chain tool handler, which fetches option chains for a given symbol.
    if name == "get_option_chain":
        contract = Stock(args["symbol"], "SMART", "USD")
        await ib.qualifyContractsAsync(contract)
        chains = await ib.reqSecDefOptParamsAsync(
            args["symbol"],
            args.get("exchange", ""),
            args.get("underlying_sec_type", "STK"),
            contract.conId,
        )
        return [serialize_object(c) for c in chains]
  • Registration of the get_option_chain tool within the MCP server.
            Tool(
                name="get_option_chain",
                description="Get option chain for an underlying.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "symbol": {"type": "string", "description": "Underlying symbol"},
                        "exchange": {"type": "string", "default": ""},
                        "underlying_sec_type": {"type": "string", "default": "STK"},
                    },
                    "required": ["symbol"],
                },
            ),
            Tool(
                name="calculate_implied_volatility",
                description="Calculate implied volatility from option price.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "symbol": {"type": "string"},
                        "expiry": {"type": "string"},
                        "strike": {"type": "number"},
                        "right": {"type": "string"},
                        "option_price": {"type": "number"},
                        "underlying_price": {"type": "number"},
                    },
                    "required": ["symbol", "expiry", "strike", "right", "option_price", "underlying_price"],
                },
            ),
            Tool(
                name="calculate_option_price",
                description="Calculate option price from volatility.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "symbol": {"type": "string"},
                        "expiry": {"type": "string"},
                        "strike": {"type": "number"},
                        "right": {"type": "string"},
                        "volatility": {"type": "number"},
                        "underlying_price": {"type": "number"},
                    },
                    "required": ["symbol", "expiry", "strike", "right", "volatility", "underlying_price"],
                },
            ),
            # Scanner tools
            Tool(
                name="get_scanner_parameters",
                description="Get available scanner parameters.",
                inputSchema={"type": "object", "properties": {}},
            ),
            Tool(
                name="run_scanner",
                description="Run a market scanner.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "scan_code": {"type": "string", "description": "Scanner code (e.g., TOP_PERC_GAIN, MOST_ACTIVE)"},
                        "instrument": {"type": "string", "default": "STK", "description": "Instrument type"},
                        "location_code": {"type": "string", "default": "STK.US.MAJOR", "description": "Location code"},
                        "num_rows": {"type": "integer", "default": 10, "description": "Number of results"},
                    },
                    "required": ["scan_code"],
                },
            ),
            # News tools
            Tool(
                name="get_news_providers",
                description="Get available news providers.",
                inputSchema={"type": "object", "properties": {}},
            ),
            Tool(
                name="get_news_article",
                description="Get a news article.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "provider_code": {"type": "string"},
                        "article_id": {"type": "string"},
                    },
                    "required": ["provider_code", "article_id"],
                },
            ),
            Tool(
                name="get_historical_news",
                description="Get historical news headlines.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "con_id": {"type": "integer", "description": "Contract ID"},
                        "provider_codes": {"type": "string", "description": "Comma-separated provider codes"},
                        "total_results": {"type": "integer", "default": 10},
                    },
                    "required": ["con_id", "provider_codes"],
                },
            ),
            # Utility tools
            Tool(
                name="get_current_time",
                description="Get current TWS server time.",
                inputSchema={"type": "object", "properties": {}},
            ),
        ]
    
    
    
    @app.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[TextContent]:
        """Handle tool calls."""
        global ib
        
        try:
            result = await _handle_tool(name, arguments)
            return [TextContent(type="text", text=json.dumps(serialize_object(result), indent=2))]
        except Exception as e:
            return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
    
    
    async def _handle_tool(name: str, args: dict) -> Any:
        """Route tool calls to appropriate handlers."""
        global ib
        
        # Connection tools
        if name == "connect":
            if ib is not None and ib.isConnected():
                return {"status": "already_connected"}
            ib = IB()
            await ib.connectAsync(
                host=args.get("host", "127.0.0.1"),
                port=args.get("port", 7496),
                clientId=args.get("client_id", 1),
                readonly=args.get("readonly", True),
            )
            return {"status": "connected", "accounts": ib.managedAccounts()}
        
        if name == "disconnect":
            if ib is not None:
                ib.disconnect()
                ib = None
            return {"status": "disconnected"}
        
        if name == "is_connected":
            return {"connected": ib is not None and ib.isConnected()}
        
        # Require connection for other tools
        if ib is None or not ib.isConnected():
            raise RuntimeError("Not connected. Call 'connect' first.")
        
        # Account tools
        if name == "get_accounts":
            return {"accounts": ib.managedAccounts()}
        
        if name == "get_account_values":
            values = ib.accountValues(args.get("account", ""))
            return [{"account": v.account, "tag": v.tag, "value": v.value, "currency": v.currency} for v in values]
        
        if name == "get_account_summary":
            summary = ib.accountSummary(args.get("account", ""))
            return [{"account": v.account, "tag": v.tag, "value": v.value, "currency": v.currency} for v in summary]
        
        if name == "get_portfolio":
            items = ib.portfolio(args.get("account", ""))
            return [{
                "symbol": p.contract.symbol,
                "sec_type": p.contract.secType,
                "position": p.position,
                "market_price": p.marketPrice,
                "market_value": p.marketValue,
                "average_cost": p.averageCost,
                "unrealized_pnl": p.unrealizedPNL,
                "realized_pnl": p.realizedPNL,
                "account": p.account,
            } for p in items]
        
        if name == "get_positions":
            positions = ib.positions(args.get("account", ""))
            return [{
                "account": p.account,
                "symbol": p.contract.symbol,
                "sec_type": p.contract.secType,
                "position": p.position,
                "avg_cost": p.avgCost,
            } for p in positions]
        
        if name == "get_pnl":
            pnl_list = ib.pnl(args.get("account", ""))
            return [{
                "account": p.account,
                "daily_pnl": p.dailyPnL,
                "unrealized_pnl": p.unrealizedPnL,
                "realized_pnl": p.realizedPnL,
            } for p in pnl_list]
    
        
        # Contract tools
        if name == "create_contract":
            contract_type = args.pop("contract_type")
            kwargs = {k: v for k, v in args.items() if v is not None}
            if "expiry" in kwargs:
                kwargs["lastTradeDateOrContractMonth"] = kwargs.pop("expiry")
            contract = create_contract(contract_type, **kwargs)
            return serialize_object(contract)
        
        if name == "qualify_contracts":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            qualified = await ib.qualifyContractsAsync(contract)
            return [serialize_object(c) for c in qualified]
        
        if name == "get_contract_details":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            details = await ib.reqContractDetailsAsync(contract)
            return [serialize_object(d) for d in details]
        
        if name == "search_symbols":
            results = await ib.reqMatchingSymbolsAsync(args["pattern"])
            return [serialize_object(r) for r in (results or [])]
        
        # Market data tools
        if name == "get_market_data":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            await ib.qualifyContractsAsync(contract)
            tickers = await ib.reqTickersAsync(contract)
            if tickers:
                t = tickers[0]
                return {
                    "symbol": contract.symbol,
                    "bid": t.bid,
                    "ask": t.ask,
                    "last": t.last,
                    "volume": t.volume,
                    "open": t.open,
                    "high": t.high,
                    "low": t.low,
                    "close": t.close,
                }
            return {"error": "No data available"}
        
        if name == "get_historical_data":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            await ib.qualifyContractsAsync(contract)
            bars = await ib.reqHistoricalDataAsync(
                contract,
                endDateTime="",
                durationStr=args.get("duration", "1 D"),
                barSizeSetting=args.get("bar_size", "1 hour"),
                whatToShow=args.get("what_to_show", "TRADES"),
                useRTH=args.get("use_rth", True),
            )
            return [{
                "date": b.date.isoformat() if hasattr(b.date, 'isoformat') else str(b.date),
                "open": b.open,
                "high": b.high,
                "low": b.low,
                "close": b.close,
                "volume": b.volume,
            } for b in bars]
        
        if name == "get_head_timestamp":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            await ib.qualifyContractsAsync(contract)
            ts = await ib.reqHeadTimeStampAsync(
                contract,
                whatToShow=args.get("what_to_show", "TRADES"),
                useRTH=True,
                formatDate=1,
            )
            return {"head_timestamp": ts.isoformat() if ts else None}
    
        
        # Order tools
        if name == "place_order":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            await ib.qualifyContractsAsync(contract)
            
            order_type = args["order_type"].lower()
            action = args["action"].upper()
            quantity = args["quantity"]
            
            if order_type == "market":
                order = MarketOrder(action, quantity)
            elif order_type == "limit":
                order = LimitOrder(action, quantity, args["limit_price"])
            elif order_type == "stop":
                order = StopOrder(action, quantity, args["stop_price"])
            elif order_type == "stop_limit":
                order = StopLimitOrder(action, quantity, args["limit_price"], args["stop_price"])
            else:
                raise ValueError(f"Unknown order type: {order_type}")
            
            trade = ib.placeOrder(contract, order)
            return {
                "order_id": trade.order.orderId,
                "status": trade.orderStatus.status,
                "filled": trade.orderStatus.filled,
                "remaining": trade.orderStatus.remaining,
            }
        
        if name == "cancel_order":
            orders = ib.orders()
            for order in orders:
                if order.orderId == args["order_id"]:
                    ib.cancelOrder(order)
                    return {"status": "cancel_requested", "order_id": args["order_id"]}
            return {"error": "Order not found"}
        
        if name == "cancel_all_orders":
            ib.reqGlobalCancel()
            return {"status": "cancel_all_requested"}
        
        if name == "get_open_orders":
            orders = ib.openOrders()
            return [serialize_object(o) for o in orders]
        
        if name == "get_open_trades":
            trades = ib.openTrades()
            return [{
                "order_id": t.order.orderId,
                "symbol": t.contract.symbol,
                "action": t.order.action,
                "quantity": t.order.totalQuantity,
                "order_type": t.order.orderType,
                "status": t.orderStatus.status,
                "filled": t.orderStatus.filled,
                "remaining": t.orderStatus.remaining,
            } for t in trades]
        
        if name == "get_executions":
            executions = ib.executions()
            return [serialize_object(e) for e in executions]
        
        if name == "get_fills":
            fills = ib.fills()
            return [{
                "symbol": f.contract.symbol,
                "exec_id": f.execution.execId,
                "side": f.execution.side,
                "shares": f.execution.shares,
                "price": f.execution.price,
                "time": f.time.isoformat() if f.time else None,
                "commission": f.commissionReport.commission if f.commissionReport else None,
            } for f in fills]
        
        if name == "what_if_order":
            contract = create_contract(
                args["contract_type"],
                symbol=args["symbol"],
                exchange=args.get("exchange", "SMART"),
                currency=args.get("currency", "USD"),
            )
            await ib.qualifyContractsAsync(contract)
            
            order_type = args.get("order_type", "market").lower()
            if order_type == "limit":
                order = LimitOrder(args["action"], args["quantity"], args["limit_price"])
            else:
                order = MarketOrder(args["action"], args["quantity"])
            
            state = await ib.whatIfOrderAsync(contract, order)
            return serialize_object(state)
    
        
        # Options tools
        if name == "get_option_chain":
            contract = Stock(args["symbol"], "SMART", "USD")
            await ib.qualifyContractsAsync(contract)
            chains = await ib.reqSecDefOptParamsAsync(
                args["symbol"],
                args.get("exchange", ""),
                args.get("underlying_sec_type", "STK"),
                contract.conId,
            )
            return [serialize_object(c) for c in chains]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nadavgb-atom/ib-async-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server