Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
technical_analysis.py26.4 kB
""" Technical analysis functions for Maverick-MCP. This module contains functions for performing technical analysis on financial data, including calculating indicators, analyzing trends, and generating trading signals. DISCLAIMER: All technical analysis functions in this module are for educational purposes only. Technical indicators are mathematical calculations based on historical data and do not predict future price movements. Past performance does not guarantee future results. Always conduct thorough research and consult with qualified financial professionals before making investment decisions. """ import logging from collections.abc import Sequence from typing import Any import numpy as np import pandas as pd import pandas_ta as ta from maverick_mcp.config.technical_constants import TECHNICAL_CONFIG # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger("maverick_mcp.technical_analysis") def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame: """ Add technical indicators to the dataframe Args: df: DataFrame with OHLCV price data Returns: DataFrame with added technical indicators """ # Ensure column names are lowercase df = df.copy() df.columns = [col.lower() for col in df.columns] # Use pandas_ta for all indicators with configurable parameters # EMA df["ema_21"] = ta.ema(df["close"], length=TECHNICAL_CONFIG.EMA_PERIOD) # SMA df["sma_50"] = ta.sma(df["close"], length=TECHNICAL_CONFIG.SMA_SHORT_PERIOD) df["sma_200"] = ta.sma(df["close"], length=TECHNICAL_CONFIG.SMA_LONG_PERIOD) # RSI df["rsi"] = ta.rsi(df["close"], length=TECHNICAL_CONFIG.RSI_PERIOD) # MACD macd = ta.macd( df["close"], fast=TECHNICAL_CONFIG.MACD_FAST_PERIOD, slow=TECHNICAL_CONFIG.MACD_SLOW_PERIOD, signal=TECHNICAL_CONFIG.MACD_SIGNAL_PERIOD, ) if macd is not None and not macd.empty: df["macd_12_26_9"] = macd["MACD_12_26_9"] df["macds_12_26_9"] = macd["MACDs_12_26_9"] df["macdh_12_26_9"] = macd["MACDh_12_26_9"] else: df["macd_12_26_9"] = np.nan df["macds_12_26_9"] = np.nan df["macdh_12_26_9"] = np.nan # Bollinger Bands bbands = ta.bbands(df["close"], length=20, std=2.0) if bbands is not None and not bbands.empty: resolved_columns = _resolve_bollinger_columns(bbands.columns) if resolved_columns: mid_col, upper_col, lower_col = resolved_columns df["sma_20"] = bbands[mid_col] df["bbu_20_2.0"] = bbands[upper_col] df["bbl_20_2.0"] = bbands[lower_col] else: logger.warning( "Bollinger Bands columns missing expected names: %s", list(bbands.columns), ) df["sma_20"] = np.nan df["bbu_20_2.0"] = np.nan df["bbl_20_2.0"] = np.nan else: df["sma_20"] = np.nan df["bbu_20_2.0"] = np.nan df["bbl_20_2.0"] = np.nan df["stdev"] = df["close"].rolling(window=20).std() # ATR df["atr"] = ta.atr(df["high"], df["low"], df["close"], length=14) # Stochastic Oscillator stoch = ta.stoch(df["high"], df["low"], df["close"], k=14, d=3, smooth_k=3) if stoch is not None and not stoch.empty: df["stochk_14_3_3"] = stoch["STOCHk_14_3_3"] df["stochd_14_3_3"] = stoch["STOCHd_14_3_3"] else: df["stochk_14_3_3"] = np.nan df["stochd_14_3_3"] = np.nan # ADX adx = ta.adx(df["high"], df["low"], df["close"], length=14) if adx is not None and not adx.empty: df["adx_14"] = adx["ADX_14"] else: df["adx_14"] = np.nan return df def _resolve_bollinger_columns(columns: Sequence[str]) -> tuple[str, str, str] | None: """Resolve Bollinger Band column names across pandas-ta variants.""" candidate_sets = [ ("BBM_20_2.0", "BBU_20_2.0", "BBL_20_2.0"), ("BBM_20_2", "BBU_20_2", "BBL_20_2"), ] for candidate in candidate_sets: if set(candidate).issubset(columns): return candidate mid_candidates = [column for column in columns if column.startswith("BBM_")] upper_candidates = [column for column in columns if column.startswith("BBU_")] lower_candidates = [column for column in columns if column.startswith("BBL_")] if mid_candidates and upper_candidates and lower_candidates: return mid_candidates[0], upper_candidates[0], lower_candidates[0] return None def identify_support_levels(df: pd.DataFrame) -> list[float]: """ Identify support levels using recent lows Args: df: DataFrame with price data Returns: List of support price levels """ # Use the lowest points in recent periods last_month = df.iloc[-30:] if len(df) >= 30 else df min_price = last_month["low"].min() # Additional support levels support_levels = [ round(min_price, 2), round(df["close"].iloc[-1] * 0.95, 2), # 5% below current price round(df["close"].iloc[-1] * 0.90, 2), # 10% below current price ] return sorted(set(support_levels)) def identify_resistance_levels(df: pd.DataFrame) -> list[float]: """ Identify resistance levels using recent highs Args: df: DataFrame with price data Returns: List of resistance price levels """ # Use the highest points in recent periods last_month = df.iloc[-30:] if len(df) >= 30 else df max_price = last_month["high"].max() # Additional resistance levels resistance_levels = [ round(max_price, 2), round(df["close"].iloc[-1] * 1.05, 2), # 5% above current price round(df["close"].iloc[-1] * 1.10, 2), # 10% above current price ] return sorted(set(resistance_levels)) def analyze_trend(df: pd.DataFrame) -> int: """ Calculate the trend strength of a stock based on various technical indicators. Args: df: DataFrame with price and indicator data Returns: Integer trend strength score (0-7) """ try: trend_strength = 0 close_price = df["close"].iloc[-1] # Check SMA 50 sma_50 = df["sma_50"].iloc[-1] if pd.notna(sma_50) and close_price > sma_50: trend_strength += 1 # Check EMA 21 ema_21 = df["ema_21"].iloc[-1] if pd.notna(ema_21) and close_price > ema_21: trend_strength += 1 # Check EMA 21 vs SMA 50 if pd.notna(ema_21) and pd.notna(sma_50) and ema_21 > sma_50: trend_strength += 1 # Check SMA 50 vs SMA 200 sma_200 = df["sma_200"].iloc[-1] if pd.notna(sma_50) and pd.notna(sma_200) and sma_50 > sma_200: trend_strength += 1 # Check RSI rsi = df["rsi"].iloc[-1] if pd.notna(rsi) and rsi > 50: trend_strength += 1 # Check MACD macd = df["macd_12_26_9"].iloc[-1] if pd.notna(macd) and macd > 0: trend_strength += 1 # Check ADX adx = df["adx_14"].iloc[-1] if pd.notna(adx) and adx > 25: trend_strength += 1 return trend_strength except Exception as e: logger.error(f"Error calculating trend strength: {e}") return 0 def analyze_rsi(df: pd.DataFrame) -> dict[str, Any]: """ Analyze RSI indicator Args: df: DataFrame with price and indicator data Returns: Dictionary with RSI analysis """ try: # Check if dataframe is valid and has RSI column if df.empty: return { "current": None, "signal": "unavailable", "description": "No data available for RSI calculation", } if "rsi" not in df.columns: return { "current": None, "signal": "unavailable", "description": "RSI indicator not calculated", } if len(df) == 0: return { "current": None, "signal": "unavailable", "description": "Insufficient data for RSI calculation", } rsi = df["rsi"].iloc[-1] # Check if RSI is NaN if pd.isna(rsi): return { "current": None, "signal": "unavailable", "description": "RSI data not available (insufficient data points)", } if rsi > 70: signal = "overbought" elif rsi < 30: signal = "oversold" elif rsi > 50: signal = "bullish" else: signal = "bearish" return { "current": round(rsi, 2), "signal": signal, "description": f"RSI is currently at {round(rsi, 2)}, indicating {signal} conditions.", } except Exception as e: logger.error(f"Error analyzing RSI: {e}") return { "current": None, "signal": "error", "description": f"Error calculating RSI: {str(e)}", } def analyze_macd(df: pd.DataFrame) -> dict[str, Any]: """ Analyze MACD indicator Args: df: DataFrame with price and indicator data Returns: Dictionary with MACD analysis """ try: macd = df["macd_12_26_9"].iloc[-1] signal = df["macds_12_26_9"].iloc[-1] histogram = df["macdh_12_26_9"].iloc[-1] # Check if any values are NaN if pd.isna(macd) or pd.isna(signal) or pd.isna(histogram): return { "macd": None, "signal": None, "histogram": None, "indicator": "unavailable", "crossover": "unavailable", "description": "MACD data not available (insufficient data points)", } if macd > signal and histogram > 0: signal_type = "bullish" elif macd < signal and histogram < 0: signal_type = "bearish" elif macd > signal and macd < 0: signal_type = "improving" elif macd < signal and macd > 0: signal_type = "weakening" else: signal_type = "neutral" # Check for crossover (ensure we have enough data) crossover = "no recent crossover" if len(df) >= 2: prev_macd = df["macd_12_26_9"].iloc[-2] prev_signal = df["macds_12_26_9"].iloc[-2] if pd.notna(prev_macd) and pd.notna(prev_signal): if prev_macd <= prev_signal and macd > signal: crossover = "bullish crossover detected" elif prev_macd >= prev_signal and macd < signal: crossover = "bearish crossover detected" return { "macd": round(macd, 2), "signal": round(signal, 2), "histogram": round(histogram, 2), "indicator": signal_type, "crossover": crossover, "description": f"MACD is {signal_type} with {crossover}.", } except Exception as e: logger.error(f"Error analyzing MACD: {e}") return { "macd": None, "signal": None, "histogram": None, "indicator": "error", "crossover": "error", "description": "Error calculating MACD", } def analyze_stochastic(df: pd.DataFrame) -> dict[str, Any]: """ Analyze Stochastic Oscillator Args: df: DataFrame with price and indicator data Returns: Dictionary with stochastic oscillator analysis """ try: k = df["stochk_14_3_3"].iloc[-1] d = df["stochd_14_3_3"].iloc[-1] # Check if values are NaN if pd.isna(k) or pd.isna(d): return { "k": None, "d": None, "signal": "unavailable", "crossover": "unavailable", "description": "Stochastic data not available (insufficient data points)", } if k > 80 and d > 80: signal = "overbought" elif k < 20 and d < 20: signal = "oversold" elif k > d: signal = "bullish" else: signal = "bearish" # Check for crossover (ensure we have enough data) crossover = "no recent crossover" if len(df) >= 2: prev_k = df["stochk_14_3_3"].iloc[-2] prev_d = df["stochd_14_3_3"].iloc[-2] if pd.notna(prev_k) and pd.notna(prev_d): if prev_k <= prev_d and k > d: crossover = "bullish crossover detected" elif prev_k >= prev_d and k < d: crossover = "bearish crossover detected" return { "k": round(k, 2), "d": round(d, 2), "signal": signal, "crossover": crossover, "description": f"Stochastic Oscillator is {signal} with {crossover}.", } except Exception as e: logger.error(f"Error analyzing Stochastic: {e}") return { "k": None, "d": None, "signal": "error", "crossover": "error", "description": "Error calculating Stochastic", } def analyze_bollinger_bands(df: pd.DataFrame) -> dict[str, Any]: """ Analyze Bollinger Bands Args: df: DataFrame with price and indicator data Returns: Dictionary with Bollinger Bands analysis """ try: current_price = df["close"].iloc[-1] upper_band = df["bbu_20_2.0"].iloc[-1] lower_band = df["bbl_20_2.0"].iloc[-1] middle_band = df["sma_20"].iloc[-1] # Check if any values are NaN if pd.isna(upper_band) or pd.isna(lower_band) or pd.isna(middle_band): return { "upper_band": None, "middle_band": None, "lower_band": None, "position": "unavailable", "signal": "unavailable", "volatility": "unavailable", "description": "Bollinger Bands data not available (insufficient data points)", } if current_price > upper_band: position = "above upper band" signal = "overbought" elif current_price < lower_band: position = "below lower band" signal = "oversold" elif current_price > middle_band: position = "above middle band" signal = "bullish" else: position = "below middle band" signal = "bearish" # Check for BB squeeze (volatility contraction) volatility = "stable" if len(df) >= 5: try: bb_widths = [] for i in range(-5, 0): upper = df["bbu_20_2.0"].iloc[i] lower = df["bbl_20_2.0"].iloc[i] middle = df["sma_20"].iloc[i] if ( pd.notna(upper) and pd.notna(lower) and pd.notna(middle) and middle != 0 ): bb_widths.append((upper - lower) / middle) if len(bb_widths) == 5: if all(bb_widths[i] < bb_widths[i - 1] for i in range(1, 5)): volatility = "contracting (potential breakout ahead)" elif all(bb_widths[i] > bb_widths[i - 1] for i in range(1, 5)): volatility = "expanding (increased volatility)" except Exception: # If volatility calculation fails, keep it as stable pass return { "upper_band": round(upper_band, 2), "middle_band": round(middle_band, 2), "lower_band": round(lower_band, 2), "position": position, "signal": signal, "volatility": volatility, "description": f"Price is {position}, indicating {signal} conditions. Volatility is {volatility}.", } except Exception as e: logger.error(f"Error analyzing Bollinger Bands: {e}") return { "upper_band": None, "middle_band": None, "lower_band": None, "position": "error", "signal": "error", "volatility": "error", "description": "Error calculating Bollinger Bands", } def analyze_volume(df: pd.DataFrame) -> dict[str, Any]: """ Analyze volume patterns Args: df: DataFrame with price and volume data Returns: Dictionary with volume analysis """ try: current_volume = df["volume"].iloc[-1] # Check if we have enough data for average if len(df) < 10: avg_volume = df["volume"].mean() else: avg_volume = df["volume"].iloc[-10:].mean() # Check for invalid values if pd.isna(current_volume) or pd.isna(avg_volume) or avg_volume == 0: return { "current": None, "average": None, "ratio": None, "description": "unavailable", "signal": "unavailable", } volume_ratio = current_volume / avg_volume if volume_ratio > 1.5: volume_desc = "above average" if len(df) >= 2 and df["close"].iloc[-1] > df["close"].iloc[-2]: signal = "bullish (high volume on up move)" else: signal = "bearish (high volume on down move)" elif volume_ratio < 0.7: volume_desc = "below average" signal = "weak conviction" else: volume_desc = "average" signal = "neutral" return { "current": int(current_volume), "average": int(avg_volume), "ratio": round(volume_ratio, 2), "description": volume_desc, "signal": signal, } except Exception as e: logger.error(f"Error analyzing volume: {e}") return { "current": None, "average": None, "ratio": None, "description": "error", "signal": "error", } def identify_chart_patterns(df: pd.DataFrame) -> list[str]: """ Identify common chart patterns Args: df: DataFrame with price data Returns: List of identified chart patterns """ patterns = [] # Check for potential double bottom (W formation) if len(df) >= 40: recent_lows = df["low"].iloc[-40:].values potential_bottoms = [] for i in range(1, len(recent_lows) - 1): if ( recent_lows[i] < recent_lows[i - 1] and recent_lows[i] < recent_lows[i + 1] ): potential_bottoms.append(i) if ( len(potential_bottoms) >= 2 and potential_bottoms[-1] - potential_bottoms[-2] >= 5 ): if ( abs( recent_lows[potential_bottoms[-1]] - recent_lows[potential_bottoms[-2]] ) / recent_lows[potential_bottoms[-2]] < 0.05 ): patterns.append("Double Bottom (W)") # Check for potential double top (M formation) if len(df) >= 40: recent_highs = df["high"].iloc[-40:].values potential_tops = [] for i in range(1, len(recent_highs) - 1): if ( recent_highs[i] > recent_highs[i - 1] and recent_highs[i] > recent_highs[i + 1] ): potential_tops.append(i) if len(potential_tops) >= 2 and potential_tops[-1] - potential_tops[-2] >= 5: if ( abs(recent_highs[potential_tops[-1]] - recent_highs[potential_tops[-2]]) / recent_highs[potential_tops[-2]] < 0.05 ): patterns.append("Double Top (M)") # Check for bullish flag/pennant if len(df) >= 20: recent_prices = df["close"].iloc[-20:].values if ( recent_prices[0] < recent_prices[10] and all( recent_prices[i] >= recent_prices[i - 1] * 0.99 for i in range(1, 10) ) and all( abs(recent_prices[i] - recent_prices[i - 1]) / recent_prices[i - 1] < 0.02 for i in range(11, 20) ) ): patterns.append("Bullish Flag/Pennant") # Check for bearish flag/pennant if len(df) >= 20: recent_prices = df["close"].iloc[-20:].values if ( recent_prices[0] > recent_prices[10] and all( recent_prices[i] <= recent_prices[i - 1] * 1.01 for i in range(1, 10) ) and all( abs(recent_prices[i] - recent_prices[i - 1]) / recent_prices[i - 1] < 0.02 for i in range(11, 20) ) ): patterns.append("Bearish Flag/Pennant") return patterns def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.Series: """ Calculate Average True Range (ATR) for the given dataframe. Args: df: DataFrame with high, low, and close price data period: Period for ATR calculation (default: 14) Returns: Series with ATR values """ # Ensure column names are lowercase df_copy = df.copy() df_copy.columns = [col.lower() for col in df_copy.columns] # Use pandas_ta to calculate ATR atr = ta.atr(df_copy["high"], df_copy["low"], df_copy["close"], length=period) # Ensure we return a Series if isinstance(atr, pd.Series): return atr elif isinstance(atr, pd.DataFrame): # If it's a DataFrame, take the first column return pd.Series(atr.iloc[:, 0]) elif atr is not None: # If it's a numpy array or other iterable return pd.Series(atr) else: # Return empty series if calculation failed return pd.Series(dtype=float) def generate_outlook( df: pd.DataFrame, trend: str, rsi_analysis: dict[str, Any], macd_analysis: dict[str, Any], stoch_analysis: dict[str, Any], ) -> str: """ Generate an overall outlook based on technical analysis Args: df: DataFrame with price and indicator data trend: Trend direction from analyze_trend rsi_analysis: RSI analysis from analyze_rsi macd_analysis: MACD analysis from analyze_macd stoch_analysis: Stochastic analysis from analyze_stochastic Returns: String with overall market outlook """ bullish_signals = 0 bearish_signals = 0 # Count signals from different indicators if trend == "uptrend": bullish_signals += 2 elif trend == "downtrend": bearish_signals += 2 if rsi_analysis["signal"] == "bullish" or rsi_analysis["signal"] == "oversold": bullish_signals += 1 elif rsi_analysis["signal"] == "bearish" or rsi_analysis["signal"] == "overbought": bearish_signals += 1 if ( macd_analysis["indicator"] == "bullish" or macd_analysis["crossover"] == "bullish crossover detected" ): bullish_signals += 1 elif ( macd_analysis["indicator"] == "bearish" or macd_analysis["crossover"] == "bearish crossover detected" ): bearish_signals += 1 if stoch_analysis["signal"] == "bullish" or stoch_analysis["signal"] == "oversold": bullish_signals += 1 elif ( stoch_analysis["signal"] == "bearish" or stoch_analysis["signal"] == "overbought" ): bearish_signals += 1 # Generate outlook based on signals if bullish_signals >= 4: return "strongly bullish" elif bullish_signals > bearish_signals: return "moderately bullish" elif bearish_signals >= 4: return "strongly bearish" elif bearish_signals > bullish_signals: return "moderately bearish" else: return "neutral" def calculate_rsi(df: pd.DataFrame, period: int = 14) -> pd.Series: """ Calculate RSI (Relative Strength Index) for the given dataframe. Args: df: DataFrame with price data period: Period for RSI calculation (default: 14) Returns: Series with RSI values """ # Handle both uppercase and lowercase column names df_copy = df.copy() df_copy.columns = [col.lower() for col in df_copy.columns] # Ensure we have the required 'close' column if "close" not in df_copy.columns: raise ValueError("DataFrame must contain a 'close' or 'Close' column") # Use pandas_ta to calculate RSI rsi = ta.rsi(df_copy["close"], length=period) # Ensure we return a Series if isinstance(rsi, pd.Series): return rsi elif rsi is not None: # If it's a numpy array or other iterable return pd.Series(rsi, index=df.index) else: # Return empty series if calculation failed return pd.Series(dtype=float, index=df.index) def calculate_sma(df: pd.DataFrame, period: int) -> pd.Series: """ Calculate Simple Moving Average (SMA) for the given dataframe. Args: df: DataFrame with price data period: Period for SMA calculation Returns: Series with SMA values """ # Handle both uppercase and lowercase column names df_copy = df.copy() df_copy.columns = [col.lower() for col in df_copy.columns] # Ensure we have the required 'close' column if "close" not in df_copy.columns: raise ValueError("DataFrame must contain a 'close' or 'Close' column") # Use pandas_ta to calculate SMA sma = ta.sma(df_copy["close"], length=period) # Ensure we return a Series if isinstance(sma, pd.Series): return sma elif sma is not None: # If it's a numpy array or other iterable return pd.Series(sma, index=df.index) else: # Return empty series if calculation failed return pd.Series(dtype=float, index=df.index)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server