Skip to main content
Glama
market_data.py9.05 kB
"""Market data streaming resource.""" import asyncio import json from typing import Dict, Any, Set, Optional from mcp.server.fastmcp import FastMCP, Context from mcp.server.session import ServerSession from ..models import AppContext, ContractRequest # Global state for market data resources _market_data_cache: Dict[str, Dict[str, Any]] = {} _market_data_resource_subscriptions: Set[str] = set() _resource_background_streams: Dict[str, asyncio.Task] = {} def register_market_data_resource(mcp: FastMCP): """Register market data streaming resource.""" @mcp.resource("ibkr://market-data/{resource_id}") async def get_market_data_resource(resource_id: str) -> str: """Get current market data snapshot for a symbol. This resource provides real-time market data that updates automatically. Subscribe to this resource to receive notifications when data changes. Resource ID format: - Stocks: Just the symbol (e.g., "AAPL", "MSFT") - Forex: symbol.currency (e.g., "USD.JPY", "EUR.USD", "USD.SGD") - Others: symbol or symbol.identifier as appropriate Usage: 1. Call ibkr_start_market_data_resource tool to start streaming 2. Subscribe to this resource: ibkr://market-data/{resource_id} 3. Read resource to get current snapshot 4. Receive notifications when data updates 5. Re-read resource when notified to get latest data Args: resource_id: Resource identifier (e.g., "AAPL", "USD.JPY", "EUR.USD") Returns: JSON string with current market data or error message """ print(f"[RESOURCE READ] Requested resource_id: '{resource_id}'") print(f"[RESOURCE READ] Cache keys: {list(_market_data_cache.keys())}") if resource_id not in _market_data_cache: return json.dumps({ "error": f"No data for {resource_id}", "message": f"Call ibkr_start_market_data_resource() first to start streaming this resource", "subscribed": False }) data = _market_data_cache[resource_id] return json.dumps({ "resource_id": resource_id, "subscribed": True, "data": data.get("data", {}), "last_update": data.get("timestamp", 0), "contract": data.get("params", {}) }) @mcp.tool() async def ibkr_start_market_data_resource( ctx: Context[ServerSession, AppContext], symbol: str, secType: str = "STK", exchange: str = "SMART", currency: str = "USD" ) -> str: """Start streaming market data to a resource. This starts a background task that continuously updates the resource ibkr://market-data/{resource_id}. Clients can subscribe to the resource to receive notifications when data changes. Resource ID format: - Stocks (STK): Just the symbol (e.g., "AAPL" → ibkr://market-data/AAPL) - Forex (CASH): symbol.currency (e.g., "USD" + "JPY" → ibkr://market-data/USD.JPY) - Others: symbol or symbol.identifier This is the MCP-recommended pattern for streaming data - much more efficient than polling! Args: symbol: Stock/currency symbol (e.g., "AAPL", "USD", "EUR") secType: Security type (STK, OPT, FUT, CASH, BOND, etc.) exchange: Exchange (SMART for smart routing, IDEALPRO for forex, etc.) currency: Currency code (USD, EUR, GBP, JPY, SGD, etc.) Returns: JSON with resource URI and subscription status Example workflows: # Stock: Apple result = ibkr_start_market_data_resource("AAPL", "STK", "SMART", "USD") // Resource: ibkr://market-data/AAPL # Forex: USD/JPY result = ibkr_start_market_data_resource("USD", "CASH", "IDEALPRO", "JPY") // Resource: ibkr://market-data/USD.JPY # Forex: EUR/USD result = ibkr_start_market_data_resource("EUR", "CASH", "IDEALPRO", "USD") // Resource: ibkr://market-data/EUR.USD """ tws = ctx.request_context.lifespan_context.tws if not tws or not tws.is_connected(): return json.dumps({ "error": "TWS client not connected", "message": "Call ibkr_connect first" }) # Create resource ID: for CASH (forex), use symbol.currency format # For stocks and others, just use symbol if secType == "CASH": resource_id = f"{symbol}.{currency}" else: resource_id = symbol if resource_id in _market_data_resource_subscriptions: return json.dumps({ "status": "already_subscribed", "resource_uri": f"ibkr://market-data/{resource_id}", "message": f"Market data already streaming for {resource_id}" }) # Initialize cache _market_data_cache[resource_id] = { "data": {}, "timestamp": 0, "params": { "symbol": symbol, "secType": secType, "exchange": exchange, "currency": currency } } # Start background streaming task async def stream_to_resource(): """Background task that updates the resource and sends notifications.""" req = ContractRequest(symbol=symbol, secType=secType, exchange=exchange, currency=currency) print(f"[RESOURCE] Starting market data stream for {resource_id} ({symbol}/{currency})") print(f"[RESOURCE] TWS connected: {tws.is_connected()}") try: print(f"[RESOURCE] Entering async for loop for {resource_id}") async for data in tws.stream_market_data(req): print(f"[RESOURCE] Received data for {resource_id}: {data}") if data: # Update cache _market_data_cache[resource_id]["data"] = data _market_data_cache[resource_id]["timestamp"] = asyncio.get_event_loop().time() # Notify all subscribed clients that resource changed await ctx.session.send_resource_updated(f"ibkr://market-data/{resource_id}") print(f"[RESOURCE] Updated {resource_id}: {list(data.keys())} - notification sent") except asyncio.CancelledError: print(f"[RESOURCE] Stream cancelled for {resource_id}") except Exception as e: print(f"[RESOURCE] Stream error for {resource_id}: {e}") import traceback traceback.print_exc() task = asyncio.create_task(stream_to_resource()) _resource_background_streams[resource_id] = task _market_data_resource_subscriptions.add(resource_id) return json.dumps({ "status": "subscribed", "resource_uri": f"ibkr://market-data/{resource_id}", "resource_id": resource_id, "message": f"Market data streaming started. Subscribe to resource 'ibkr://market-data/{resource_id}' to receive updates.", "contract": { "symbol": symbol, "secType": secType, "exchange": exchange, "currency": currency } }) @mcp.tool() async def ibkr_stop_market_data_resource(resource_id: str) -> str: """Stop streaming market data to a resource. Cancels the background task and clears cached data for a resource. Args: resource_id: Resource identifier (e.g., "AAPL", "USD.JPY", "EUR.USD") Returns: JSON with status """ if resource_id not in _resource_background_streams: return json.dumps({ "error": f"No active stream for {resource_id}", "subscribed": False }) # Cancel background task task = _resource_background_streams[resource_id] task.cancel() try: await task except asyncio.CancelledError: pass # Cleanup del _resource_background_streams[resource_id] _market_data_resource_subscriptions.remove(resource_id) del _market_data_cache[resource_id] print(f"[RESOURCE] Stopped stream for {resource_id}") return json.dumps({ "status": "stopped", "resource_id": resource_id, "message": f"Market data streaming stopped for {resource_id}" })

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/haymant/tws-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server