"""Trend-following technical indicators."""
import logging
from typing import Any, Dict, List
from ..config import (
ADX_PERIOD,
ADX_STRONG_TREND,
ADX_WEAK_TREND,
ERROR_INSUFFICIENT_DATA,
ICHIMOKU_KIJUN_PERIOD,
ICHIMOKU_SENKOU_B_PERIOD,
ICHIMOKU_TENKAN_PERIOD,
MA_PERIODS,
MACD_FAST_PERIOD,
MACD_SIGNAL_PERIOD,
MACD_SLOW_PERIOD,
)
from ..utils.formatters import round_price, safe_float
logger = logging.getLogger(__name__)
def calculate_ema(prices: List[float], period: int) -> float:
"""
Calculate Exponential Moving Average (proper implementation).
Args:
prices: List of prices (most recent first)
period: EMA period
Returns:
EMA value
"""
if len(prices) < period:
return sum(prices) / len(prices) if prices else 0
multiplier = 2 / (period + 1)
# Start with SMA
ema = sum(prices[-period:]) / period
# Calculate EMA from oldest to newest
for price in reversed(prices[:-period]):
ema = (price * multiplier) + (ema * (1 - multiplier))
return ema
def calculate_moving_averages(ohlcv_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate Simple Moving Averages for multiple periods.
Args:
ohlcv_data: OHLCV data dictionary from API
Returns:
Dictionary with SMAs for different periods
"""
candles = list(ohlcv_data.items())[: max(MA_PERIODS) + 10]
if not candles:
return {"error": ERROR_INSUFFICIENT_DATA}
closes = [safe_float(c[1].get("4. close", 0)) for c in candles]
result = {"current_price": closes[0] if closes else 0}
for period in MA_PERIODS:
if len(closes) >= period:
sma = sum(closes[:period]) / period
result[f"sma_{period}"] = round_price(sma)
# Calculate EMA as well
ema = calculate_ema(closes[: period * 2], period)
result[f"ema_{period}"] = round_price(ema)
return result
def calculate_macd(ohlcv_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate MACD (Moving Average Convergence Divergence) properly.
Uses proper EMA calculations for MACD line and signal line.
Args:
ohlcv_data: OHLCV data dictionary
Returns:
MACD line, signal line, histogram, and signal
"""
candles = list(ohlcv_data.items())[:100]
if len(candles) < MACD_SLOW_PERIOD + MACD_SIGNAL_PERIOD:
return {"error": ERROR_INSUFFICIENT_DATA}
closes = [safe_float(c[1].get("4. close", 0)) for c in candles]
# Calculate EMAs properly
ema_12 = calculate_ema(closes, MACD_FAST_PERIOD)
ema_26 = calculate_ema(closes, MACD_SLOW_PERIOD)
macd_line = ema_12 - ema_26
# Calculate signal line (EMA of MACD)
# For proper signal line, we'd need MACD history, simplified here
macd_values = []
for i in range(min(MACD_SIGNAL_PERIOD * 2, len(closes) - MACD_SLOW_PERIOD)):
temp_closes = closes[i : i + MACD_SLOW_PERIOD + 10]
if len(temp_closes) >= MACD_SLOW_PERIOD:
temp_ema12 = calculate_ema(temp_closes, MACD_FAST_PERIOD)
temp_ema26 = calculate_ema(temp_closes, MACD_SLOW_PERIOD)
macd_values.append(temp_ema12 - temp_ema26)
signal_line = (
calculate_ema(macd_values, MACD_SIGNAL_PERIOD)
if macd_values
else macd_line * 0.9
)
histogram = macd_line - signal_line
return {
"macd_line": round_price(macd_line),
"signal_line": round_price(signal_line),
"histogram": round_price(histogram),
"signal": "BULLISH" if histogram > 0 else "BEARISH",
}
def calculate_adx(
ohlcv_data: Dict[str, Any], period: int = ADX_PERIOD
) -> Dict[str, Any]:
"""
Calculate ADX (Average Directional Index) properly.
Args:
ohlcv_data: OHLCV data dictionary
period: ADX period (default from config)
Returns:
ADX value, +DI, -DI, and trend strength
"""
candles = list(ohlcv_data.items())[: period * 3]
if len(candles) < period + 1:
return {"error": ERROR_INSUFFICIENT_DATA}
# Calculate True Range and Directional Movement
tr_values = []
dm_plus_values = []
dm_minus_values = []
for i in range(len(candles) - 1):
high = safe_float(candles[i][1].get("2. high", 0))
low = safe_float(candles[i][1].get("3. low", 0))
close = safe_float(candles[i][1].get("4. close", 0))
prev_high = safe_float(candles[i + 1][1].get("2. high", 0))
prev_low = safe_float(candles[i + 1][1].get("3. low", 0))
prev_close = safe_float(candles[i + 1][1].get("4. close", 0))
# True Range
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
tr_values.append(tr)
# Directional Movement
move_up = high - prev_high
move_down = prev_low - low
dm_plus = move_up if move_up > move_down and move_up > 0 else 0
dm_minus = move_down if move_down > move_up and move_down > 0 else 0
dm_plus_values.append(dm_plus)
dm_minus_values.append(dm_minus)
# Smooth using EMA-like smoothing
atr = calculate_ema(tr_values, period)
smoothed_dm_plus = calculate_ema(dm_plus_values, period)
smoothed_dm_minus = calculate_ema(dm_minus_values, period)
# Calculate DI+ and DI-
di_plus = (smoothed_dm_plus / atr * 100) if atr > 0 else 0
di_minus = (smoothed_dm_minus / atr * 100) if atr > 0 else 0
# Calculate DX and ADX
di_sum = di_plus + di_minus
dx = (abs(di_plus - di_minus) / di_sum * 100) if di_sum > 0 else 0
# ADX is EMA of DX
adx = dx # Simplified: proper ADX would need DX history
# Determine trend strength
if adx > ADX_STRONG_TREND:
strength = "STRONG"
elif adx < ADX_WEAK_TREND:
strength = "WEAK"
else:
strength = "MODERATE"
return {
"adx": round(adx, 2),
"di_plus": round(di_plus, 2),
"di_minus": round(di_minus, 2),
"trend_strength": strength,
"trend_direction": "BULLISH" if di_plus > di_minus else "BEARISH",
}
def calculate_ichimoku(ohlcv_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate Ichimoku Cloud components.
Args:
ohlcv_data: OHLCV data dictionary
Returns:
Ichimoku lines and cloud interpretation
"""
candles = list(ohlcv_data.items())[: ICHIMOKU_SENKOU_B_PERIOD + 10]
if len(candles) < ICHIMOKU_SENKOU_B_PERIOD:
return {"error": ERROR_INSUFFICIENT_DATA}
def midpoint(period: int) -> float:
"""Calculate midpoint (highest high + lowest low) / 2."""
period_candles = candles[:period]
highs = [safe_float(c[1].get("2. high", 0)) for c in period_candles]
lows = [safe_float(c[1].get("3. low", 0)) for c in period_candles]
return (max(highs) + min(lows)) / 2 if highs and lows else 0
# Calculate lines
tenkan_sen = midpoint(ICHIMOKU_TENKAN_PERIOD) # Conversion Line
kijun_sen = midpoint(ICHIMOKU_KIJUN_PERIOD) # Base Line
senkou_span_a = (tenkan_sen + kijun_sen) / 2 # Leading Span A
senkou_span_b = midpoint(ICHIMOKU_SENKOU_B_PERIOD) # Leading Span B
current_price = safe_float(candles[0][1].get("4. close", 0))
# Calculate cloud boundaries
cloud_top = max(senkou_span_a, senkou_span_b)
cloud_bottom = min(senkou_span_a, senkou_span_b)
# Determine price position relative to cloud
if current_price > cloud_top:
signal = "ABOVE_CLOUD"
interpretation = "Bullish - Strong uptrend"
elif current_price < cloud_bottom:
signal = "BELOW_CLOUD"
interpretation = "Bearish - Strong downtrend"
else:
signal = "IN_CLOUD"
interpretation = "Neutral - Consolidation or weak trend"
return {
"tenkan_sen": round_price(tenkan_sen),
"kijun_sen": round_price(kijun_sen),
"senkou_span_a": round_price(senkou_span_a),
"senkou_span_b": round_price(senkou_span_b),
"cloud_top": round_price(cloud_top),
"cloud_bottom": round_price(cloud_bottom),
"current_price": current_price,
"signal": signal,
"interpretation": interpretation,
}