Skip to main content
Glama
yahoo_adapter.py (17.1 kB)
# src/server/domain/adapters/yahoo_adapter.py
"""YahooFinance adapter using yfinance.

All methods are async via asyncio.run_in_executor to avoid blocking.
"""

import asyncio
import logging
import math
import os
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional

import pandas as pd
import yfinance as yf

from src.server.domain.adapters.base import BaseDataAdapter
from src.server.domain.types import (
    AdapterCapability,
    Asset,
    AssetPrice,
    AssetSearchQuery,
    AssetSearchResult,
    AssetType,
    DataSource,
    Exchange,
    MarketInfo,
    MarketStatus,
)
from src.server.utils.logger import logger

# Proxy used when the caller does not pass one explicitly.
_DEFAULT_PROXY_URL = "http://127.0.0.1:7890"

# Environment variables set as a fallback when yfinance has no set_config().
_PROXY_ENV_VARS = ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy")

# Internal exchange code -> Yahoo ticker suffix (HKEX is handled separately
# because its numeric code also needs re-padding).
_YAHOO_SUFFIX_BY_EXCHANGE = {"SSE": ".SS", "SZSE": ".SZ"}

# Quote currencies that are folded into Yahoo's "<BASE>-USD" crypto pairs.
_USD_LIKE_QUOTES = ("USDT", "USDC", "USD")

# Substrings (lower-case) that mark an exception as a rate-limit response.
_RATE_LIMIT_MARKERS = ("too many requests", "429", "rate limited")

# Yahoo `info["exchange"]` code -> internal exchange name.
_YAHOO_EXCHANGE_CODES = {
    "NMS": "NASDAQ",
    "NYQ": "NYSE",
    "ASE": "AMEX",
    "HKG": "HKEX",
}

# Yahoo `info["quoteType"]` -> internal asset type; anything else is STOCK.
_QUOTE_TYPE_TO_ASSET_TYPE = {
    "EQUITY": AssetType.STOCK,
    "ETF": AssetType.ETF,
    "INDEX": AssetType.INDEX,
    "CRYPTOCURRENCY": AssetType.CRYPTO,
    "CURRENCY": AssetType.FOREX,
}


def _decimal_or_none(value: Any) -> Optional[Decimal]:
    """Return ``Decimal(str(value))`` for truthy values, otherwise ``None``."""
    return Decimal(str(value)) if value else None


def _df_to_records(df: Any) -> Any:
    """Turn a DataFrame into a list of row dicts (``{}`` when missing/empty)."""
    if df is None or df.empty:
        return {}
    # reset_index() moves the index into a regular column so it is kept
    # in the exported records.
    return df.reset_index().to_dict(orient="records")


def _clean_for_json(obj: Any) -> Any:
    """Recursively convert *obj* into JSON-friendly builtins.

    Dict keys are stringified, timestamps become ISO strings, NaN/inf
    floats become ``None`` and unknown objects fall back to ``str(obj)``.
    """
    if isinstance(obj, dict):
        return {str(k): _clean_for_json(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_clean_for_json(item) for item in obj]
    if isinstance(obj, (pd.Timestamp, datetime)):
        return obj.isoformat()
    if isinstance(obj, float) and (math.isnan(obj) or math.isinf(obj)):
        return None
    if isinstance(obj, (int, float, str, bool, type(None))):
        return obj
    return str(obj)


class YahooAdapter(BaseDataAdapter):
    """Data adapter that serves asset data from Yahoo Finance via yfinance.

    Blocking yfinance calls are pushed to the default executor through
    :meth:`_run`; results are stored in the injected async ``cache``.
    """

    name = "yahoo"

    def __init__(self, cache, proxy_url: Optional[str] = None):
        """Create the adapter and configure the HTTP proxy for yfinance.

        Args:
            cache: Object exposing awaitable ``get(key)`` and
                ``set(key, value, ttl=...)`` methods.
            proxy_url: Proxy to route Yahoo traffic through. A falsy value
                falls back to the built-in default proxy.
        """
        super().__init__(DataSource.YAHOO)
        self.cache = cache
        self.logger = logger
        # NOTE(review): because of this fallback the "without proxy" branch
        # below can never run; kept as-is for backward compatibility.
        self.proxy_url = proxy_url or _DEFAULT_PROXY_URL

        # Configure proxy
        if self.proxy_url:
            try:
                yf.set_config(proxy=self.proxy_url)
                self.logger.info(
                    f"✅ Yahoo adapter configured with proxy (yf.set_config): {self.proxy_url}"
                )
            except AttributeError:
                # yfinance build without set_config(): fall back to the
                # process-wide proxy environment variables.
                for var in _PROXY_ENV_VARS:
                    os.environ[var] = self.proxy_url
                self.logger.info(
                    f"✅ Yahoo adapter configured with proxy (env vars): {self.proxy_url}"
                )
        else:
            self.logger.info("ℹ️ Yahoo adapter running without proxy")

    def get_capabilities(self) -> List[AdapterCapability]:
        """Declare Yahoo Finance's capabilities."""
        return [
            AdapterCapability(
                asset_type=AssetType.STOCK,
                exchanges={
                    Exchange.NASDAQ,
                    Exchange.NYSE,
                    Exchange.AMEX,
                    Exchange.HKEX,
                },
            ),
            AdapterCapability(
                asset_type=AssetType.ETF, exchanges={Exchange.NASDAQ, Exchange.NYSE}
            ),
            AdapterCapability(
                asset_type=AssetType.INDEX, exchanges={Exchange.NASDAQ, Exchange.NYSE}
            ),
            AdapterCapability(
                asset_type=AssetType.CRYPTO, exchanges={Exchange.CRYPTO}
            ),
        ]

    def get_supported_asset_types(self) -> List[AssetType]:
        """Get list of supported asset types."""
        return [
            AssetType.STOCK,
            AssetType.ETF,
            AssetType.INDEX,
            AssetType.FOREX,
            AssetType.CRYPTO,  # Add support for Crypto
        ]

    def convert_to_source_ticker(self, ticker: str) -> str:
        """Convert internal ticker to Yahoo Finance format.

        Internal format: EXCHANGE:SYMBOL
        Yahoo format:    SYMBOL.EXCHANGE_SUFFIX

        Tickers without a ``:`` are returned unchanged. Exchanges with no
        known Yahoo suffix (including the US exchanges) yield the bare symbol.
        """
        if ":" not in ticker:
            return ticker

        exchange, symbol = ticker.split(":", 1)

        if exchange == "CRYPTO":
            # BTC/USDT -> BTC-USD, BTC/EUR -> BTC-EUR, BTC -> BTC-USD
            if "/" not in symbol:
                return f"{symbol}-USD"
            # Split once so an unexpected extra "/" cannot raise on unpacking.
            base, quote = symbol.split("/", 1)
            if quote in _USD_LIKE_QUOTES:
                return f"{base}-USD"
            return f"{base}-{quote}"

        if exchange == "HKEX":
            # Yahoo pads numeric Hong Kong codes to 4 digits ("0700.HK"),
            # whereas convert_to_internal_ticker() pads them to 5.
            if symbol.isdigit():
                symbol = str(int(symbol)).zfill(4)
            return f"{symbol}.HK"

        suffix = _YAHOO_SUFFIX_BY_EXCHANGE.get(exchange, "")
        return f"{symbol}{suffix}"

    def convert_to_internal_ticker(
        self, source_ticker: str, default_exchange: Optional[str] = None
    ) -> str:
        """Convert Yahoo Finance format to EXCHANGE:SYMBOL.

        Args:
            source_ticker: Ticker as used by Yahoo Finance.
            default_exchange: Exchange to use when the ticker itself carries
                no exchange hint.

        Returns:
            The internal ticker. Indices without a default exchange map to
            ``UNKNOWN:<symbol>``; plain symbols default to ``NASDAQ``.
        """
        # Indices carry a "^" prefix on Yahoo.
        if source_ticker.startswith("^"):
            index_symbol = source_ticker[1:]
            if default_exchange:
                return f"{default_exchange}:{index_symbol}"
            return f"UNKNOWN:{index_symbol}"

        # Crypto pairs: drop the quote currency.
        if "-USD" in source_ticker:
            crypto_symbol = source_ticker.split("-")[0].upper()
            return f"CRYPTO:{crypto_symbol}"

        # Hong Kong: numeric codes are zero-padded to 5 digits internally.
        if source_ticker.endswith(".HK"):
            symbol = source_ticker[: -len(".HK")]
            if symbol.isdigit():
                symbol = symbol.zfill(5)
            return f"HKEX:{symbol}"

        # Shanghai
        if source_ticker.endswith(".SS"):
            return f"SSE:{source_ticker[: -len('.SS')]}"

        # Shenzhen
        if source_ticker.endswith(".SZ"):
            return f"SZSE:{source_ticker[: -len('.SZ')]}"

        if default_exchange:
            return f"{default_exchange}:{source_ticker}"

        # Default to NASDAQ if no exchange info
        return f"NASDAQ:{source_ticker}"

    async def _run(self, func, *args, **kwargs):
        """Run a blocking callable in the default executor with retries.

        Exceptions whose message looks like a rate-limit response are retried
        up to three times with exponential backoff; every other exception,
        and the final rate-limit failure, is re-raised unchanged.
        """
        loop = asyncio.get_running_loop()
        max_retries = 3
        base_delay = 1

        for attempt in range(max_retries):
            try:
                return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
            except Exception as e:
                error_msg = str(e).lower()
                rate_limited = any(
                    marker in error_msg for marker in _RATE_LIMIT_MARKERS
                )
                if not rate_limited or attempt == max_retries - 1:
                    raise

                # Exponential backoff plus a small clock-derived jitter.
                delay = base_delay * (2**attempt) + (0.1 * (loop.time() % 1))
                self.logger.warning(
                    f"Rate limited (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}"
                )
                await asyncio.sleep(delay)

    def _to_yf_ticker(self, ticker: str) -> str:
        """Shorthand for :meth:`convert_to_source_ticker`."""
        return self.convert_to_source_ticker(ticker)

    async def get_asset_info(self, ticker: str) -> Optional[Asset]:
        """Fetch detailed asset information.

        Returns the cached asset when available, otherwise queries Yahoo and
        caches the result for an hour. Returns ``None`` when Yahoo has no
        usable data or the lookup fails (the failure is logged).
        """
        ticker_norm = self._to_yf_ticker(ticker)
        cache_key = f"yahoo:info:{ticker_norm}"

        cached = await self.cache.get(cache_key)
        if cached:
            return Asset.model_validate(cached)

        try:
            ticker_obj = await self._run(yf.Ticker, ticker_norm)
            info = await self._run(lambda: ticker_obj.info)

            if not info or "symbol" not in info:
                return None

            yf_exchange = info.get("exchange", "")
            exchange = _YAHOO_EXCHANGE_CODES.get(yf_exchange, yf_exchange)

            # Unknown or missing quote types keep the historical STOCK default.
            asset_type = _QUOTE_TYPE_TO_ASSET_TYPE.get(
                info.get("quoteType"), AssetType.STOCK
            )

            asset = Asset(
                ticker=ticker,
                asset_type=asset_type,
                name=info.get("longName") or info.get("shortName") or ticker,
                market_info=MarketInfo(
                    exchange=exchange,
                    # "or" also covers keys that are present but None.
                    country=info.get("country") or "US",
                    currency=info.get("currency") or "USD",
                    timezone=info.get("timeZoneShortName") or "UTC",
                    market_status=MarketStatus.UNKNOWN,
                ),
                source_mappings={DataSource.YAHOO: ticker_norm},
                properties={
                    "sector": info.get("sector"),
                    "industry": info.get("industry"),
                    "website": info.get("website"),
                    "description": info.get("longBusinessSummary"),
                },
            )

            await self.cache.set(cache_key, asset.model_dump(), ttl=3600)
            return asset

        except Exception as e:
            self.logger.warning(f"Failed to fetch asset info for {ticker}: {e}")
            return None

    async def get_real_time_price(self, ticker: str) -> Optional[AssetPrice]:
        """Fetch current price.

        Returns the cached quote when available, otherwise builds one from
        ``Ticker.info`` and caches it for 60 seconds. Returns ``None`` when no
        price field is present or the lookup fails (the failure is logged).
        """
        ticker_norm = self._to_yf_ticker(ticker)
        cache_key = f"yahoo:price:{ticker_norm}"

        cached = await self.cache.get(cache_key)
        if cached:
            # Reconstruct AssetPrice from cached dict
            return AssetPrice.from_dict(cached)

        try:
            ticker_obj = await self._run(yf.Ticker, ticker_norm)
            info = await self._run(lambda: ticker_obj.info)

            price = (
                info.get("currentPrice")
                or info.get("regularMarketPrice")
                or info.get("ask")
            )
            if price is None:
                return None

            asset_price = AssetPrice(
                ticker=ticker,
                price=Decimal(str(price)),
                currency=info.get("currency") or "USD",
                # Naive UTC timestamp, as produced by datetime.utcnow().
                timestamp=datetime.utcnow(),
                # A volume that is present but None must not sink the quote.
                volume=Decimal(str(info.get("volume") or 0)),
                open_price=_decimal_or_none(info.get("open")),
                high_price=_decimal_or_none(info.get("dayHigh")),
                low_price=_decimal_or_none(info.get("dayLow")),
                close_price=_decimal_or_none(info.get("previousClose")),
                change=None,
                change_percent=None,
                market_cap=_decimal_or_none(info.get("marketCap")),
                source=DataSource.YAHOO,
            )

            # Derive change figures when a non-zero previous close is known.
            if asset_price.close_price and asset_price.price:
                asset_price.change = asset_price.price - asset_price.close_price
                asset_price.change_percent = (
                    asset_price.change / asset_price.close_price
                ) * 100

            # Cache as dict
            await self.cache.set(cache_key, asset_price.to_dict(), ttl=60)
            return asset_price

        except Exception as e:
            self.logger.warning(f"Failed to fetch price for {ticker}: {e}")
            return None

    async def get_historical_prices(
        self,
        ticker: str,
        start_date: datetime,
        end_date: datetime,
        interval: str = "1d",
    ) -> List[AssetPrice]:
        """Fetch historical prices.

        ``start_date`` and ``end_date`` are passed to yfinance as
        ``YYYY-MM-DD`` strings, so any time-of-day component is dropped.
        Returns an empty list when there is no data or the lookup fails
        (the failure is logged). Successful results are cached for an hour.
        """
        ticker_norm = self._to_yf_ticker(ticker)
        start_str = start_date.strftime("%Y-%m-%d")
        end_str = end_date.strftime("%Y-%m-%d")
        cache_key = f"yahoo:history:{ticker_norm}:{start_str}:{end_str}:{interval}"

        cached = await self.cache.get(cache_key)
        if cached:
            return [AssetPrice.from_dict(item) for item in cached]

        try:
            ticker_obj = await self._run(yf.Ticker, ticker_norm)
            hist = await self._run(
                ticker_obj.history, start=start_str, end=end_str, interval=interval
            )

            if hist.empty:
                return []

            prices = []
            for idx, row in hist.iterrows():
                close = Decimal(str(row["Close"]))
                prices.append(
                    AssetPrice(
                        ticker=ticker,
                        price=close,
                        currency="USD",  # Default, might need to fetch from info
                        # idx is a pandas Timestamp
                        timestamp=idx.to_pydatetime(),
                        volume=Decimal(str(row["Volume"])),
                        open_price=Decimal(str(row["Open"])),
                        high_price=Decimal(str(row["High"])),
                        low_price=Decimal(str(row["Low"])),
                        close_price=close,
                        source=DataSource.YAHOO,
                    )
                )

            # Cache list of dicts
            await self.cache.set(cache_key, [p.to_dict() for p in prices], ttl=3600)
            return prices

        except Exception as e:
            self.logger.error(f"Failed to fetch history for {ticker}: {e}")
            return []

    async def search_assets(self, query: AssetSearchQuery) -> List[AssetSearchResult]:
        """Search for assets (Yahoo doesn't support direct search well)."""
        return []

    async def get_financials(self, ticker: str) -> Dict[str, Any]:
        """Fetch financial statements.

        Returns a dict with ``balance_sheet``, ``income_statement``,
        ``cash_flow``, ``financial_indicators`` (always ``None``),
        ``company_info`` and ``_raw_info``, cached for an hour.

        Raises:
            ValueError: If the data cannot be fetched or converted.
        """
        ticker_norm = self._to_yf_ticker(ticker)
        # Keyed on the Yahoo ticker, like the other cache entries of this
        # adapter, so equivalent internal tickers share one entry.
        cache_key = f"yahoo:financials:{ticker_norm}"

        cached = await self.cache.get(cache_key)
        if cached:
            return cached

        try:
            ticker_obj = await self._run(yf.Ticker, ticker_norm)

            def fetch_financial_data():
                # These attribute reads are fetched together in one
                # executor call via _run().
                return (
                    ticker_obj.balance_sheet,
                    ticker_obj.financials,
                    ticker_obj.cashflow,
                    ticker_obj.info,
                )

            balance_sheet, income_statement, cash_flow, info = await self._run(
                fetch_financial_data
            )
            info = info or {}

            company_info = {
                "公司名称": info.get("longName", info.get("shortName", "")),
                "股票代码": ticker_norm,
                "行业": info.get("industry", ""),
                "板块": info.get("sector", ""),
                "国家": info.get("country", ""),
                "网站": info.get("website", ""),
                "总市值": info.get("marketCap", 0),
                "员工人数": info.get("fullTimeEmployees", 0),
                # Guard against a summary that is present but None.
                "公司简介": (info.get("longBusinessSummary") or "")[:200],
            }

            result = {
                "balance_sheet": _clean_for_json(_df_to_records(balance_sheet)),
                "income_statement": _clean_for_json(_df_to_records(income_statement)),
                "cash_flow": _clean_for_json(_df_to_records(cash_flow)),
                "financial_indicators": None,
                "company_info": company_info,
                "_raw_info": _clean_for_json(info),
            }

            await self.cache.set(cache_key, result, ttl=3600)
            return result

        except Exception as e:
            self.logger.error(f"Failed to fetch financials for {ticker}: {e}")
            raise ValueError(f"Failed to fetch financials for {ticker}: {e}") from e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.