Skip to main content
Glama
market_data.pyâ€ĸ32.1 kB
""" Market & Historical Data Tools Tools for retrieving factual market data: - Real-time stock quotes - Historical price data - Financial fundamentals - Dividend history and corporate actions """ import yfinance as yf from typing import Dict, Any, List import json import pandas as pd from datetime import datetime, timedelta import numpy as np import logging from models import ( MCPTool, MCPToolInputSchema, StockQuote, StockFundamentals, IncomeStatement, BalanceSheet, CashFlowStatement, FinancialRatios, FundamentalsMetadata, PriceHistory, PriceBar, TotalReturnPoint, PriceAdjustments, DividendsAndActions, DividendRecord, DividendYield, DividendGrowth, CorporateAction ) def get_market_data_tools() -> List[MCPTool]: """Return list of market data and historical tools""" return [ MCPTool( name="get_realtime_quote", title="Real-time Stock Quote", description="Retrieve current market data for a stock including price, volume, market cap, and key financial ratios. Provides up-to-date trading information essential for market analysis and investment decisions.", inputSchema=MCPToolInputSchema( type="object", properties={ "symbol": { "type": "string", "description": "Stock ticker symbol (e.g., 'AAPL' for Apple, 'GOOGL' for Google, 'TSLA' for Tesla)" } }, required=["symbol"] ) ), MCPTool( name="get_fundamentals", title="Financial Statements and Ratios", description="Access comprehensive financial data including income statements, balance sheets, cash flow statements, and calculated financial ratios. Essential for fundamental analysis, valuation, and long-term investment research.", inputSchema=MCPToolInputSchema( type="object", properties={ "instrument": { "type": "string", "description": "Stock ticker symbol (e.g., 'AAPL' for Apple, 'MSFT' for Microsoft)" }, "period": { "type": "string", "description": "Reporting period for financial statements", "enum": ["annual", "quarterly", "ttm"] }, "years": { "type": "integer", "description": "Number of years/periods of historical data to retrieve (default: 5)", "minimum": 1, "maximum": 10, "default": 5 } }, required=["instrument", "period"] ) ), MCPTool( name="get_price_history", title="Historical Price Data", description="Retrieve historical OHLCV (Open, High, Low, Close, Volume) data with optional total return calculation including reinvested dividends. Ideal for backtesting, technical analysis, and performance measurement.", inputSchema=MCPToolInputSchema( type="object", properties={ "instrument": { "type": "string", "description": "Stock ticker symbol (e.g., 'AAPL' for Apple, 'SPY' for SPDR S&P 500 ETF)" }, "start": { "type": "string", "description": "Start date for historical data in YYYY-MM-DD format (e.g., '2023-01-01')", "pattern": "^\\d{4}-\\d{2}-\\d{2}$" }, "end": { "type": "string", "description": "End date for historical data in YYYY-MM-DD format (e.g., '2024-12-31')", "pattern": "^\\d{4}-\\d{2}-\\d{2}$" }, "interval": { "type": "string", "description": "Data frequency: daily (1d), weekly (1w), or monthly (1m). Note: 1m interval limited to last 30 days", "enum": ["1d", "1w", "1m"], "default": "1d" }, "adjusted": { "type": "boolean", "description": "Return split and dividend adjusted prices for accurate historical analysis", "default": True }, "include_total_return": { "type": "boolean", "description": "Calculate total return index assuming dividend reinvestment for performance analysis", "default": True } }, required=["instrument", "start", "end"] ) ), MCPTool( name="get_dividends_and_actions", title="Dividend History and Corporate Actions", description="Analyze dividend payment history and corporate actions with quality metrics. Includes dividend yield calculations, growth rates, consistency scoring, and stock split information for income-focused investment analysis.", inputSchema=MCPToolInputSchema( type="object", properties={ "instrument": { "type": "string", "description": "Stock ticker symbol (e.g., 'KO' for Coca-Cola, 'JNJ' for Johnson & Johnson)" }, "start": { "type": "string", "description": "Start date for dividend history in YYYY-MM-DD format. If not specified, defaults to 10 years ago", "pattern": "^\\d{4}-\\d{2}-\\d{2}$" }, "end": { "type": "string", "description": "End date for dividend history in YYYY-MM-DD format. If not specified, defaults to current date", "pattern": "^\\d{4}-\\d{2}-\\d{2}$" } }, required=["instrument"] ) ), ] def get_realtime_quote(arguments: Dict[str, Any]) -> str: """ Get real-time stock quote using yfinance Args: arguments: Dictionary containing 'symbol' key Returns: JSON string with stock data or error message """ try: symbol = arguments.get("symbol", "").upper().strip() if not symbol: return json.dumps({ "error": "Symbol is required", "symbol": None }) # Get stock data ticker = yf.Ticker(symbol) info = ticker.info hist = ticker.history(period="1d") if hist.empty: return json.dumps({ "error": f"No data found for symbol '{symbol}'. Are you sure this symbol exists?", "symbol": symbol }) # Get current price from the latest close current_price = hist['Close'].iloc[-1] if not hist.empty else None # Calculate change from previous close if available change = None change_percent = None if len(hist) > 1: prev_close = hist['Close'].iloc[-2] change = current_price - prev_close change_percent = (change / prev_close) * 100 if prev_close != 0 else None # Create StockQuote model quote = StockQuote( symbol=symbol, price=float(current_price) if current_price else None, change=float(change) if change else None, change_percent=float(change_percent) if change_percent else None, volume=int(hist['Volume'].iloc[-1]) if not hist.empty and 'Volume' in hist else None, market_cap=info.get('marketCap'), pe_ratio=info.get('trailingPE'), dividend_yield=info.get('dividendYield'), fifty_two_week_high=info.get('fiftyTwoWeekHigh'), fifty_two_week_low=info.get('fiftyTwoWeekLow'), currency=info.get('currency', 'USD') ) return quote.model_dump_json(indent=2) except Exception as e: return json.dumps({ "error": f"Error fetching data for '{symbol}': {str(e)}", "symbol": symbol }) def calculate_financial_ratios(ticker: yf.Ticker, info: Dict[str, Any]) -> FinancialRatios: """Calculate financial ratios from yfinance data""" try: # Get financial statements income_stmt = ticker.financials balance_sheet = ticker.balance_sheet cashflow_stmt = ticker.cashflow ratios = FinancialRatios() # Basic ratios from info ratios.pe_ratio = info.get('trailingPE') ratios.pb_ratio = info.get('priceToBook') ratios.ps_ratio = info.get('priceToSalesTrailing12Months') ratios.ev_ebitda = info.get('enterpriseToEbitda') ratios.roe = info.get('returnOnEquity') ratios.debt_to_equity = info.get('debtToEquity') ratios.current_ratio = info.get('currentRatio') ratios.quick_ratio = info.get('quickRatio') ratios.gross_margin = info.get('grossMargins') ratios.operating_margin = info.get('operatingMargins') ratios.net_margin = info.get('profitMargins') ratios.dividend_payout_ratio = info.get('payoutRatio') # Calculate additional ratios if data available if not income_stmt.empty and not balance_sheet.empty: latest_income = income_stmt.iloc[:, 0] latest_balance = balance_sheet.iloc[:, 0] # ROIC calculation if 'Net Income' in latest_income.index and 'Total Stockholder Equity' in latest_balance.index and 'Long Term Debt' in latest_balance.index: net_income = latest_income.get('Net Income', 0) total_equity = latest_balance.get('Total Stockholder Equity', 0) long_term_debt = latest_balance.get('Long Term Debt', 0) invested_capital = total_equity + long_term_debt if invested_capital != 0: ratios.roic = (net_income / invested_capital) * 100 # FCF Yield calculation if not cashflow_stmt.empty and info.get('marketCap'): latest_cashflow = cashflow_stmt.iloc[:, 0] free_cash_flow = latest_cashflow.get('Free Cash Flow') market_cap = info.get('marketCap') if free_cash_flow and market_cap and market_cap != 0: ratios.fcf_yield = (free_cash_flow / market_cap) * 100 # Asset Turnover if not income_stmt.empty and not balance_sheet.empty: latest_income = income_stmt.iloc[:, 0] latest_balance = balance_sheet.iloc[:, 0] revenue = latest_income.get('Total Revenue') total_assets = latest_balance.get('Total Assets') if revenue and total_assets and total_assets != 0: ratios.asset_turnover = revenue / total_assets return ratios except Exception as e: logging.error(f"Error calculating financial ratios: {str(e)}") return FinancialRatios() def process_financial_statement(df: pd.DataFrame, statement_type: str) -> List[Dict]: """Process a financial statement DataFrame into our format""" if df.empty: return [] statements = [] for col in df.columns: # Convert column name to string date period_ending = col.strftime('%Y-%m-%d') if hasattr(col, 'strftime') else str(col) # Convert data to dictionary with proper handling of NaN values data = {} for idx in df.index: value = df.loc[idx, col] if pd.isna(value): data[str(idx)] = None else: try: data[str(idx)] = float(value) except (ValueError, TypeError): data[str(idx)] = None statements.append({ "period_ending": period_ending, "currency": "USD", # Default, could be extracted from ticker info "data": data }) return statements def get_fundamentals(arguments: Dict[str, Any]) -> str: """ Get comprehensive financial fundamentals for a stock Args: arguments: Dictionary containing 'instrument', 'period', and optionally 'years' Returns: JSON string with financial statements and ratios or error message """ try: instrument = arguments.get("instrument", "").upper().strip() period = arguments.get("period", "annual").lower() years = arguments.get("years", 5) if not instrument: return json.dumps({ "error": "Instrument (ticker symbol) is required", "instrument": None }) if period not in ["annual", "quarterly", "ttm"]: return json.dumps({ "error": "Period must be one of: annual, quarterly, ttm", "instrument": instrument }) # Get ticker data ticker = yf.Ticker(instrument) info = ticker.info # Get financial statements based on period if period == "annual": income_stmt = ticker.financials balance_sheet = ticker.balance_sheet cashflow_stmt = ticker.cashflow elif period == "quarterly": income_stmt = ticker.quarterly_financials balance_sheet = ticker.quarterly_balance_sheet cashflow_stmt = ticker.quarterly_cashflow else: # ttm # For TTM, use the most recent annual data income_stmt = ticker.financials balance_sheet = ticker.balance_sheet cashflow_stmt = ticker.cashflow # Limit to requested number of years/periods if not income_stmt.empty and income_stmt.shape[1] > years: income_stmt = income_stmt.iloc[:, :years] if not balance_sheet.empty and balance_sheet.shape[1] > years: balance_sheet = balance_sheet.iloc[:, :years] if not cashflow_stmt.empty and cashflow_stmt.shape[1] > years: cashflow_stmt = cashflow_stmt.iloc[:, :years] # Process financial statements income_statements = process_financial_statement(income_stmt, "income") balance_sheets = process_financial_statement(balance_sheet, "balance") cashflow_statements = process_financial_statement(cashflow_stmt, "cashflow") # Calculate ratios ratios = calculate_financial_ratios(ticker, info) # Create metadata meta = FundamentalsMetadata( currency=info.get('currency', 'USD'), as_of=datetime.now().strftime('%Y-%m-%d'), period_type=period, years_requested=years ) # Create the complete fundamentals response fundamentals = StockFundamentals( symbol=instrument, income=[IncomeStatement(**stmt) for stmt in income_statements], balance=[BalanceSheet(**stmt) for stmt in balance_sheets], cashflow=[CashFlowStatement(**stmt) for stmt in cashflow_statements], ratios=ratios, meta=meta ) return fundamentals.model_dump_json(indent=2) except Exception as e: return json.dumps({ "error": f"Error fetching fundamentals for '{instrument}': {str(e)}", "instrument": instrument }) def calculate_total_return_index(price_data: pd.DataFrame, dividends: pd.DataFrame) -> List[TotalReturnPoint]: """ Calculate total return index with dividends reinvested Args: price_data: DataFrame with price history (Close column) dividends: DataFrame with dividend payments Returns: List of TotalReturnPoint objects """ if price_data.empty: return [] total_return_data = [] # Start with base index of 100 tr_index = 100.0 prev_close = None for date, row in price_data.iterrows(): close_price = row['Close'] if pd.isna(close_price): continue if prev_close is not None: # Calculate price return price_return = (close_price / prev_close) - 1 # Check for dividends on this date dividend_yield = 0.0 if not dividends.empty and date in dividends.index: dividend_amount = dividends.loc[date] # dividends is a Series, not DataFrame if not pd.isna(dividend_amount) and dividend_amount > 0: dividend_yield = dividend_amount / prev_close # Update total return index tr_index = tr_index * (1 + price_return + dividend_yield) total_return_data.append(TotalReturnPoint( t=date.strftime('%Y-%m-%d'), tr=round(tr_index, 4) )) prev_close = close_price return total_return_data def get_price_history(arguments: Dict[str, Any]) -> str: """ Get historical price data with optional total return calculation Args: arguments: Dictionary containing price history parameters Returns: JSON string with price history data or error message """ try: instrument = arguments.get("instrument", "").upper().strip() start = arguments.get("start") end = arguments.get("end") interval = arguments.get("interval", "1d") adjusted = arguments.get("adjusted", True) include_total_return = arguments.get("include_total_return", True) if not instrument: return json.dumps({ "error": "Instrument (ticker symbol) is required", "instrument": None }) if not start or not end: return json.dumps({ "error": "Both start and end dates are required (YYYY-MM-DD format)", "instrument": instrument }) if interval not in ["1d", "1w", "1m"]: return json.dumps({ "error": "Interval must be one of: 1d, 1w, 1m", "instrument": instrument }) # Check for incompatible interval/period combinations from datetime import datetime as dt try: start_date = dt.strptime(start, '%Y-%m-%d') end_date = dt.strptime(end, '%Y-%m-%d') days_diff = (end_date - start_date).days # yfinance limitations for intraday data if interval == "1m" and days_diff > 30: return json.dumps({ "error": f"1-minute interval data is only available for the last 30 days. Your requested period is {days_diff} days. Use '1d' or '1w' for longer periods.", "instrument": instrument }) except ValueError as ve: return json.dumps({ "error": f"Invalid date format. Use YYYY-MM-DD format. Error: {str(ve)}", "instrument": instrument }) # Get ticker data ticker = yf.Ticker(instrument) # Get historical data try: hist = ticker.history( start=start, end=end, interval=interval, auto_adjust=adjusted, back_adjust=True, repair=True, keepna=False ) except Exception as hist_error: return json.dumps({ "error": f"Error fetching data from yfinance for '{instrument}': {str(hist_error)}. Check if the ticker symbol is valid and the date range is supported.", "instrument": instrument }) if hist.empty: return json.dumps({ "error": f"No price data found for '{instrument}' in the specified period ({start} to {end}) with interval '{interval}'. This could be due to: 1) Invalid ticker symbol, 2) Interval limitations (1m data only available for last 30 days), 4) Market closure dates.", "instrument": instrument }) # Convert price data to bars bars = [] for date, row in hist.iterrows(): bars.append(PriceBar( t=date.strftime('%Y-%m-%d'), o=float(row['Open']) if not pd.isna(row['Open']) else None, h=float(row['High']) if not pd.isna(row['High']) else None, l=float(row['Low']) if not pd.isna(row['Low']) else None, c=float(row['Close']) if not pd.isna(row['Close']) else None, v=int(row['Volume']) if not pd.isna(row['Volume']) else None )) # Calculate total return if requested total_return = [] if include_total_return: # Get dividend data dividends = ticker.dividends # Filter dividends to the same period safely try: if not dividends.empty: dividends = dividends.loc[start:end] except Exception: # If date filtering fails, use all dividends pass total_return = calculate_total_return_index(hist, dividends) # Get adjustments data splits = ticker.splits stock_dividends = ticker.dividends # Filter splits and dividends to the same period safely splits_filtered = splits dividends_filtered = stock_dividends try: if not splits.empty: splits_filtered = splits.loc[start:end] except Exception: # If date filtering fails, use all splits pass try: if not stock_dividends.empty: dividends_filtered = stock_dividends.loc[start:end] except Exception: # If date filtering fails, use all dividends pass adjustments = PriceAdjustments( splits={date.strftime('%Y-%m-%d'): float(ratio) for date, ratio in splits_filtered.items()}, dividends={date.strftime('%Y-%m-%d'): float(amount) for date, amount in dividends_filtered.items()}, stock_splits={} # Could be enhanced to separate stock splits from regular splits ) # Create the complete price history response price_history = PriceHistory( symbol=instrument, bars=bars, total_return=total_return, adjustments=adjustments, tz="America/New_York", # Default timezone, could be enhanced interval=interval, start_date=start, end_date=end ) return price_history.model_dump_json(indent=2) except Exception as e: return json.dumps({ "error": f"Error fetching price history for '{instrument}': {str(e)}", "instrument": instrument }) def calculate_dividend_cagr(dividends_series: pd.Series, years: float) -> float: """Calculate dividend CAGR over specified period""" if len(dividends_series) < 2 or years <= 0: return None # Get annual dividend sums annual_dividends = dividends_series.groupby(dividends_series.index.year).sum() if len(annual_dividends) < 2: return None first_year_dividend = annual_dividends.iloc[0] last_year_dividend = annual_dividends.iloc[-1] if first_year_dividend <= 0 or last_year_dividend <= 0: return None # CAGR = (Ending Value / Beginning Value)^(1/years) - 1 cagr = ((last_year_dividend / first_year_dividend) ** (1 / years)) - 1 return round(cagr * 100, 2) def calculate_dividend_consistency(dividends_series: pd.Series) -> float: """Calculate dividend consistency score (0-100)""" if len(dividends_series) < 4: # Need at least 4 quarters of data return None # Group by year and check for dividend payments annual_dividends = dividends_series.groupby(dividends_series.index.year).sum() # Check for dividend cuts or omissions years_with_dividends = len(annual_dividends[annual_dividends > 0]) total_years = len(annual_dividends) if total_years == 0: return 0 # Base consistency score consistency_score = (years_with_dividends / total_years) * 100 # Penalty for dividend cuts dividend_cuts = 0 for i in range(1, len(annual_dividends)): if annual_dividends.iloc[i] < annual_dividends.iloc[i-1] * 0.95: # 5% tolerance dividend_cuts += 1 # Reduce score for each cut consistency_score = max(0, consistency_score - (dividend_cuts * 15)) return round(consistency_score, 1) def calculate_dividend_yield_metrics(dividends_series: pd.Series, current_price: float) -> DividendYield: """Calculate dividend yield metrics""" yield_metrics = DividendYield() if current_price <= 0 or dividends_series.empty: return yield_metrics # Convert datetime to timezone-aware if needed def make_timezone_aware(dt): if dividends_series.index.tz is not None: import pytz if dt.tzinfo is None: return pytz.timezone('America/New_York').localize(dt) return dt return dt # TTM yield one_year_ago = make_timezone_aware(datetime.now() - timedelta(days=365)) ttm_dividends = dividends_series[dividends_series.index >= one_year_ago].sum() if ttm_dividends > 0: yield_metrics.ttm = round((ttm_dividends / current_price) * 100, 2) # 5-year average yield (approximation using available data) if len(dividends_series) >= 5: five_year_ago = make_timezone_aware(datetime.now() - timedelta(days=365 * 5)) five_year_dividends = dividends_series[dividends_series.index >= five_year_ago] if not five_year_dividends.empty: annual_avg_dividend = five_year_dividends.sum() / 5 yield_metrics.five_year_avg = round((annual_avg_dividend / current_price) * 100, 2) return yield_metrics def get_dividends_and_actions(arguments: Dict[str, Any]) -> str: """ Get dividend history, corporate actions, and dividend quality metrics Args: arguments: Dictionary containing dividend and actions parameters Returns: JSON string with dividends, actions, and metrics or error message """ try: instrument = arguments.get("instrument", "").upper().strip() start = arguments.get("start") end = arguments.get("end") if not instrument: return json.dumps({ "error": "Instrument (ticker symbol) is required", "instrument": None }) # Default date range: 10 years if not end: end = datetime.now().strftime('%Y-%m-%d') if not start: start = (datetime.now() - timedelta(days=365 * 10)).strftime('%Y-%m-%d') # Get ticker data ticker = yf.Ticker(instrument) # Get dividend data try: dividends_series = ticker.dividends if not dividends_series.empty: # Convert start and end to datetime for filtering start_dt = pd.to_datetime(start) end_dt = pd.to_datetime(end) # Make dates timezone-aware if needed if dividends_series.index.tz is not None: import pytz tz = dividends_series.index.tz if start_dt.tz is None: start_dt = tz.localize(start_dt) if end_dt.tz is None: end_dt = tz.localize(end_dt) # Filter by date range dividends_series = dividends_series[(dividends_series.index >= start_dt) & (dividends_series.index <= end_dt)] except Exception as e: logging.error(f"Error retrieving dividends data: {e}") dividends_series = pd.Series(dtype=float) # Get stock splits and actions try: splits = ticker.splits if not splits.empty: # Convert start and end to datetime for filtering start_dt = pd.to_datetime(start) end_dt = pd.to_datetime(end) # Make dates timezone-aware if needed if splits.index.tz is not None: import pytz tz = splits.index.tz if start_dt.tz is None: start_dt = tz.localize(start_dt) if end_dt.tz is None: end_dt = tz.localize(end_dt) # Filter by date range splits = splits[(splits.index >= start_dt) & (splits.index <= end_dt)] except Exception as e: logging.error(f"Error retrieving stock splits data: {e}") splits = pd.Series(dtype=float) # Get current price for yield calculation try: current_info = ticker.info current_price = current_info.get('regularMarketPrice') or current_info.get('currentPrice') or 0 except Exception: current_price = 0 # Process dividends dividend_records = [] for date, amount in dividends_series.items(): dividend_records.append(DividendRecord( ex_date=date.strftime('%Y-%m-%d'), amount=round(float(amount), 4), currency="USD" # Default, could be enhanced )) # Calculate yield metrics yield_metrics = calculate_dividend_yield_metrics(dividends_series, current_price) # Calculate growth metrics growth_metrics = DividendGrowth() if len(dividends_series) >= 2: years_of_data = (dividends_series.index[-1] - dividends_series.index[0]).days / 365.25 if years_of_data >= 1: growth_metrics.cagr_5y = calculate_dividend_cagr( dividends_series, min(years_of_data, 5) ) growth_metrics.consistency_score = calculate_dividend_consistency(dividends_series) # Process corporate actions actions = [] for date, ratio in splits.items(): actions.append(CorporateAction( type="split", effective_date=date.strftime('%Y-%m-%d'), factor=round(float(ratio), 4), description=f"Stock split {ratio}:1" )) # Create the complete response dividends_and_actions = DividendsAndActions( symbol=instrument, dividends=dividend_records, yield_metrics=yield_metrics, growth_metrics=growth_metrics, actions=actions, period_start=start, period_end=end ) return dividends_and_actions.model_dump_json(indent=2) except Exception as e: return json.dumps({ "error": f"Error fetching dividends and actions for '{instrument}': {str(e)}", "instrument": instrument })

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/leogue/StockMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server