Skip to main content
Glama
finnhub_adapter.py19.4 kB
# src/server/domain/adapters/finnhub_adapter.py """FinnHub adapter for US stock news and data. Provides news data for US stocks using FinnHub API. All methods are async via asyncio.run_in_executor to avoid blocking the event loop. """ import asyncio from datetime import datetime, timedelta from decimal import Decimal from typing import Any, Dict, List, Optional from src.server.domain.adapters.base import BaseDataAdapter from src.server.domain.types import ( AdapterCapability, Asset, AssetPrice, AssetSearchQuery, AssetSearchResult, AssetType, DataSource, Exchange, MarketInfo, MarketStatus, ) from src.server.utils.logger import logger class FinnhubAdapter(BaseDataAdapter): """FinnHub adapter for US market news data.""" name = "finnhub" def __init__(self, finnhub_conn, cache): super().__init__(DataSource.FINNHUB) self.finnhub_conn = finnhub_conn self.cache = cache self.logger = logger def get_capabilities(self) -> List[AdapterCapability]: """Declare FinnHub's capabilities - US exchanges only.""" return [ AdapterCapability( asset_type=AssetType.STOCK, exchanges={Exchange.NASDAQ, Exchange.NYSE, Exchange.AMEX}, ), ] def convert_to_source_ticker(self, internal_ticker: str) -> str: """Convert EXCHANGE:SYMBOL to FinnHub format.""" if ":" in internal_ticker: _, symbol = internal_ticker.split(":", 1) return symbol return internal_ticker def convert_to_internal_ticker( self, source_ticker: str, default_exchange: Optional[str] = None ) -> str: """Convert FinnHub format to EXCHANGE:SYMBOL.""" if ":" in source_ticker: return source_ticker exchange = default_exchange or "NASDAQ" return f"{exchange}:{source_ticker}" async def _run(self, func, *args, **kwargs): """Run sync function in executor.""" loop = asyncio.get_event_loop() return await loop.run_in_executor(None, lambda: func(*args, **kwargs)) async def get_asset_info(self, ticker: str) -> Optional[Asset]: """Fetch asset info using Finnhub company profile.""" cache_key = f"finnhub:info:{ticker}" cached = await self.cache.get(cache_key) if cached: return Asset.model_validate(cached) symbol = self.convert_to_source_ticker(ticker) session = self.finnhub_conn.get_client() if not session: return None base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() try: def fetch_profile(): url = f"{base_url}/stock/profile2" params = {"symbol": symbol, "token": api_key} resp = session.get(url, params=params, timeout=10) if resp.status_code == 200: return resp.json() return {} profile = await self._run(fetch_profile) if not profile: return None asset = Asset( ticker=ticker, asset_type=AssetType.STOCK, name=profile.get("name", ticker), market_info=MarketInfo( exchange=profile.get("exchange", "US"), country=profile.get("country", "US"), currency=profile.get("currency", "USD"), timezone="UTC", market_status=MarketStatus.UNKNOWN, ), source_mappings={DataSource.FINNHUB: symbol}, properties={ "industry": profile.get("finnhubIndustry"), "ipo": profile.get("ipo"), "weburl": profile.get("weburl"), "market_cap": profile.get("marketCapitalization"), }, ) await self.cache.set(cache_key, asset.model_dump(), ttl=3600) return asset except Exception as e: self.logger.warning(f"Failed to fetch asset info for {ticker}: {e}") return None async def get_real_time_price(self, ticker: str) -> Optional[AssetPrice]: """Fetch current price (Quote).""" cache_key = f"finnhub:price:{ticker}" cached = await self.cache.get(cache_key) if cached: return AssetPrice.from_dict(cached) symbol = self.convert_to_source_ticker(ticker) session = self.finnhub_conn.get_client() if not session: return None base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() try: def fetch_quote(): url = f"{base_url}/quote" params = {"symbol": symbol, "token": api_key} resp = session.get(url, params=params, timeout=10) if resp.status_code == 200: return resp.json() return {} quote = await self._run(fetch_quote) if not quote or "c" not in quote: return None # Quote data: c (current), d (change), dp (percent), h, l, o, pc (prev close), t (timestamp) timestamp = datetime.fromtimestamp( quote.get("t", datetime.utcnow().timestamp()) ) asset_price = AssetPrice( ticker=ticker, price=Decimal(str(quote["c"])), currency="USD", # Assumption timestamp=timestamp, volume=None, # Not in quote endpoint open_price=Decimal(str(quote["o"])) if quote.get("o") else None, high_price=Decimal(str(quote["h"])) if quote.get("h") else None, low_price=Decimal(str(quote["l"])) if quote.get("l") else None, close_price=Decimal(str(quote["pc"])) if quote.get("pc") else None, change=Decimal(str(quote["d"])) if quote.get("d") else None, change_percent=Decimal(str(quote["dp"])) if quote.get("dp") else None, source=DataSource.FINNHUB, ) await self.cache.set(cache_key, asset_price.to_dict(), ttl=60) return asset_price except Exception as e: self.logger.warning(f"Failed to fetch price for {ticker}: {e}") return None async def get_historical_prices( self, ticker: str, start_date: datetime, end_date: datetime, interval: str = "1d", ) -> List[AssetPrice]: """Fetch historical prices (Candles).""" # Finnhub candles: /stock/candle?symbol=...&resolution=D&from=...&to=... # Resolution: 1, 5, 15, 30, 60, D, W, M res_map = {"1d": "D", "1w": "W", "1m": "M"} res = res_map.get(interval, "D") symbol = self.convert_to_source_ticker(ticker) session = self.finnhub_conn.get_client() if not session: self.logger.warning("Finnhub session not available (missing API key?)") return [] base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() start_ts = int(start_date.timestamp()) end_ts = int(end_date.timestamp()) try: def fetch_candles(): url = f"{base_url}/stock/candle" params = { "symbol": symbol, "resolution": res, "from": start_ts, "to": end_ts, "token": api_key, } self.logger.debug(f"Finnhub request: {url} params={params}") resp = session.get(url, params=params, timeout=10) if resp.status_code == 200: data = resp.json() self.logger.debug(f"Finnhub response: {data}") return data self.logger.warning(f"Finnhub error: {resp.status_code} {resp.text}") return {} data = await self._run(fetch_candles) if not data or data.get("s") != "ok": return [] # Data: c, h, l, o, s, t, v prices = [] for i in range(len(data["t"])): timestamp = datetime.fromtimestamp(data["t"][i]) price = AssetPrice( ticker=ticker, price=Decimal(str(data["c"][i])), currency="USD", timestamp=timestamp, volume=Decimal(str(data["v"][i])), open_price=Decimal(str(data["o"][i])), high_price=Decimal(str(data["h"][i])), low_price=Decimal(str(data["l"][i])), close_price=Decimal(str(data["c"][i])), source=DataSource.FINNHUB, ) prices.append(price) return prices except Exception as e: self.logger.error(f"Failed to fetch history for {ticker}: {e}") return [] async def search_assets(self, query: AssetSearchQuery) -> List[AssetSearchResult]: """Search for assets.""" session = self.finnhub_conn.get_client() if not session: return [] base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() try: def fetch_search(): url = f"{base_url}/search" params = {"q": query.query, "token": api_key} resp = session.get(url, params=params, timeout=10) if resp.status_code == 200: return resp.json() return {} data = await self._run(fetch_search) if not data or "result" not in data: return [] results = [] for item in data["result"][: query.limit]: # Filter out non-US if needed, or just return all symbol = item.get("symbol") if not symbol or "." in symbol: # Simple filter for common stocks continue results.append( AssetSearchResult( ticker=f"NASDAQ:{symbol}", # Default assumption asset_type=AssetType.STOCK, name=item.get("description", ""), exchange="US", # Finnhub doesn't always give exchange in search country="US", relevance_score=1.0, # No score provided ) ) return results except Exception as e: self.logger.warning(f"Search failed: {e}") return [] async def get_filings( self, ticker: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, limit: int = 10, filing_types: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: """Fetch filings from Finnhub.""" symbol = self.convert_to_source_ticker(ticker) session = self.finnhub_conn.get_client() if not session: return [] base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() try: def fetch_filings(): url = f"{base_url}/stock/filings" params = {"symbol": symbol, "token": api_key} if start_date: params["from"] = start_date.strftime("%Y-%m-%d") if end_date: params["to"] = end_date.strftime("%Y-%m-%d") resp = session.get(url, params=params, timeout=10) if resp.status_code == 200: return resp.json() return [] data = await self._run(fetch_filings) if not data: return [] filings = [] for item in data: # Filter by filing types if provided if filing_types: form = item.get("form", "") if form not in filing_types: continue accession_number = item.get("accessionNumber") filing_url = item.get("filingUrl") # Fallback: extract accessionNumber from URL if missing if not accession_number and filing_url: import re # Pattern matches standard SEC accession number format in URL # e.g. .../0001104659-25-115949-index.html match = re.search(r"(\d{10}-\d{2}-\d{6})", filing_url) if match: accession_number = match.group(1) filings.append( { "accessionNumber": accession_number, "symbol": item.get("symbol"), "filingDate": item.get("filedDate"), "reportDate": item.get("reportDate"), "form": item.get("form"), "filingUrl": filing_url, "description": item.get("description"), } ) if len(filings) >= limit: break return filings except Exception as e: self.logger.error(f"Failed to fetch filings: {e}") return [] async def get_news(self, ticker: str, limit: int = 20) -> List[Dict[str, Any]]: """Fetch news from FinnHub API.""" # Keep existing implementation cache_key = f"finnhub:news:{ticker}:{limit}" cached = await self.cache.get(cache_key) if cached: return cached session = self.finnhub_conn.get_client() if session is None: raise RuntimeError("FinnHub connection not established") symbol = self.convert_to_source_ticker(ticker) end_date = datetime.now() start_date = end_date - timedelta(days=30) def fetch_finnhub_news(): base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() url = f"{base_url}/company-news" params = { "symbol": symbol, "from": start_date.strftime("%Y-%m-%d"), "to": end_date.strftime("%Y-%m-%d"), "token": api_key, } response = session.get(url, params=params, timeout=10) if response.status_code == 200: return response.json() return [] try: data = await self._run(fetch_finnhub_news) if not data: return [] news_list = [] for item in data[:limit]: try: timestamp = item.get("datetime", 0) publish_time = datetime.fromtimestamp(timestamp).strftime( "%Y-%m-%d %H:%M:%S" ) news_list.append( { "title": item.get("headline", ""), "url": item.get("url", ""), "publish_time": publish_time, "source": item.get("source", "FinnHub"), "snippet": item.get("summary", "")[:200], } ) except Exception: continue await self.cache.set(cache_key, news_list, ttl=3600) return news_list except Exception as e: self.logger.error(f"Failed to fetch news: {e}") return [] async def get_financials(self, ticker: str) -> Dict[str, Any]: """Fetch financial data using Finnhub API.""" # Keep existing implementation cache_key = f"finnhub:financials:{ticker}" cached = await self.cache.get(cache_key) if cached: return cached symbol = self.convert_to_source_ticker(ticker) session = self.finnhub_conn.get_client() if session is None: raise RuntimeError("Finnhub connection not established") base_url = self.finnhub_conn.get_base_url() api_key = self.finnhub_conn.get_api_key() try: def fetch_finnhub_data(): profile_url = f"{base_url}/stock/profile2" profile_params = {"symbol": symbol, "token": api_key} profile_resp = session.get( profile_url, params=profile_params, timeout=10 ) profile = profile_resp.json() if profile_resp.status_code == 200 else {} metrics_url = f"{base_url}/stock/metric" metrics_params = {"symbol": symbol, "metric": "all", "token": api_key} metrics_resp = session.get( metrics_url, params=metrics_params, timeout=10 ) metrics = metrics_resp.json() if metrics_resp.status_code == 200 else {} return profile, metrics profile, metrics = await self._run(fetch_finnhub_data) metric_data = metrics.get("metric", {}) company_info = { "公司名称": profile.get("name", ""), "股票代码": symbol, "行业": profile.get("finnhubIndustry", ""), "国家": profile.get("country", ""), "网站": profile.get("weburl", ""), "总市值": profile.get("marketCapitalization", 0) * 1_000_000, "员工人数": profile.get("shareOutstanding", 0), "上市时间": profile.get("ipo", ""), "交易所": profile.get("exchange", ""), } result = { "balance_sheet": None, "income_statement": None, "cash_flow": None, "financial_indicators": None, "company_info": company_info, "_raw_info": { "marketCap": profile.get("marketCapitalization", 0) * 1_000_000, "peRatio": metric_data.get("peNormalizedAnnual", 0), "pbRatio": metric_data.get("pbAnnual", 0), "dividendYield": metric_data.get("dividendYieldIndicatedAnnual", 0), "roe": metric_data.get("roeTTM", 0), "roa": metric_data.get("roaTTM", 0), "currentRatio": metric_data.get("currentRatioAnnual", 0), "quickRatio": metric_data.get("quickRatioAnnual", 0), "debtEquity": metric_data.get("totalDebt/totalEquityAnnual", 0), "revenueGrowth": metric_data.get("revenueGrowthTTMYoy", 0), "epsGrowth": metric_data.get("epsGrowthTTMYoy", 0), }, } await self.cache.set(cache_key, result, ttl=3600) return result except Exception as e: self.logger.error(f"Failed to fetch financials for {ticker}: {e}") raise ValueError(f"Failed to fetch financials for {ticker}: {e}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server