Skip to main content
Glama
adapter_manager.py (25.7 kB)
# src/server/domain/adapter_manager.py
"""AdapterManager for coordinating multiple data source adapters.

This manager routes ticker symbols to appropriate adapters based on
capabilities, with support for caching, failover, and LLM-based fallback
search. Aligned with ValueCell's architecture.
"""

import asyncio
import json
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict, Iterator, List, Optional

from src.server.domain.adapters.base import BaseDataAdapter
from src.server.domain.types import (
    Asset,
    AssetPrice,
    AssetSearchQuery,
    AssetSearchResult,
    AssetType,
    DataSource,
    Exchange,
)

logger = logging.getLogger(__name__)


class AdapterManager:
    """Manager for coordinating multiple asset data adapters.

    Provides unified interface for:
    - Asset search
    - Real-time prices
    - Historical prices
    - Asset information
    - Batch operations
    """

    # Exchange priority used when the same symbol is listed on several
    # exchanges of one country (higher number = higher priority). Exchanges
    # that are not listed here get priority 0.
    _EXCHANGE_PRIORITY: Dict[str, int] = {
        "NASDAQ": 3,
        "NYSE": 2,
        "AMEX": 1,
        "HKEX": 3,
        "SSE": 2,
        "SZSE": 2,
        "BSE": 1,
    }

    def __init__(self):
        """Initialize adapter manager."""
        self.adapters: Dict[DataSource, BaseDataAdapter] = {}

        # Maintain registration order for priority routing
        self._adapter_order: List[BaseDataAdapter] = []

        # Exchange → Adapters routing table (maintains registration order)
        self.exchange_routing: Dict[str, List[BaseDataAdapter]] = {}

        # Ticker → Adapter cache for fast lookups
        self._ticker_cache: Dict[str, BaseDataAdapter] = {}
        self._cache_lock = threading.Lock()

        # Re-entrant because register_adapter() calls _rebuild_routing_table()
        # while already holding the lock.
        self.lock = threading.RLock()

        logger.info("Asset adapter manager initialized")

    # ------------------------------------------------------------------
    # Registration and routing
    # ------------------------------------------------------------------

    def _rebuild_routing_table(self) -> None:
        """Rebuild routing table based on registered adapters' capabilities.

        Adapters are processed in registration order to maintain priority.
        The ticker cache is cleared afterwards because cached choices may no
        longer match the new routing table.
        """
        with self.lock:
            self.exchange_routing.clear()

            # Use _adapter_order to maintain registration order
            for adapter in self._adapter_order:
                # Collect every exchange key this adapter supports; a set is
                # used so an exchange listed by several capabilities only
                # registers the adapter once.
                supported_exchanges = {
                    exchange.value if isinstance(exchange, Exchange) else exchange
                    for cap in adapter.get_capabilities()
                    for exchange in cap.exchanges
                }

                for exchange_key in supported_exchanges:
                    self.exchange_routing.setdefault(exchange_key, []).append(
                        adapter
                    )

            # Clear ticker cache when routing table changes
            with self._cache_lock:
                self._ticker_cache.clear()

        logger.debug(
            f"Routing table rebuilt with {len(self.exchange_routing)} exchanges"
        )

    def register_adapter(self, adapter: BaseDataAdapter) -> None:
        """Register a data adapter and rebuild routing table.

        Adapters are prioritized in registration order. For exchanges with
        multiple adapters, the first registered adapter will be tried first.

        Registering an adapter whose ``source`` is already registered replaces
        the previous adapter: the old instance is removed from the priority
        list (so it no longer receives traffic) and the new instance is
        appended at the end, like any fresh registration.

        Args:
            adapter: Data adapter instance to register
        """
        with self.lock:
            source = adapter.source
            # Drop any stale adapter for the same source; otherwise it would
            # stay in the routing table ahead of its replacement.
            self._adapter_order = [
                existing
                for existing in self._adapter_order
                if existing.source != source
            ]
            self._adapter_order.append(adapter)
            self.adapters[source] = adapter
            self._rebuild_routing_table()

        logger.info(f"Registered adapter: {adapter.source.value}")

    def get_available_adapters(self) -> List[DataSource]:
        """Get list of available data adapters."""
        with self.lock:
            return list(self.adapters)

    def get_adapters_for_exchange(self, exchange: str) -> List[BaseDataAdapter]:
        """Get list of adapters for a specific exchange.

        Args:
            exchange: Exchange identifier (e.g., "NASDAQ", "SSE")

        Returns:
            List of adapters that support the exchange, in priority order.
            The list is a copy, so callers can iterate it without holding the
            manager lock and cannot mutate the routing table through it.
        """
        with self.lock:
            return list(self.exchange_routing.get(exchange, ()))

    def get_adapters_for_asset_type(
        self, asset_type: AssetType
    ) -> List[BaseDataAdapter]:
        """Get list of adapters that support a specific asset type.

        Args:
            asset_type: Type of asset

        Returns:
            List of adapters that support this asset type, in registration
            (priority) order
        """
        with self.lock:
            return [
                adapter
                for adapter in self._adapter_order
                if asset_type in adapter.get_supported_asset_types()
            ]

    def get_adapter_for_ticker(self, ticker: str) -> Optional[BaseDataAdapter]:
        """Get the best adapter for a specific ticker (with caching).

        Args:
            ticker: Asset ticker in internal format (e.g., "NASDAQ:AAPL")

        Returns:
            Best available adapter for the ticker or None if not found
        """
        # Check cache first
        with self._cache_lock:
            cached = self._ticker_cache.get(ticker)
        if cached is not None:
            return cached

        # Parse ticker
        if ":" not in ticker:
            logger.warning(f"Invalid ticker format (missing ':'): {ticker}")
            return None

        exchange, _symbol = ticker.split(":", 1)

        # Get adapters for this exchange
        adapters = self.get_adapters_for_exchange(exchange)
        if not adapters:
            logger.debug(f"No adapters registered for exchange: {exchange}")
            return None

        # Find first adapter that validates this ticker
        for adapter in adapters:
            if adapter.validate_ticker(ticker):
                self._remember_adapter(ticker, adapter)
                logger.debug(f"Matched adapter {adapter.source.value} for {ticker}")
                return adapter

        logger.warning(f"No suitable adapter found for ticker: {ticker}")
        return None

    def _remember_adapter(self, ticker: str, adapter: BaseDataAdapter) -> None:
        """Record ``adapter`` as the preferred adapter for ``ticker``."""
        with self._cache_lock:
            self._ticker_cache[ticker] = adapter

    def _failover_candidates(
        self, ticker: str, primary: BaseDataAdapter
    ) -> Iterator[BaseDataAdapter]:
        """Yield fallback adapters for ``ticker`` in priority order.

        Candidates are the adapters routed to the ticker's exchange, excluding
        those with the same source as ``primary`` and those that do not
        validate the ticker. This is a generator so ``validate_ticker`` is only
        called on a candidate when the previous ones have been exhausted.
        """
        exchange = ticker.split(":", 1)[0] if ":" in ticker else ""
        for candidate in self.get_adapters_for_exchange(exchange):
            if candidate.source == primary.source:
                continue
            if not candidate.validate_ticker(ticker):
                continue
            yield candidate

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def _deduplicate_search_results(
        self, results: List[AssetSearchResult]
    ) -> List[AssetSearchResult]:
        """Smart deduplication of search results to handle cross-exchange duplicates.

        This method handles cases where the same asset appears on multiple
        exchanges (e.g., AMEX:GORO vs NASDAQ:GORO). It prioritizes certain
        exchanges and removes likely duplicates based on symbol matching.

        Results whose ticker does not contain ``":"`` are dropped with a
        warning. For a given (symbol, country) pair the result on the
        higher-priority exchange wins; on equal priority the higher relevance
        score wins, and on a full tie the first result seen is kept.

        Args:
            results: List of search results to deduplicate

        Returns:
            Deduplicated list of search results, sorted by relevance score
            (descending)
        """
        priority = self._EXCHANGE_PRIORITY
        seen_tickers = set()
        # Map: (symbol, country) -> best result so far
        best_by_symbol: Dict[tuple, AssetSearchResult] = {}

        for result in results:
            # Skip exact ticker duplicates
            if result.ticker in seen_tickers:
                continue

            exchange, separator, symbol = result.ticker.partition(":")
            if not separator:
                logger.warning(
                    f"Invalid ticker format in search result: {result.ticker}"
                )
                continue
            seen_tickers.add(result.ticker)

            # Key for cross-exchange deduplication
            dedup_key = (symbol.upper(), result.country)
            existing_result = best_by_symbol.get(dedup_key)
            if existing_result is None:
                best_by_symbol[dedup_key] = result
                continue

            existing_exchange = existing_result.ticker.split(":")[0]
            current_priority = priority.get(exchange, 0)
            existing_priority = priority.get(existing_exchange, 0)

            if current_priority > existing_priority:
                best_by_symbol[dedup_key] = result
                logger.debug(
                    f"Preferring {result.ticker} over {existing_result.ticker} (priority)"
                )
            elif (
                current_priority == existing_priority
                and result.relevance_score > existing_result.relevance_score
            ):
                # Same priority, prefer higher relevance score
                best_by_symbol[dedup_key] = result
                logger.debug(
                    f"Preferring {result.ticker} over {existing_result.ticker} (relevance)"
                )

        # Sort by relevance score (descending)
        unique_results = sorted(
            best_by_symbol.values(), key=lambda x: x.relevance_score, reverse=True
        )

        logger.info(
            f"Deduplicated {len(results)} results to {len(unique_results)} unique assets"
        )

        return unique_results

    async def search_assets(self, query: AssetSearchQuery) -> List[AssetSearchResult]:
        """Search for assets across all available adapters.

        All adapters are queried concurrently; an adapter that fails is logged
        and skipped so the remaining adapters still contribute results.

        Args:
            query: Search query parameters

        Returns:
            Combined and deduplicated search results, truncated to
            ``query.limit``
        """
        # Use all available adapters
        with self.lock:
            target_adapters = list(self.adapters.values())

        if not target_adapters:
            return []

        # Search in parallel across adapters
        results_list = await asyncio.gather(
            *(adapter.search_assets(query) for adapter in target_adapters),
            return_exceptions=True,
        )

        all_results: List[AssetSearchResult] = []
        for adapter, result in zip(target_adapters, results_list):
            # BaseException (not just Exception): gather() may also hand back
            # asyncio.CancelledError, which must not be treated as a result.
            if isinstance(result, BaseException):
                logger.warning(
                    f"Search failed for adapter {adapter.source.value}: {result}"
                )
            else:
                all_results.extend(result)

        # Smart deduplication of results
        unique_results = self._deduplicate_search_results(all_results)

        # TODO: Add LLM fallback search if no results found
        # if len(unique_results) == 0:
        #     fallback_results = self._fallback_search_assets(query)
        #     unique_results = self._deduplicate_search_results(fallback_results)

        return unique_results[: query.limit]

    # ------------------------------------------------------------------
    # Asset info and prices
    # ------------------------------------------------------------------

    async def get_asset_info(self, ticker: str) -> Optional[Asset]:
        """Get detailed asset information with automatic failover.

        Args:
            ticker: Asset ticker in internal format

        Returns:
            Asset information or None if not found
        """
        adapter = self.get_adapter_for_ticker(ticker)
        if adapter is None:
            logger.warning(f"No suitable adapter found for ticker: {ticker}")
            return None

        # Try the primary adapter
        try:
            logger.debug(
                f"Fetching asset info for {ticker} from {adapter.source.value}"
            )
            asset_info = await adapter.get_asset_info(ticker)
            if asset_info:
                logger.info(
                    f"Successfully fetched asset info for {ticker} from {adapter.source.value}"
                )
                return asset_info
        except Exception as e:
            logger.warning(
                f"Primary adapter {adapter.source.value} failed for {ticker}: {e}"
            )

        # Automatic failover: try other adapters for this exchange
        for fallback_adapter in self._failover_candidates(ticker, adapter):
            try:
                logger.debug(
                    f"Fallback: trying {fallback_adapter.source.value} for {ticker}"
                )
                asset_info = await fallback_adapter.get_asset_info(ticker)
                if asset_info:
                    logger.info(
                        f"Fallback success: fetched asset info for {ticker} from {fallback_adapter.source.value}"
                    )
                    # Update cache to use successful adapter
                    self._remember_adapter(ticker, fallback_adapter)
                    return asset_info
            except Exception as e:
                logger.warning(
                    f"Fallback adapter {fallback_adapter.source.value} failed for {ticker}: {e}"
                )

        logger.error(f"All adapters failed for {ticker}")
        return None

    async def get_real_time_price(self, ticker: str) -> Optional[AssetPrice]:
        """Get real-time price for an asset with automatic failover.

        Args:
            ticker: Asset ticker in internal format

        Returns:
            Current price data or None if not available
        """
        adapter = self.get_adapter_for_ticker(ticker)
        if adapter is None:
            logger.warning(f"No suitable adapter found for ticker: {ticker}")
            return None

        # Try the primary adapter
        try:
            logger.debug(f"Fetching price for {ticker} from {adapter.source.value}")
            price = await adapter.get_real_time_price(ticker)
            if price:
                logger.info(
                    f"Successfully fetched price for {ticker} from {adapter.source.value}"
                )
                return price
        except Exception as e:
            logger.warning(
                f"Primary adapter {adapter.source.value} failed for {ticker}: {e}"
            )

        # Automatic failover
        for fallback_adapter in self._failover_candidates(ticker, adapter):
            try:
                logger.debug(
                    f"Fallback: trying {fallback_adapter.source.value} for {ticker}"
                )
                price = await fallback_adapter.get_real_time_price(ticker)
                if price:
                    logger.info(
                        f"Fallback success: fetched price for {ticker} from {fallback_adapter.source.value}"
                    )
                    self._remember_adapter(ticker, fallback_adapter)
                    return price
            except Exception as e:
                logger.warning(
                    f"Fallback adapter {fallback_adapter.source.value} failed for {ticker}: {e}"
                )

        logger.error(f"All adapters failed for {ticker}")
        return None

    async def get_multiple_prices(
        self, tickers: List[str]
    ) -> Dict[str, Optional[AssetPrice]]:
        """Get real-time prices for multiple assets efficiently with automatic failover.

        Tickers are grouped by their preferred adapter and fetched with one
        batch call per adapter, all batches running concurrently. Any ticker
        whose batch failed, whose price came back as None, or which the adapter
        omitted from its response is retried individually through
        ``get_real_time_price`` (which applies per-ticker failover).

        Args:
            tickers: List of asset tickers

        Returns:
            Dictionary mapping tickers to price data; every requested ticker is
            present, with None when no price could be obtained
        """
        # Group tickers by adapter (dict.fromkeys de-duplicates, keeping order)
        adapter_tickers: Dict[BaseDataAdapter, List[str]] = {}
        for ticker in dict.fromkeys(tickers):
            adapter = self.get_adapter_for_ticker(ticker)
            if adapter is not None:
                adapter_tickers.setdefault(adapter, []).append(ticker)

        if not adapter_tickers:
            return {ticker: None for ticker in tickers}

        # Fetch prices in parallel from each adapter
        batches = list(adapter_tickers.items())
        results_list = await asyncio.gather(
            *(
                adapter.get_multiple_prices(ticker_list)
                for adapter, ticker_list in batches
            ),
            return_exceptions=True,
        )

        all_results: Dict[str, Optional[AssetPrice]] = {}
        failed_tickers: List[str] = []

        for (adapter, ticker_list), result in zip(batches, results_list):
            # BaseException so a CancelledError result is not mistaken for data
            if isinstance(result, BaseException):
                logger.warning(
                    f"Batch price fetch failed for adapter {adapter.source.value}: {result}"
                )
                failed_tickers.extend(ticker_list)
                continue

            # result is Dict[str, Optional[AssetPrice]]; tolerate None
            prices = result or {}
            for ticker, price in prices.items():
                if price is not None:
                    all_results[ticker] = price
                else:
                    failed_tickers.append(ticker)
            # Tickers the adapter silently left out of its response count as
            # failures too, so they get a chance at failover.
            failed_tickers.extend(
                ticker for ticker in ticker_list if ticker not in prices
            )

        # Retry failed tickers individually with fallback adapters
        retry_tickers = [
            ticker
            for ticker in dict.fromkeys(failed_tickers)
            if all_results.get(ticker) is None
        ]
        if retry_tickers:
            logger.info(
                f"Retrying {len(retry_tickers)} failed tickers with fallback adapters"
            )
            for ticker in retry_tickers:
                all_results[ticker] = await self.get_real_time_price(ticker)

        # Ensure all requested tickers are in results
        for ticker in tickers:
            all_results.setdefault(ticker, None)

        return all_results

    async def get_historical_prices(
        self,
        ticker: str,
        start_date: datetime,
        end_date: datetime,
        interval: str = "1d",
    ) -> List[AssetPrice]:
        """Get historical price data for an asset with automatic failover.

        Args:
            ticker: Asset ticker in internal format
            start_date: Start date for historical data
            end_date: End date for historical data
            interval: Data interval

        Returns:
            List of historical price data (empty if no adapter could provide
            any)
        """
        adapter = self.get_adapter_for_ticker(ticker)
        if adapter is None:
            logger.warning(f"No suitable adapter found for ticker: {ticker}")
            return []

        # Try the primary adapter
        try:
            logger.debug(
                f"Fetching historical data for {ticker} from {adapter.source.value}"
            )
            prices = await adapter.get_historical_prices(
                ticker, start_date, end_date, interval
            )
            if prices:
                logger.info(
                    f"Successfully fetched {len(prices)} historical prices for {ticker} from {adapter.source.value}"
                )
                return prices
        except Exception as e:
            logger.warning(
                f"Primary adapter {adapter.source.value} failed for historical data of {ticker}: {e}"
            )

        # Automatic failover
        for fallback_adapter in self._failover_candidates(ticker, adapter):
            try:
                logger.info(
                    f"Fallback: trying {fallback_adapter.source.value} for historical data of {ticker}"
                )
                prices = await fallback_adapter.get_historical_prices(
                    ticker, start_date, end_date, interval
                )
                if prices:
                    logger.info(
                        f"Fallback success: fetched {len(prices)} historical prices for {ticker} from {fallback_adapter.source.value}"
                    )
                    self._remember_adapter(ticker, fallback_adapter)
                    return prices
                logger.warning(
                    f"Fallback adapter {fallback_adapter.source.value} returned empty data for {ticker}"
                )
            except Exception as e:
                logger.warning(
                    f"Fallback adapter {fallback_adapter.source.value} failed for historical data of {ticker}: {e}"
                )

        logger.error(f"All adapters failed for historical data of {ticker}")
        return []

    # ------------------------------------------------------------------
    # Fundamentals and filings
    # ------------------------------------------------------------------

    async def get_financials(self, ticker: str) -> Dict:
        """Fetch financial/fundamental data for a ticker.

        Args:
            ticker: Asset ticker in internal format

        Returns:
            Dictionary containing financial statements and metrics

        Raises:
            ValueError: If no adapter found or all adapters failed
        """
        adapter = self.get_adapter_for_ticker(ticker)
        if adapter is None:
            raise ValueError(f"No adapter found for ticker {ticker}")

        try:
            return await adapter.get_financials(ticker)
        except Exception as e:
            if isinstance(e, NotImplementedError):
                logger.warning(
                    f"Adapter {adapter.source.value} does not support financials"
                )
            else:
                logger.warning(f"Adapter {adapter.source.value} failed: {e}")

            # Try failover
            if ":" in ticker:
                exchange, _ = ticker.split(":", 1)
                for alt in self.get_adapters_for_exchange(exchange):
                    if alt is adapter:
                        continue
                    try:
                        logger.info(
                            f"Trying failover adapter {alt.source.value} for financials of {ticker}"
                        )
                        return await alt.get_financials(ticker)
                    except Exception as failover_error:
                        logger.warning(
                            f"Failover adapter {alt.source.value} also failed: {failover_error}"
                        )

            # Chain explicitly so the primary adapter's error is the cause
            raise ValueError(
                f"All adapters failed to fetch financials for {ticker}: {e}"
            ) from e

    async def get_filings(
        self,
        ticker: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        limit: int = 10,
        filing_types: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Fetch regulatory filings/announcements.

        Args:
            ticker: Asset ticker
            start_date: Start date
            end_date: End date
            limit: Max results
            filing_types: List of filing types to filter

        Returns:
            List of filings

        Raises:
            ValueError: If no adapter found or all adapters failed
        """
        adapter = self.get_adapter_for_ticker(ticker)
        if adapter is None:
            raise ValueError(f"No adapter found for ticker {ticker}")

        try:
            return await adapter.get_filings(
                ticker, start_date, end_date, limit, filing_types
            )
        except Exception as e:
            if isinstance(e, NotImplementedError):
                logger.warning(
                    f"Adapter {adapter.source.value} does not " "support filings"
                )
            else:
                logger.warning(f"Adapter {adapter.source.value} failed: {e}")

            # Try failover
            if ":" in ticker:
                exchange, _ = ticker.split(":", 1)
                for alt in self.get_adapters_for_exchange(exchange):
                    if alt is adapter:
                        continue
                    try:
                        logger.info(
                            f"Trying failover adapter {alt.source.value} "
                            f"for filings of {ticker}"
                        )
                        return await alt.get_filings(
                            ticker, start_date, end_date, limit, filing_types
                        )
                    except Exception as failover_error:
                        logger.warning(
                            f"Failover adapter {alt.source.value} "
                            f"also failed: {failover_error}"
                        )

            # Chain explicitly so the primary adapter's error is the cause
            raise ValueError(
                f"All adapters failed to fetch filings for {ticker}: {e}"
            ) from e


# Singleton instance
_adapter_manager_instance: Optional[AdapterManager] = None
_adapter_manager_lock = threading.Lock()


def get_adapter_manager() -> AdapterManager:
    """Get the singleton AdapterManager instance.

    The instance is created lazily on first call; creation is guarded by a
    module-level lock and re-checked inside it so concurrent first callers
    end up sharing one manager.
    """
    global _adapter_manager_instance
    if _adapter_manager_instance is None:
        with _adapter_manager_lock:
            if _adapter_manager_instance is None:
                _adapter_manager_instance = AdapterManager()
    return _adapter_manager_instance

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.