get_realtime_data
Retrieve current stock prices and trading data for Taiwan-listed companies to monitor market movements and inform investment decisions.
Instructions
Get real-time data for a specific stock.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| stock_code | Yes |
Implementation Reference
- mcp_server.py:157-179 (handler)MCP tool handler implementation for 'get_realtime_data'. This is the entry point decorated with @mcp.tool that handles the tool execution, error handling, and response formatting using RealtimeDataResponse model.@mcp.tool(name="get_realtime_data", description="Get real-time data for a specific stock.", ) async def get_realtime_data_tool(stock_code: str) -> RealtimeDataResponse: """Get real-time data for a specific stock.""" try: raw_data = await get_realtime_data(stock_code) # Extract clean data without _metadata for Pydantic model from tw_stock_agent.utils.mcp_error_handler import MCPResponseFormatter clean_data = MCPResponseFormatter.extract_metadata_for_model(raw_data) return RealtimeDataResponse(**clean_data) except TwStockAgentError as e: return RealtimeDataResponse( stock_code=stock_code, updated_at=e.context.timestamp.isoformat(), error=e.message ) except Exception as e: return RealtimeDataResponse( stock_code=stock_code, updated_at=datetime.now().isoformat(), error=f"Unexpected error: {str(e)}" )
- Pydantic model (schema) defining the structure and validation for the realtime data response, including fields like current_price, volume, bid/ask prices, with computed fields for analysis.class RealtimeDataResponse(BaseStockResponse): """Enhanced response model for realtime stock data.""" company_name: Optional[str] = Field( None, description="Company name (股票名稱)", alias="name" ) current_price: Optional[TWDAmount] = Field( None, description="Current price (現在價格)", alias="currentPrice" ) opening_price: Optional[TWDAmount] = Field( None, description="Opening price (開盤價)", alias="open" ) highest_price: Optional[TWDAmount] = Field( None, description="Highest price (最高價)", alias="high" ) lowest_price: Optional[TWDAmount] = Field( None, description="Lowest price (最低價)", alias="low" ) trading_volume: Optional[int] = Field( None, description="Trading volume (成交量)", alias="volume", ge=0 ) bid_price: Optional[TWDAmount] = Field( None, description="Best bid price (最佳買價)", alias="bidPrice" ) ask_price: Optional[TWDAmount] = Field( None, description="Best ask price (最佳賣價)", alias="askPrice" ) bid_size: Optional[int] = Field( None, description="Bid size (買張數)", alias="bidSize", ge=0 ) ask_size: Optional[int] = Field( None, description="Ask size (賣張數)", alias="askSize", ge=0 ) market_status: Optional[str] = Field( None, description="Market status (市場狀態)", alias="marketStatus" ) def __init__(self, **data): # Handle backward compatibility for price_field in ['current_price', 'open', 'high', 'low', 'bid_price', 'ask_price']: field_name = price_field if price_field == 'open': field_name = 'opening_price' elif price_field == 'high': field_name = 'highest_price' elif price_field == 'low': field_name = 'lowest_price' # current_price, bid_price, ask_price stay the same if price_field in data and data[price_field] is not None: value = data[price_field] if not isinstance(value, dict): data[field_name] = {'amount': value} if price_field != field_name: # Only remove if field name changed data.pop(price_field, None) super().__init__(**data) self.metadata.data_type = "realtime_data" @field_validator("market_status") @classmethod def validate_market_status(cls, v: Optional[str]) -> Optional[str]: """Validate market status.""" if v is None: return None valid_statuses = { 'open', 'closed', 'pre_market', 'after_hours', 'opening_auction', 'closing_auction', 'trading_halt' } v_lower = v.lower().replace(' ', '_') if v_lower not in valid_statuses: return 'unknown' return v_lower @computed_field @property def price_change_from_open(self) -> Optional[Dict[str, Any]]: """Calculate price change from opening.""" if not self.current_price or not self.opening_price or self.opening_price.amount == 0: return None change_amount = self.current_price.amount - self.opening_price.amount change_percentage = float((change_amount / self.opening_price.amount) * 100) return { "change_amount": float(change_amount), "change_percentage": change_percentage, "is_positive": change_amount > 0, "formatted_change": f"{'+' if change_amount >= 0 else ''}{change_amount:.2f}" } @computed_field @property def bid_ask_spread(self) -> Optional[Dict[str, Any]]: """Calculate bid-ask spread.""" if not self.bid_price or not self.ask_price: return None spread_amount = self.ask_price.amount - self.bid_price.amount mid_price = (self.ask_price.amount + self.bid_price.amount) / 2 spread_percentage = float((spread_amount / mid_price) * 100) if mid_price > 0 else None return { "spread_amount": float(spread_amount), "spread_percentage": spread_percentage, "mid_price": float(mid_price) } @computed_field @property def is_actively_trading(self) -> bool: """Check if stock is actively trading.""" return ( self.market_status == 'open' and self.current_price is not None and self.trading_volume is not None and self.trading_volume > 0 )
- Core helper function in StockService that fetches realtime stock data from twstock API, implements caching, circuit breaking, retry logic, and error handling.@with_async_error_handling(operation="get_realtime_data") @with_retry(max_retries=1, base_delay=0.5) # Shorter retry for real-time data async def get_realtime_data(self, stock_code: str) -> dict[str, Any]: """ 獲取即時股票資訊 Args: stock_code: 股票代號 Returns: 即時股票資訊 Raises: InvalidStockCodeError: 股票代號格式錯誤 StockNotFoundError: 找不到股票 StockDataUnavailableError: 即時資料無法取得 StockMarketClosedError: 股市休市 """ # Track request for performance monitoring self._performance_monitor.record_request() # Validate stock code validated_code = StockCodeValidator.validate_stock_code(stock_code) cache_key = f"realtime_{validated_code}" cached_data = self._get_cached_data(cache_key) if cached_data: return cached_data try: # Use circuit breaker for external API call realtime_data = await self.circuit_breaker.acall( asyncio.to_thread, twstock.realtime.get, validated_code ) if not realtime_data: raise StockNotFoundError( stock_code=validated_code, message=f"No real-time data available for stock '{validated_code}'" ) if realtime_data.get('success', False) is False: # Check if market is closed current_hour = datetime.now().hour if current_hour < 9 or current_hour > 14: # Taiwan market hours: 9:00-13:30 from tw_stock_agent.exceptions import StockMarketClosedError raise StockMarketClosedError( stock_code=validated_code, message="Taiwan stock market is closed. Trading hours: 9:00 AM - 1:30 PM (GMT+8)" ) else: raise StockDataUnavailableError( stock_code=validated_code, data_type="real-time data", message="Real-time data temporarily unavailable" ) realtime_info = realtime_data.get('realtime', {}) stock_info = realtime_data.get('info', {}) data = { "stock_code": validated_code, "name": stock_info.get('name'), "current_price": realtime_info.get('latest_trade_price'), "open": realtime_info.get('open'), "high": realtime_info.get('high'), "low": realtime_info.get('low'), "volume": realtime_info.get('accumulate_trade_volume'), "updated_at": datetime.now().isoformat(), "market_status": "open" if realtime_data.get('success') else "closed" } # 使用標籤進行分組管理 tags = ["realtime", f"stock_{validated_code}", "live_data"] self._set_cached_data(cache_key, data, self.cache_ttl['realtime'], tags) return data except (StockDataUnavailableError, StockNotFoundError): # Track error and re-raise specific stock errors self._performance_monitor.record_error() raise except Exception as e: # Track error and convert generic exceptions to appropriate stock errors self._performance_monitor.record_error() if "connection" in str(e).lower() or "timeout" in str(e).lower(): raise StockDataUnavailableError( stock_code=validated_code, data_type="real-time data", message=f"Unable to fetch real-time data due to network issues: {str(e)}" ) else: raise StockDataUnavailableError( stock_code=validated_code, data_type="real-time data", message=f"Failed to fetch real-time data: {str(e)}" )
- Intermediate helper function that validates input parameters, delegates to StockService, and formats the response for MCP compatibility.@mcp_error_handler("get_realtime_data") async def get_realtime_data(stock_code: str) -> dict[str, Any]: """ 取得即時股票資訊 Args: stock_code: 股票代號,例如:2330 Returns: 即時股票資訊 Raises: InvalidStockCodeError: 股票代號格式錯誤 StockNotFoundError: 找不到股票 StockDataUnavailableError: 即時資料無法取得 StockMarketClosedError: 股市休市 """ # Validate request parameters validated_params = validate_stock_request(stock_code=stock_code) # Fetch real-time data result = await stock_service.get_realtime_data(validated_params["stock_code"]) # Format response for MCP with enhanced realtime structure return MCPResponseFormatter.format_realtime_data_response(result)