Skip to main content
Glama
base.py6.59 kB
# src/server/domain/adapters/base.py
"""Abstract base class for data adapters.

Each adapter must implement async methods to fetch price, history and
optional search, returning structured data models.
"""

import abc
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Set

from src.server.domain.types import (
    AdapterCapability,
    Asset,
    AssetPrice,
    AssetSearchQuery,
    AssetSearchResult,
    AssetType,
    DataSource,
    Exchange,
)

logger = logging.getLogger(__name__)


class BaseDataAdapter(abc.ABC):
    """Abstract base class for all data source adapters.

    Each adapter must implement:
    - get_asset_info: Fetch detailed asset info
    - get_real_time_price: Fetch current price
    - get_historical_prices: Fetch historical data
    - search_assets: Search for assets
    - get_capabilities: Declare supported asset types and exchanges
    - convert_to_source_ticker / convert_to_internal_ticker: Ticker mapping

    ``get_filings`` and ``get_financials`` are optional; the defaults raise
    ``NotImplementedError``.
    """

    # Human-readable adapter name. Only declared here, never assigned by the
    # base class; subclasses may set it. Use _display_name() to read it safely.
    name: str
    # Data source identifier; assigned in __init__.
    source: DataSource

    def __init__(self, source: DataSource, **kwargs):
        """Initialize adapter with data source and configuration.

        Args:
            source: Data source identifier
            **kwargs: Additional configuration parameters, stored as-is in
                ``self.config``
        """
        self.source = source
        self.config = kwargs
        # Per-adapter child logger, named after the source's enum value.
        self.logger = logging.getLogger(f"{__name__}.{source.value}")

    def _display_name(self) -> str:
        """Return the adapter's name, falling back to the source value.

        ``name`` is only annotated on the base class, so reading ``self.name``
        directly raises ``AttributeError`` for subclasses that never set it.
        """
        declared_name = getattr(self, "name", None)
        if declared_name:
            return declared_name
        return self.source.value

    @abc.abstractmethod
    async def get_asset_info(self, ticker: str) -> Optional[Asset]:
        """Fetch detailed information for an asset.

        Args:
            ticker: Asset ticker in internal format

        Returns:
            Asset object or None if not found
        """
        pass

    @abc.abstractmethod
    async def get_real_time_price(self, ticker: str) -> Optional[AssetPrice]:
        """Fetch current price for ticker.

        Args:
            ticker: Asset ticker in internal format

        Returns:
            Current price data or None if not found
        """
        pass

    async def get_multiple_prices(
        self, tickers: List[str]
    ) -> Dict[str, Optional[AssetPrice]]:
        """Fetch prices for multiple tickers.

        Default implementation calls get_real_time_price sequentially.
        Adapters should override this if they support batch fetching.

        Args:
            tickers: List of asset tickers

        Returns:
            Dictionary mapping tickers to price data. A ticker whose fetch
            raised is mapped to None (the failure is logged as a warning).
        """
        results: Dict[str, Optional[AssetPrice]] = {}
        for ticker in tickers:
            try:
                results[ticker] = await self.get_real_time_price(ticker)
            except Exception as e:
                # Best-effort: one failing ticker must not abort the batch.
                self.logger.warning(
                    "Failed to fetch price for %s: %s", ticker, e
                )
                results[ticker] = None
        return results

    @abc.abstractmethod
    async def get_historical_prices(
        self,
        ticker: str,
        start_date: datetime,
        end_date: datetime,
        interval: str = "1d",
    ) -> List[AssetPrice]:
        """Fetch historical price data.

        Args:
            ticker: Asset ticker in internal format
            start_date: Start date
            end_date: End date
            interval: Data interval

        Returns:
            List of historical price data
        """
        pass

    @abc.abstractmethod
    async def search_assets(
        self, query: AssetSearchQuery
    ) -> List[AssetSearchResult]:
        """Search for assets.

        Args:
            query: Search query parameters

        Returns:
            List of search results
        """
        pass

    async def get_filings(
        self,
        ticker: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        limit: int = 10,
        filing_types: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """Get regulatory filings/announcements.

        Optional capability; the base implementation always raises.

        Args:
            ticker: Asset ticker in internal format
            start_date: Optional start date
            end_date: Optional end date
            limit: Limit parameter (defaults to 10)
            filing_types: Optional list of filing type strings

        Raises:
            NotImplementedError: Always, unless overridden by the adapter.
        """
        # _display_name() guards against adapters that never define `name`,
        # which would otherwise surface as an AttributeError here.
        raise NotImplementedError(
            f"{self._display_name()} does not support filings retrieval"
        )

    async def get_financials(self, ticker: str) -> Dict[str, Any]:
        """Optional method to fetch fundamental/financial data.

        Args:
            ticker: Asset ticker in internal format

        Returns:
            Dictionary containing financial statements and metrics

        Raises:
            NotImplementedError: Always, unless overridden by the adapter.
        """
        raise NotImplementedError(
            f"{self.source.value} does not support get_financials"
        )

    @abc.abstractmethod
    def get_capabilities(self) -> List[AdapterCapability]:
        """Get capabilities describing supported types and exchanges.

        Returns:
            List of capabilities
        """
        pass

    def get_supported_asset_types(self) -> List[AssetType]:
        """Get list of asset types supported by this adapter.

        Returns:
            Distinct asset types across all capabilities, in first-seen order.
        """
        # dict.fromkeys de-duplicates like a set but keeps a stable order.
        return list(
            dict.fromkeys(cap.asset_type for cap in self.get_capabilities())
        )

    def get_supported_exchanges(self) -> Set[Exchange]:
        """Get set of all exchanges supported by this adapter.

        Returns:
            Union of the ``exchanges`` of every capability.
        """
        exchanges: Set[Exchange] = set()
        for cap in self.get_capabilities():
            exchanges.update(cap.exchanges)
        return exchanges

    def validate_ticker(self, ticker: str) -> bool:
        """Validate if ticker format is supported by this adapter.

        Args:
            ticker: Ticker in internal format (e.g., "NASDAQ:AAPL")

        Returns:
            True if the part before the first ":" names an exchange that at
            least one capability supports; False otherwise, including when
            any error occurs during validation.
        """
        try:
            exchange_code, separator, _ = ticker.partition(":")
            if not separator:
                return False

            # Build the Exchange once rather than once per capability.
            exchange = Exchange(exchange_code)
            return any(
                cap.supports_exchange(exchange)
                for cap in self.get_capabilities()
            )
        except Exception:
            # Deliberate best-effort: any failure (e.g. an unrecognised
            # exchange code or a non-string ticker) means "not valid".
            return False

    @abc.abstractmethod
    def convert_to_source_ticker(self, internal_ticker: str) -> str:
        """Convert internal ticker to data source format.

        Args:
            internal_ticker: Ticker in internal format

        Returns:
            Ticker in data source specific format
        """
        pass

    @abc.abstractmethod
    def convert_to_internal_ticker(
        self, source_ticker: str, default_exchange: Optional[str] = None
    ) -> str:
        """Convert data source ticker to internal format.

        Args:
            source_ticker: Ticker in data source format
            default_exchange: Default exchange if not determinable

        Returns:
            Ticker in internal format
        """
        pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.