Skip to main content
Glama
technical_service.py19.6 kB
# src/server/domain/services/technical_service.py """Technical analysis service. Calculates technical indicators and generates trading signals. """ from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd from src.server.utils.logger import logger class TechnicalService: """Technical analysis service.""" def __init__(self, adapter_manager): """Initialize technical analysis service. Args: adapter_manager: AdapterManager instance for fetching price data """ self.adapter_manager = adapter_manager logger.info("Technical analysis service initialized") async def _get_price_data( self, symbol: str, period: str = "90d", interval: str = "1d" ) -> Optional[pd.DataFrame]: """Fetch price data and convert to DataFrame.""" try: if not self.adapter_manager: logger.error("Adapter manager not initialized") return None # Parse period to days days = 90 if period.endswith("d"): days = int(period[:-1]) elif period.endswith("m"): days = int(period[:-1]) * 30 elif period.endswith("y"): days = int(period[:-1]) * 365 end_date = datetime.now() start_date = end_date - timedelta(days=days) # Fetch historical data # Note: adapter_manager.get_historical_prices returns List[AssetPrice] prices = await self.adapter_manager.get_historical_prices( symbol, start_date, end_date, interval ) if not prices: logger.warning(f"No historical data found for {symbol}") return None # Convert to DataFrame data = [p.to_dict() for p in prices] df = pd.DataFrame(data) # Rename columns to match technical analysis expectations if needed # AssetPrice dict keys: ticker, price, currency, timestamp, volume, open_price, high_price, low_price, close_price, ... # We need: open, high, low, close, volume, date (index) rename_map = { "open_price": "open", "high_price": "high", "low_price": "low", "close_price": "close", "timestamp": "date" } df = df.rename(columns=rename_map) # Ensure numeric types for col in ["open", "high", "low", "close", "volume"]: if col in df.columns: df[col] = pd.to_numeric(df[col]) # Set index if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) df = df.set_index("date") df = df.sort_index() return df except Exception as e: logger.error(f"Failed to get price data for {symbol}: {e}") return None def _calculate_sma(self, data: pd.DataFrame, period: int) -> pd.Series: return data["close"].rolling(window=period).mean() def _calculate_ema(self, data: pd.DataFrame, period: int) -> pd.Series: return data["close"].ewm(span=period, adjust=False).mean() def _calculate_rsi(self, data: pd.DataFrame, period: int = 14) -> pd.Series: delta = data["close"].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss return 100 - (100 / (1 + rs)) def _calculate_macd( self, data: pd.DataFrame, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9, ) -> Tuple[pd.Series, pd.Series, pd.Series]: ema_fast = data["close"].ewm(span=fast_period, adjust=False).mean() ema_slow = data["close"].ewm(span=slow_period, adjust=False).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=signal_period, adjust=False).mean() histogram = macd_line - signal_line return macd_line, signal_line, histogram def _calculate_bollinger_bands( self, data: pd.DataFrame, period: int = 20, std_dev: float = 2.0 ) -> Tuple[pd.Series, pd.Series, pd.Series]: middle_band = data["close"].rolling(window=period).mean() std = data["close"].rolling(window=period).std() upper_band = middle_band + (std * std_dev) lower_band = middle_band - (std * std_dev) return upper_band, middle_band, lower_band def _calculate_stochastic( self, data: pd.DataFrame, k_period: int = 14, d_period: int = 3 ) -> Tuple[pd.Series, pd.Series]: low_min = data["low"].rolling(window=k_period).min() high_max = data["high"].rolling(window=k_period).max() k_line = 100 * ((data["close"] - low_min) / (high_max - low_min)) d_line = k_line.rolling(window=d_period).mean() return k_line, d_line def _calculate_atr(self, data: pd.DataFrame, period: int = 14) -> pd.Series: high_low = data["high"] - data["low"] high_close = np.abs(data["high"] - data["close"].shift()) low_close = np.abs(data["low"] - data["close"].shift()) ranges = pd.concat([high_low, high_close, low_close], axis=1) true_range = ranges.max(axis=1) return true_range.rolling(window=period).mean() async def calculate_indicators( self, symbol: str, period: str = "90d", interval: str = "1d" ) -> Dict[str, Any]: """Calculate technical indicators and return structured data.""" try: # Force fetch enough data for SMA200 regardless of requested period # Parse requested period to see if we need more days_needed = 250 # Minimum for SMA200 + buffer # Use the longer of requested period or needed days fetch_period = f"{days_needed}d" data = await self._get_price_data(symbol, fetch_period, interval) if data is None or data.empty: return {"error": f"No price data for {symbol}"} # Calculate indicators sma_20 = self._calculate_sma(data, 20) sma_50 = self._calculate_sma(data, 50) sma_200 = self._calculate_sma(data, 200) ema_12 = self._calculate_ema(data, 12) ema_26 = self._calculate_ema(data, 26) rsi = self._calculate_rsi(data, 14) macd_line, signal_line, histogram = self._calculate_macd(data) bb_upper, bb_middle, bb_lower = self._calculate_bollinger_bands(data) k_line, d_line = self._calculate_stochastic(data) atr = self._calculate_atr(data) # Get latest values latest = data.iloc[-1] current_price = float(latest["close"]) def get_val(series): return float(series.iloc[-1]) if not pd.isna(series.iloc[-1]) else None result = { "symbol": symbol, "timestamp": datetime.now().isoformat(), "price": { "current": current_price, "open": float(latest["open"]), "high": float(latest["high"]), "low": float(latest["low"]), "volume": float(latest["volume"]), }, "indicators": { "sma": { "sma_20": get_val(sma_20), "sma_50": get_val(sma_50), "sma_200": get_val(sma_200), }, "ema": { "ema_12": get_val(ema_12), "ema_26": get_val(ema_26), }, "rsi": { "value": get_val(rsi), "signal": "overbought" if get_val(rsi) > 70 else "oversold" if get_val(rsi) < 30 else "neutral" }, "macd": { "macd": get_val(macd_line), "signal": get_val(signal_line), "histogram": get_val(histogram), }, "bollinger_bands": { "upper": get_val(bb_upper), "middle": get_val(bb_middle), "lower": get_val(bb_lower), }, "kdj": { "k": get_val(k_line), "d": get_val(d_line), }, "atr": get_val(atr), } } return result except Exception as e: logger.error(f"Calculate technical indicators failed: {e}", exc_info=True) return {"error": str(e)} async def generate_trading_signal( self, symbol: str, period: str = "90d", interval: str = "1d" ) -> Dict[str, Any]: """Generate trading signals based on technical indicators. Strategy: Moving Average Crossover (Golden Cross / Death Cross) """ try: # 1. Fetch sufficient data (at least 60 days for SMA50 calculation) fetch_period = "100d" # Ensure enough data for SMA50 data = await self._get_price_data(symbol, fetch_period, interval) if data is None or data.empty or len(data) < 50: return { "symbol": symbol, "signal": "neutral", "reason": "Insufficient data for analysis (need > 50 days)" } # 2. Calculate Indicators sma_20 = self._calculate_sma(data, 20) sma_50 = self._calculate_sma(data, 50) rsi = self._calculate_rsi(data, 14) # Get latest and previous values current_sma20 = sma_20.iloc[-1] current_sma50 = sma_50.iloc[-1] prev_sma20 = sma_20.iloc[-2] prev_sma50 = sma_50.iloc[-2] current_rsi = rsi.iloc[-1] current_price = data["close"].iloc[-1] # 3. Determine Signal signal = "neutral" reason = [] confidence = 0.0 # Golden Cross: SMA20 crosses above SMA50 if prev_sma20 <= prev_sma50 and current_sma20 > current_sma50: signal = "buy" reason.append("Golden Cross: SMA20 crossed above SMA50") confidence += 0.6 # Death Cross: SMA20 crosses below SMA50 elif prev_sma20 >= prev_sma50 and current_sma20 < current_sma50: signal = "sell" reason.append("Death Cross: SMA20 crossed below SMA50") confidence += 0.6 # Trend Confirmation if current_sma20 > current_sma50: if signal == "neutral": reason.append("Bullish Trend: SMA20 > SMA50") confidence += 0.2 elif current_sma20 < current_sma50: if signal == "neutral": reason.append("Bearish Trend: SMA20 < SMA50") confidence += 0.2 # RSI Filter if current_rsi < 30: reason.append(f"RSI Oversold ({current_rsi:.1f})") if signal == "buy": confidence += 0.2 elif current_rsi > 70: reason.append(f"RSI Overbought ({current_rsi:.1f})") if signal == "sell": confidence += 0.2 return { "symbol": symbol, "signal": signal, "confidence": min(confidence, 1.0), "timestamp": datetime.now().isoformat(), "price": float(current_price), "indicators": { "sma_20": float(current_sma20), "sma_50": float(current_sma50), "rsi": float(current_rsi) }, "reasons": reason } except Exception as e: logger.error(f"Generate trading signal failed: {e}", exc_info=True) return {"error": str(e)} async def calculate_support_resistance( self, symbol: str, period: str = "90d" ) -> Dict[str, Any]: """Calculate support and resistance levels.""" try: data = await self._get_price_data(symbol, period) if data is None or data.empty: return {"error": "No data"} # Simple pivot point calculation based on recent highs/lows recent_high = data["high"].max() recent_low = data["low"].min() current = data["close"].iloc[-1] levels = [] # Add recent high/low as levels levels.append({"price": float(recent_high), "type": "resistance", "strength": "strong"}) levels.append({"price": float(recent_low), "type": "support", "strength": "strong"}) return { "symbol": symbol, "current_price": float(current), "levels": levels } except Exception as e: return {"error": str(e)} async def analyze_price_patterns(self, symbol: str, period: str = "90d") -> Dict[str, Any]: """Analyze price patterns (Candlestick patterns).""" try: data = await self._get_price_data(symbol, period) if data is None or data.empty: return {"error": "No data"} patterns = [] latest = data.iloc[-1] prev = data.iloc[-2] # 1. Doji (Open approx equal to Close) body_size = abs(latest["close"] - latest["open"]) total_range = latest["high"] - latest["low"] if total_range > 0 and (body_size / total_range) < 0.1: patterns.append({"name": "Doji", "signal": "neutral", "significance": "potential reversal"}) # 2. Hammer (Small body, long lower shadow) lower_shadow = min(latest["open"], latest["close"]) - latest["low"] if total_range > 0 and body_size > 0 and (lower_shadow / body_size) > 2 and (latest["high"] - max(latest["open"], latest["close"])) < body_size: patterns.append({"name": "Hammer", "signal": "buy", "significance": "bullish reversal"}) # 3. Bullish Engulfing if (prev["close"] < prev["open"] and # Previous red latest["close"] > latest["open"] and # Current green latest["open"] < prev["close"] and latest["close"] > prev["open"]): patterns.append({"name": "Bullish Engulfing", "signal": "buy", "significance": "strong bullish"}) return { "symbol": symbol, "patterns": patterns, "last_candle": { "open": float(latest["open"]), "high": float(latest["high"]), "low": float(latest["low"]), "close": float(latest["close"]) } } except Exception as e: return {"error": str(e)} async def analyze_volume_profile(self, symbol: str, period: str = "90d") -> Dict[str, Any]: """Analyze volume profile (Simplified / Visible Range). Calculates volume distribution across price levels to identify high volume nodes (strong support/resistance) and low volume nodes. """ try: data = await self._get_price_data(symbol, period) if data is None or data.empty: return {"error": "No data"} # 1. Determine Price Range min_price = data["low"].min() max_price = data["high"].max() price_range = max_price - min_price if price_range == 0: return {"error": "Price range is zero"} # 2. Create Bins (e.g., 24 bins) num_bins = 24 bin_size = price_range / num_bins # Initialize bins # Each bin: {'min': float, 'max': float, 'volume': float} bins = [] for i in range(num_bins): bins.append({ "min": min_price + (i * bin_size), "max": min_price + ((i + 1) * bin_size), "volume": 0.0 }) # 3. Distribute Volume # We use Typical Price (H+L+C)/3 to attribute volume typical_prices = (data["high"] + data["low"] + data["close"]) / 3 volumes = data["volume"] total_volume = 0.0 for price, vol in zip(typical_prices, volumes): if pd.isna(price) or pd.isna(vol): continue # Find bin index bin_idx = int((price - min_price) / bin_size) # Handle edge case where price == max_price if bin_idx >= num_bins: bin_idx = num_bins - 1 if bin_idx < 0: bin_idx = 0 bins[bin_idx]["volume"] += float(vol) total_volume += float(vol) # 4. Analyze Results # Find POC (Point of Control) - Price level with highest volume poc_bin = max(bins, key=lambda x: x["volume"]) poc_price = (poc_bin["min"] + poc_bin["max"]) / 2 # Calculate Value Area (e.g., 70% of volume) # Sort bins by volume descending sorted_bins = sorted(bins, key=lambda x: x["volume"], reverse=True) accumulated_vol = 0.0 value_area_vol = total_volume * 0.70 value_area_bins = [] for b in sorted_bins: accumulated_vol += b["volume"] value_area_bins.append(b) if accumulated_vol >= value_area_vol: break # Determine Value Area High (VAH) and Low (VAL) vah = max(b["max"] for b in value_area_bins) val = min(b["min"] for b in value_area_bins) # Current price relation current_price = float(data["close"].iloc[-1]) status = "inside_value_area" if current_price > vah: status = "above_value_area (bullish)" elif current_price < val: status = "below_value_area (bearish)" return { "symbol": symbol, "period": period, "poc_price": poc_price, # Point of Control (Strongest Support/Resistance) "value_area": { "high": vah, "low": val, "coverage": "70%" }, "current_price": current_price, "status": status, "volume_profile": bins # Full distribution for visualization } except Exception as e: logger.error(f"Volume profile analysis failed: {e}", exc_info=True) return {"error": str(e)}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server