multi_timeframe_analyzer.py•66.2 kB
"""멀티 타임프레임 분석기"""
import asyncio
import statistics
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from src.exceptions import TimeframeError, AnalysisError, InsufficientDataError
class MultiTimeframeAnalyzer:
"""멀티 타임프레임 분석기 클래스"""
def __init__(self, config: Dict[str, Any]):
"""
Args:
config: 분석기 설정 딕셔너리
"""
self.config = config
self.timeframes = config.get("timeframes", ["1m", "5m", "15m", "1h", "4h", "1d"])
self.aggregation_methods = config.get("aggregation_methods", {})
self.analysis_types = config.get("analysis_types", [])
self.correlation_thresholds = config.get("correlation_thresholds", {})
# 타임프레임 변환 테이블 (분 단위)
self.timeframe_minutes = {
"1m": 1,
"5m": 5,
"15m": 15,
"30m": 30,
"1h": 60,
"4h": 240,
"1d": 1440,
"1w": 10080
}
# 캐시
self.cache = {}
# 성능 메트릭
self.performance_metrics = {
"analysis_count": 0,
"total_processing_time": 0.0,
"cache_hits": 0,
"cache_misses": 0
}
async def aggregate_to_timeframe(self, data: List[Dict[str, Any]],
target_timeframe: str) -> List[Dict[str, Any]]:
"""데이터를 목표 타임프레임으로 집계"""
if target_timeframe not in self.timeframe_minutes:
raise TimeframeError(f"Invalid timeframe: {target_timeframe}")
if not data:
raise InsufficientDataError("No data to aggregate")
try:
# 타임프레임 분 단위 변환
target_minutes = self.timeframe_minutes[target_timeframe]
# 데이터를 시간순으로 정렬
sorted_data = sorted(data, key=lambda x: x["timestamp"])
# 집계된 데이터
aggregated = []
current_bucket = []
bucket_start = None
for item in sorted_data:
timestamp = datetime.fromisoformat(item["timestamp"])
if bucket_start is None:
# 첫 번째 버킷 시작
bucket_start = self._align_timestamp(timestamp, target_minutes)
current_bucket = [item]
else:
# 현재 버킷의 종료 시간
bucket_end = bucket_start + timedelta(minutes=target_minutes)
if timestamp < bucket_end:
# 같은 버킷에 추가
current_bucket.append(item)
else:
# 현재 버킷 집계 및 저장
if current_bucket:
aggregated_candle = self._aggregate_candle(current_bucket)
aggregated_candle["timestamp"] = bucket_start.isoformat()
aggregated.append(aggregated_candle)
# 새 버킷 시작
bucket_start = self._align_timestamp(timestamp, target_minutes)
current_bucket = [item]
# 마지막 버킷 처리
if current_bucket:
aggregated_candle = self._aggregate_candle(current_bucket)
aggregated_candle["timestamp"] = bucket_start.isoformat()
aggregated.append(aggregated_candle)
return aggregated
except Exception as e:
raise AnalysisError(f"Aggregation failed: {str(e)}")
async def analyze_timeframes(self, data: List[Dict[str, Any]],
timeframes: List[str]) -> Dict[str, Any]:
"""여러 타임프레임 분석"""
if not data:
raise InsufficientDataError("No data to analyze")
# 멀티 타임프레임 분석을 위한 최소 데이터 요구사항
min_required_points = max([self.timeframe_minutes.get(tf, 1) for tf in timeframes]) * 2
if len(data) < min_required_points:
raise InsufficientDataError(f"Insufficient data for multi-timeframe analysis. Need at least {min_required_points} data points, got {len(data)}")
try:
self.performance_metrics["analysis_count"] += 1
start_time = datetime.now()
# 각 타임프레임별 데이터 집계
timeframe_data = {}
for tf in timeframes:
if tf == "1m":
# 1분봉은 원본 데이터 사용
timeframe_data[tf] = data
else:
# 다른 타임프레임은 집계
timeframe_data[tf] = await self.aggregate_to_timeframe(data, tf)
# 트렌드 정렬 분석
trend_alignment = await self._analyze_trend_alignment(timeframe_data)
# 모멘텀 분석
momentum_analysis = await self._analyze_momentum(timeframe_data)
# 볼륨 프로파일 분석
volume_profile = await self._analyze_volume_profile(timeframe_data)
# 처리 시간 기록
processing_time = (datetime.now() - start_time).total_seconds()
self.performance_metrics["total_processing_time"] += processing_time
return {
"timeframe_data": timeframe_data,
"trend_alignment": trend_alignment,
"momentum_analysis": momentum_analysis,
"volume_profile": volume_profile
}
except Exception as e:
raise AnalysisError(f"Multi-timeframe analysis failed: {str(e)}")
async def detect_trend_alignment(self, data: List[Dict[str, Any]],
timeframes: List[str],
lookback_periods: Dict[str, int] = None) -> Dict[str, Any]:
"""트렌드 정렬 감지"""
if data is None:
raise AnalysisError("Invalid data for trend analysis")
if not data:
raise InsufficientDataError("No data for trend analysis")
try:
# 기본 룩백 기간 설정
if lookback_periods is None:
lookback_periods = {tf: 20 for tf in timeframes}
# 각 타임프레임별 트렌드 분석
timeframe_trends = {}
for tf in timeframes:
# 타임프레임별 데이터 준비
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 트렌드 계산
lookback = lookback_periods.get(tf, 20)
trend = await self._calculate_trend(tf_data, lookback)
timeframe_trends[tf] = trend
# 정렬 여부 확인
directions = [t["direction"] for t in timeframe_trends.values()]
unique_directions = set(directions)
aligned = len(unique_directions) == 1 and "neutral" not in unique_directions
# 전체 방향 및 강도 계산
if aligned:
direction = directions[0]
strengths = [t["strength"] for t in timeframe_trends.values()]
strength = sum(strengths) / len(strengths)
else:
# 가장 많은 방향
direction_counts = {}
for d in directions:
direction_counts[d] = direction_counts.get(d, 0) + 1
direction = max(direction_counts, key=direction_counts.get)
# 평균 강도
strengths = [t["strength"] for t in timeframe_trends.values()]
strength = sum(strengths) / len(strengths) * 0.5 # 정렬되지 않았으므로 감소
return {
"aligned": aligned,
"direction": direction,
"strength": strength,
"timeframe_trends": timeframe_trends
}
except Exception as e:
raise AnalysisError(f"Trend alignment detection failed: {str(e)}")
async def analyze_momentum_divergence(self, data: List[Dict[str, Any]],
timeframes: List[str],
momentum_indicator: str = "rsi") -> Dict[str, Any]:
"""모멘텀 다이버전스 분석"""
try:
divergences = []
momentum_values = {}
for tf in timeframes:
# 타임프레임별 데이터
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 모멘텀 계산
if momentum_indicator == "rsi":
momentum = await self._calculate_rsi(tf_data, period=14)
else:
momentum = await self._calculate_momentum(tf_data, period=14)
momentum_values[tf] = momentum
# 다이버전스 검색
price_highs = self._find_peaks(tf_data, "high")
price_lows = self._find_peaks(tf_data, "low", is_valley=True)
momentum_highs = self._find_peaks(momentum, "value")
momentum_lows = self._find_peaks(momentum, "value", is_valley=True)
# Bearish divergence: 가격은 상승하지만 모멘텀은 하락
for i in range(1, len(price_highs)):
if (price_highs[i]["value"] > price_highs[i-1]["value"] and
i < len(momentum_highs) and i-1 < len(momentum_highs) and
momentum_highs[i]["value"] < momentum_highs[i-1]["value"]):
divergences.append({
"type": "bearish_divergence",
"timeframe": tf,
"index": price_highs[i]["index"],
"strength": abs(momentum_highs[i]["value"] - momentum_highs[i-1]["value"]) / 100
})
# Bullish divergence: 가격은 하락하지만 모멘텀은 상승
for i in range(1, len(price_lows)):
if (price_lows[i]["value"] < price_lows[i-1]["value"] and
i < len(momentum_lows) and i-1 < len(momentum_lows) and
momentum_lows[i]["value"] > momentum_lows[i-1]["value"]):
divergences.append({
"type": "bullish_divergence",
"timeframe": tf,
"index": price_lows[i]["index"],
"strength": abs(momentum_lows[i]["value"] - momentum_lows[i-1]["value"]) / 100
})
# 다이버전스 강도 계산
total_strength = sum(d["strength"] for d in divergences) if divergences else 0
return {
"divergences": divergences,
"momentum_values": momentum_values,
"divergence_strength": min(total_strength, 1.0)
}
except Exception as e:
raise AnalysisError(f"Momentum divergence analysis failed: {str(e)}")
async def analyze_volume_profile(self, data: List[Dict[str, Any]],
timeframes: List[str],
profile_bins: int = 20) -> Dict[str, Any]:
"""볼륨 프로파일 분석"""
try:
all_prices = []
all_volumes = []
# 모든 타임프레임 데이터 수집
for tf in timeframes:
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
for candle in tf_data:
if "close" in candle and "volume" in candle:
all_prices.append(candle["close"])
all_volumes.append(candle["volume"])
if not all_prices:
raise AnalysisError("No price data available")
# 가격 범위 계산
price_min = min(all_prices)
price_max = max(all_prices)
price_range = price_max - price_min
if price_range == 0:
raise AnalysisError("Price range is zero")
# 가격 구간별 볼륨 집계
bin_size = price_range / profile_bins
volume_distribution = [0] * profile_bins
price_levels = []
for i in range(profile_bins):
price_level = price_min + i * bin_size
price_levels.append(price_level)
# 볼륨 분배
for price, volume in zip(all_prices, all_volumes):
bin_index = min(int((price - price_min) / bin_size), profile_bins - 1)
volume_distribution[bin_index] += volume
# POC (Point of Control) 찾기
max_volume_index = volume_distribution.index(max(volume_distribution))
poc_price = price_levels[max_volume_index]
poc_volume = volume_distribution[max_volume_index]
# 가치 영역 계산 (70% 볼륨)
total_volume = sum(volume_distribution)
value_area_volume = total_volume * 0.7
# 중심에서 확장하며 가치 영역 찾기
accumulated_volume = poc_volume
low_index = high_index = max_volume_index
while accumulated_volume < value_area_volume:
# 위아래 중 볼륨이 큰 쪽으로 확장
expand_up = high_index < profile_bins - 1
expand_down = low_index > 0
if expand_up and expand_down:
if volume_distribution[high_index + 1] > volume_distribution[low_index - 1]:
high_index += 1
accumulated_volume += volume_distribution[high_index]
else:
low_index -= 1
accumulated_volume += volume_distribution[low_index]
elif expand_up:
high_index += 1
accumulated_volume += volume_distribution[high_index]
elif expand_down:
low_index -= 1
accumulated_volume += volume_distribution[low_index]
else:
break
value_area_low = price_levels[low_index]
value_area_high = price_levels[high_index]
return {
"price_levels": price_levels,
"volume_distribution": volume_distribution,
"poc": {
"price": poc_price,
"volume": poc_volume
},
"value_area": {
"high": value_area_high,
"low": value_area_low,
"volume_percentage": accumulated_volume / total_volume if total_volume > 0 else 0
}
}
except Exception as e:
raise AnalysisError(f"Volume profile analysis failed: {str(e)}")
async def find_support_resistance_levels(self, data: List[Dict[str, Any]],
timeframes: List[str],
min_touches: int = 2) -> Dict[str, Any]:
"""지지/저항 레벨 찾기"""
try:
all_levels = []
timeframe_levels = {}
for tf in timeframes:
# 타임프레임별 데이터
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 지지/저항 레벨 찾기
tf_levels = await self._find_sr_levels(tf_data, min_touches)
timeframe_levels[tf] = tf_levels
# 전체 레벨에 추가
for level in tf_levels:
# 비슷한 레벨이 이미 있는지 확인
merged = False
for existing in all_levels:
if abs(existing["price"] - level["price"]) / level["price"] < 0.01: # 1% 이내
# 기존 레벨과 병합
existing["strength"] = max(existing["strength"], level["strength"])
existing["timeframes"].append(tf)
merged = True
break
if not merged:
all_levels.append({
"price": level["price"],
"type": level["type"],
"strength": level["strength"],
"timeframes": [tf]
})
# 강도순으로 정렬
all_levels.sort(key=lambda x: x["strength"], reverse=True)
# 컨플루언스 존 찾기
confluence_zones = []
for level in all_levels:
if len(level["timeframes"]) >= 2:
confluence_zones.append({
"price": level["price"],
"timeframe_count": len(level["timeframes"]),
"strength": level["strength"] * len(level["timeframes"])
})
return {
"levels": all_levels,
"timeframe_levels": timeframe_levels,
"confluence_zones": confluence_zones
}
except Exception as e:
raise AnalysisError(f"Support/resistance level detection failed: {str(e)}")
async def calculate_correlation_matrix(self, multi_symbol_data: Dict[str, List[Dict[str, Any]]],
timeframe: str,
correlation_window: int) -> Dict[str, Any]:
"""상관관계 매트릭스 계산"""
try:
symbols = list(multi_symbol_data.keys())
n_symbols = len(symbols)
# 상관관계 매트릭스 초기화
matrix = [[0.0 for _ in range(n_symbols)] for _ in range(n_symbols)]
# 각 심볼 쌍에 대해 상관관계 계산
for i in range(n_symbols):
for j in range(n_symbols):
if i == j:
matrix[i][j] = 1.0
else:
# 두 심볼의 수익률 계산
returns_i = self._calculate_returns(multi_symbol_data[symbols[i]])
returns_j = self._calculate_returns(multi_symbol_data[symbols[j]])
# 상관관계 계산
if len(returns_i) >= correlation_window and len(returns_j) >= correlation_window:
corr = self._calculate_correlation(
returns_i[-correlation_window:],
returns_j[-correlation_window:]
)
matrix[i][j] = corr
else:
matrix[i][j] = 0.0
# 유의미한 상관관계 찾기
significant_correlations = []
for i in range(n_symbols):
for j in range(i+1, n_symbols):
corr = matrix[i][j]
if abs(corr) >= self.correlation_thresholds.get("moderate", 0.6):
significant_correlations.append({
"symbol1": symbols[i],
"symbol2": symbols[j],
"correlation": corr,
"strength": "strong" if abs(corr) >= self.correlation_thresholds.get("strong", 0.8) else "moderate"
})
return {
"matrix": matrix,
"symbols": symbols,
"significant_correlations": significant_correlations
}
except Exception as e:
raise AnalysisError(f"Correlation matrix calculation failed: {str(e)}")
async def rank_timeframe_strength(self, data: List[Dict[str, Any]],
timeframes: List[str],
criteria: List[str]) -> Dict[str, Any]:
"""타임프레임 강도 순위"""
try:
scores = {}
rankings = []
for tf in timeframes:
# 타임프레임별 데이터
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 각 기준별 점수 계산
breakdown = {}
total_score = 0
if "trend" in criteria:
trend_score = await self._calculate_trend_strength(tf_data)
breakdown["trend"] = trend_score
total_score += trend_score
if "momentum" in criteria:
momentum_score = await self._calculate_momentum_strength(tf_data)
breakdown["momentum"] = momentum_score
total_score += momentum_score
if "volume" in criteria:
volume_score = await self._calculate_volume_strength(tf_data)
breakdown["volume"] = volume_score
total_score += volume_score
# 평균 점수
avg_score = total_score / len(criteria)
scores[tf] = avg_score
rankings.append({
"timeframe": tf,
"score": avg_score,
"breakdown": breakdown
})
# 점수순으로 정렬
rankings.sort(key=lambda x: x["score"], reverse=True)
return {
"rankings": rankings,
"scores": scores,
"strongest_timeframe": rankings[0]["timeframe"] if rankings else None
}
except Exception as e:
raise AnalysisError(f"Timeframe strength ranking failed: {str(e)}")
async def identify_market_regime(self, data: List[Dict[str, Any]],
timeframes: List[str]) -> Dict[str, Any]:
"""시장 체제 식별"""
try:
regime_votes = {"trending": 0, "ranging": 0, "volatile": 0, "quiet": 0}
timeframe_regimes = {}
characteristics = {}
for tf in timeframes:
# 타임프레임별 데이터
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 체제 특성 분석
volatility = await self._calculate_volatility(tf_data)
trend_strength = await self._calculate_trend_strength(tf_data)
range_ratio = await self._calculate_range_ratio(tf_data)
# 체제 결정
if trend_strength > 0.7:
regime = "trending"
elif range_ratio > 0.8:
regime = "ranging"
elif volatility > 0.02: # 2% 이상
regime = "volatile"
else:
regime = "quiet"
regime_votes[regime] += 1
timeframe_regimes[tf] = regime
# 전체 체제 결정
overall_regime = max(regime_votes, key=regime_votes.get)
confidence = regime_votes[overall_regime] / len(timeframes)
# 특성 계산
all_volatilities = []
all_trend_strengths = []
for tf in timeframes:
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
all_volatilities.append(await self._calculate_volatility(tf_data))
all_trend_strengths.append(await self._calculate_trend_strength(tf_data))
characteristics = {
"volatility": sum(all_volatilities) / len(all_volatilities),
"trend_strength": sum(all_trend_strengths) / len(all_trend_strengths),
"volume_profile": "normal" # 단순화
}
return {
"regime": overall_regime,
"confidence": confidence,
"characteristics": characteristics,
"timeframe_regimes": timeframe_regimes
}
except Exception as e:
raise AnalysisError(f"Market regime identification failed: {str(e)}")
async def detect_divergence_confluence(self, data: List[Dict[str, Any]],
timeframes: List[str],
indicators: List[str]) -> Dict[str, Any]:
"""다이버전스 컨플루언스 감지"""
try:
all_divergences = []
# 각 타임프레임과 지표에 대해 다이버전스 분석
for tf in timeframes:
for indicator in indicators:
divergence_result = await self.analyze_momentum_divergence(
data, [tf], indicator
)
for div in divergence_result["divergences"]:
div["indicator"] = indicator
all_divergences.append(div)
# 컨플루언스 존 찾기
confluence_zones = []
processed_indices = set()
for i, div1 in enumerate(all_divergences):
if i in processed_indices:
continue
# 근처의 다이버전스 찾기
zone_divergences = [div1]
zone_indices = {i}
for j, div2 in enumerate(all_divergences[i+1:], i+1):
if abs(div1["index"] - div2["index"]) <= 5: # 5 캔들 이내
zone_divergences.append(div2)
zone_indices.add(j)
if len(zone_divergences) >= 2:
# 컨플루언스 존 생성
processed_indices.update(zone_indices)
timeframes_involved = list(set(d["timeframe"] for d in zone_divergences))
indicators_involved = list(set(d["indicator"] for d in zone_divergences))
confluence_zones.append({
"start_index": min(d["index"] for d in zone_divergences),
"end_index": max(d["index"] for d in zone_divergences),
"timeframes_involved": timeframes_involved,
"indicators_involved": indicators_involved,
"strength": sum(d["strength"] for d in zone_divergences) / len(zone_divergences)
})
# 강도 분포
strength_distribution = {
"strong": len([z for z in confluence_zones if z["strength"] > 0.7]),
"moderate": len([z for z in confluence_zones if 0.3 <= z["strength"] <= 0.7]),
"weak": len([z for z in confluence_zones if z["strength"] < 0.3])
}
return {
"confluence_zones": confluence_zones,
"divergence_count": len(all_divergences),
"strength_distribution": strength_distribution
}
except Exception as e:
raise AnalysisError(f"Divergence confluence detection failed: {str(e)}")
async def select_optimal_timeframe(self, data: List[Dict[str, Any]],
trading_style: str,
market_conditions: Dict[str, Any]) -> Dict[str, Any]:
"""최적 타임프레임 선택"""
try:
# 거래 스타일별 선호 타임프레임
style_preferences = {
"scalping": ["1m", "5m"],
"day_trading": ["5m", "15m", "1h"],
"swing_trading": ["1h", "4h", "1d"]
}
preferred_timeframes = style_preferences.get(trading_style, ["15m", "1h"])
# 각 타임프레임 평가
scores = {}
for tf in preferred_timeframes:
score = 0.0
# 시장 조건에 따른 점수 조정
if market_conditions.get("volatility") == "high":
# 높은 변동성에서는 짧은 타임프레임 선호
if tf in ["1m", "5m"]:
score += 0.3
elif market_conditions.get("volatility") == "low":
# 낮은 변동성에서는 긴 타임프레임 선호
if tf in ["1h", "4h", "1d"]:
score += 0.3
if market_conditions.get("trend") == "strong":
# 강한 트렌드에서는 중간 타임프레임 선호
if tf in ["15m", "1h"]:
score += 0.4
# 타임프레임별 기본 점수
base_scores = {
"1m": 0.5,
"5m": 0.6,
"15m": 0.7,
"1h": 0.8,
"4h": 0.7,
"1d": 0.6
}
score += base_scores.get(tf, 0.5)
scores[tf] = min(score, 1.0)
# 최적 타임프레임 선택
optimal_tf = max(scores, key=scores.get)
# 대안 타임프레임
alternatives = []
for tf, score in scores.items():
if tf != optimal_tf:
alternatives.append({"timeframe": tf, "score": score})
alternatives.sort(key=lambda x: x["score"], reverse=True)
# 추천 이유 생성
reasoning = []
if trading_style == "scalping":
reasoning.append("짧은 타임프레임이 빠른 진입/청산에 적합")
elif trading_style == "swing_trading":
reasoning.append("긴 타임프레임이 큰 추세 포착에 유리")
if market_conditions.get("volatility") == "high":
reasoning.append("높은 변동성으로 인해 짧은 타임프레임 권장")
return {
"recommended_timeframe": optimal_tf,
"reasoning": reasoning,
"alternative_timeframes": alternatives,
"confidence_score": scores[optimal_tf]
}
except Exception as e:
raise AnalysisError(f"Optimal timeframe selection failed: {str(e)}")
async def analyze_fractals(self, data: List[Dict[str, Any]],
timeframes: List[str],
fractal_period: int = 5) -> Dict[str, Any]:
"""프랙탈 분석"""
try:
fractals = {}
for tf in timeframes:
# 타임프레임별 데이터
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 프랙탈 찾기
tf_fractals = []
for i in range(fractal_period//2, len(tf_data) - fractal_period//2):
# Up fractal
is_up_fractal = True
center_high = tf_data[i].get("high", tf_data[i].get("price", 0))
for j in range(i - fractal_period//2, i + fractal_period//2 + 1):
if j != i:
compare_high = tf_data[j].get("high", tf_data[j].get("price", 0))
if compare_high >= center_high:
is_up_fractal = False
break
if is_up_fractal:
tf_fractals.append({
"type": "up",
"index": i,
"price": center_high,
"strength": 0.8
})
# Down fractal
is_down_fractal = True
center_low = tf_data[i].get("low", tf_data[i].get("price", 0))
for j in range(i - fractal_period//2, i + fractal_period//2 + 1):
if j != i:
compare_low = tf_data[j].get("low", tf_data[j].get("price", 0))
if compare_low <= center_low:
is_down_fractal = False
break
if is_down_fractal:
tf_fractals.append({
"type": "down",
"index": i,
"price": center_low,
"strength": 0.8
})
fractals[tf] = tf_fractals
# 프랙탈 차원 계산 (간단화)
fractal_dimensions = {}
for tf, tf_fractals in fractals.items():
if len(tf_fractals) > 1:
# 프랙탈 간격의 변동성으로 차원 추정
intervals = []
for i in range(1, len(tf_fractals)):
intervals.append(tf_fractals[i]["index"] - tf_fractals[i-1]["index"])
if intervals:
avg_interval = sum(intervals) / len(intervals)
std_interval = statistics.stdev(intervals) if len(intervals) > 1 else 0
dimension = 1 + (std_interval / avg_interval if avg_interval > 0 else 0)
fractal_dimensions[tf] = min(dimension, 2.0)
else:
fractal_dimensions[tf] = 1.0
else:
fractal_dimensions[tf] = 1.0
# 자기 유사성 점수
self_similarity_score = 0.7 # 단순화된 점수
return {
"fractals": fractals,
"fractal_dimensions": fractal_dimensions,
"self_similarity_score": self_similarity_score
}
except Exception as e:
raise AnalysisError(f"Fractal analysis failed: {str(e)}")
async def analyze_timeframe_transitions(self, data: List[Dict[str, Any]],
from_timeframe: str,
to_timeframe: str) -> Dict[str, Any]:
"""타임프레임 전환 분석"""
try:
# 두 타임프레임 데이터 준비
if from_timeframe == "1m":
from_data = data
else:
from_data = await self.aggregate_to_timeframe(data, from_timeframe)
if to_timeframe == "1m":
to_data = data
else:
to_data = await self.aggregate_to_timeframe(data, to_timeframe)
# 전환점 찾기
transition_points = []
confirmed_count = 0
# 단순화된 신호 생성
from_signals = await self._generate_simple_signals(from_data)
to_signals = await self._generate_simple_signals(to_data)
# 타임프레임 비율 계산
tf_ratio = self.timeframe_minutes[to_timeframe] / self.timeframe_minutes[from_timeframe]
for i, signal in enumerate(from_signals):
if signal["type"] != "hold":
# 대응하는 상위 타임프레임 인덱스
to_index = int(i / tf_ratio)
if to_index < len(to_signals):
# 신호 확인
confirmed = signal["type"] == to_signals[to_index]["type"]
transition_points.append({
f"index_{from_timeframe}": i,
f"index_{to_timeframe}": to_index,
"signal_type": signal["type"],
"confirmed": confirmed
})
if confirmed:
confirmed_count += 1
# 신호 품질 및 확인률 계산
total_signals = len(transition_points)
confirmation_rate = confirmed_count / total_signals if total_signals > 0 else 0
signal_quality = confirmation_rate * 0.8 + 0.2 # 기본 품질 20%
return {
"transition_points": transition_points,
"signal_quality": signal_quality,
"confirmation_rate": confirmation_rate
}
except Exception as e:
raise AnalysisError(f"Timeframe transition analysis failed: {str(e)}")
async def analyze_volatility_profile(self, data: List[Dict[str, Any]],
timeframes: List[str],
volatility_window: int = 20) -> Dict[str, Any]:
"""변동성 프로파일 분석"""
try:
volatility_by_timeframe = {}
for tf in timeframes:
# 타임프레임별 데이터
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
# 변동성 계산
volatilities = []
for i in range(volatility_window, len(tf_data)):
window_data = tf_data[i-volatility_window:i]
vol = await self._calculate_volatility(window_data)
volatilities.append(vol)
if volatilities:
current_vol = volatilities[-1]
avg_vol = sum(volatilities) / len(volatilities)
# 백분위수 계산
sorted_vols = sorted(volatilities)
percentile_index = next(i for i, v in enumerate(sorted_vols) if v >= current_vol)
percentile = (percentile_index / len(sorted_vols)) * 100
volatility_by_timeframe[tf] = {
"average": avg_vol,
"current": current_vol,
"percentile": percentile
}
# 변동성 비율 계산
volatility_ratio = {}
tf_list = list(volatility_by_timeframe.keys())
for i in range(len(tf_list) - 1):
tf1, tf2 = tf_list[i], tf_list[i+1]
if tf1 in volatility_by_timeframe and tf2 in volatility_by_timeframe:
ratio = volatility_by_timeframe[tf1]["current"] / volatility_by_timeframe[tf2]["current"]
volatility_ratio[f"{tf1}/{tf2}"] = ratio
# 확장/수축 판단
avg_current = sum(v["current"] for v in volatility_by_timeframe.values()) / len(volatility_by_timeframe)
avg_average = sum(v["average"] for v in volatility_by_timeframe.values()) / len(volatility_by_timeframe)
if avg_current > avg_average * 1.2:
expansion_contraction = "expansion"
elif avg_current < avg_average * 0.8:
expansion_contraction = "contraction"
else:
expansion_contraction = "normal"
return {
"volatility_by_timeframe": volatility_by_timeframe,
"volatility_ratio": volatility_ratio,
"expansion_contraction": expansion_contraction
}
except Exception as e:
raise AnalysisError(f"Volatility profile analysis failed: {str(e)}")
async def optimize_entry_exit_points(self, data: List[Dict[str, Any]],
primary_timeframe: str,
confirmation_timeframes: List[str],
strategy_type: str) -> Dict[str, Any]:
"""진입/청산 포인트 최적화"""
try:
# 주 타임프레임 신호
if primary_timeframe == "1m":
primary_data = data
else:
primary_data = await self.aggregate_to_timeframe(data, primary_timeframe)
primary_signals = await self._generate_strategy_signals(primary_data, strategy_type)
# 확인 타임프레임 신호
confirmation_signals = {}
for tf in confirmation_timeframes:
if tf == "1m":
tf_data = data
else:
tf_data = await self.aggregate_to_timeframe(data, tf)
confirmation_signals[tf] = await self._generate_strategy_signals(tf_data, strategy_type)
# 최적화된 진입/청산 포인트
entry_points = []
exit_points = []
for i, signal in enumerate(primary_signals):
if signal["type"] == "buy":
# 확인 신호 체크
confirmations = []
for tf, tf_signals in confirmation_signals.items():
# 대응하는 인덱스 찾기
tf_index = self._find_corresponding_index(i, primary_timeframe, tf, len(tf_signals))
if tf_index < len(tf_signals) and tf_signals[tf_index]["type"] == "buy":
confirmations.append(tf)
if confirmations:
entry_points.append({
"index": i,
"price": primary_data[i].get("close", primary_data[i].get("price", 0)),
"confidence": len(confirmations) / len(confirmation_timeframes),
"confirmations": confirmations
})
elif signal["type"] == "sell" and entry_points:
# 청산 포인트
exit_points.append({
"index": i,
"price": primary_data[i].get("close", primary_data[i].get("price", 0)),
"reason": "signal"
})
# 리스크/리워드 비율 계산
risk_reward_ratios = []
for entry, exit in zip(entry_points[:len(exit_points)], exit_points):
reward = exit["price"] - entry["price"]
risk = entry["price"] * 0.02 # 2% 손절
if risk > 0:
risk_reward_ratios.append(abs(reward / risk))
avg_rr = sum(risk_reward_ratios) / len(risk_reward_ratios) if risk_reward_ratios else 0
# 승률 추정
profitable_trades = sum(1 for rr in risk_reward_ratios if rr > 0)
win_rate = profitable_trades / len(risk_reward_ratios) if risk_reward_ratios else 0
return {
"entry_points": entry_points,
"exit_points": exit_points,
"risk_reward_ratio": avg_rr,
"win_rate_estimate": win_rate
}
except Exception as e:
raise AnalysisError(f"Entry/exit optimization failed: {str(e)}")
async def synchronize_timeframe_data(self, multi_tf_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
"""타임프레임 데이터 동기화"""
try:
# 모든 타임스탬프 수집
all_timestamps = set()
for tf_data in multi_tf_data.values():
for item in tf_data:
all_timestamps.add(item["timestamp"])
# 타임스탬프 정렬
sorted_timestamps = sorted(all_timestamps)
# 동기화된 데이터
aligned_data = {}
for tf, tf_data in multi_tf_data.items():
# 타임스탬프를 키로 하는 딕셔너리 생성
tf_dict = {item["timestamp"]: item for item in tf_data}
# 정렬된 타임스탬프에 맞춰 데이터 정렬
aligned_tf_data = []
for ts in sorted_timestamps:
if ts in tf_dict:
aligned_tf_data.append(tf_dict[ts])
else:
# 누락된 데이터는 이전 값으로 채우기
if aligned_tf_data:
filled_data = aligned_tf_data[-1].copy()
filled_data["timestamp"] = ts
aligned_tf_data.append(filled_data)
aligned_data[tf] = aligned_tf_data
# 정렬 품질 평가 (더 현실적인 계산)
# 각 타임프레임별 예상 데이터 포인트 수 계산
if sorted_timestamps:
start_time = datetime.fromisoformat(sorted_timestamps[0])
end_time = datetime.fromisoformat(sorted_timestamps[-1])
total_minutes = (end_time - start_time).total_seconds() / 60
expected_quality = 0
for tf in multi_tf_data.keys():
tf_minutes = self.timeframe_minutes.get(tf, 1)
expected_points = max(1, int(total_minutes / tf_minutes))
actual_points = len(multi_tf_data[tf])
tf_quality = min(1.0, actual_points / expected_points) if expected_points > 0 else 1.0
expected_quality += tf_quality
alignment_quality = expected_quality / len(multi_tf_data) if multi_tf_data else 0
else:
alignment_quality = 0
return {
"synchronized_timestamps": sorted_timestamps,
"aligned_data": aligned_data,
"alignment_quality": alignment_quality
}
except Exception as e:
raise AnalysisError(f"Data synchronization failed: {str(e)}")
def get_performance_metrics(self) -> Dict[str, Any]:
"""성능 메트릭 조회"""
cache_total = self.performance_metrics["cache_hits"] + self.performance_metrics["cache_misses"]
cache_hit_rate = self.performance_metrics["cache_hits"] / cache_total if cache_total > 0 else 0
avg_processing_time = (self.performance_metrics["total_processing_time"] /
self.performance_metrics["analysis_count"]
if self.performance_metrics["analysis_count"] > 0 else 0)
return {
"analysis_count": self.performance_metrics["analysis_count"],
"average_processing_time": avg_processing_time,
"cache_hit_rate": cache_hit_rate
}
# === 내부 헬퍼 메서드들 ===
def _align_timestamp(self, timestamp: datetime, minutes: int) -> datetime:
"""타임스탬프를 타임프레임에 맞춰 정렬"""
# 분 단위로 내림
aligned_minute = (timestamp.minute // minutes) * minutes
return timestamp.replace(minute=aligned_minute, second=0, microsecond=0)
def _aggregate_candle(self, candles: List[Dict[str, Any]]) -> Dict[str, Any]:
"""캔들 데이터 집계"""
if not candles:
return {}
# OHLCV 집계
opens = [c.get("open", c.get("price", 0)) for c in candles]
highs = [c.get("high", c.get("price", 0)) for c in candles]
lows = [c.get("low", c.get("price", 0)) for c in candles]
closes = [c.get("close", c.get("price", 0)) for c in candles]
volumes = [c.get("volume", 0) for c in candles]
return {
"open": opens[0] if opens else 0,
"high": max(highs) if highs else 0,
"low": min(lows) if lows else 0,
"close": closes[-1] if closes else 0,
"volume": sum(volumes)
}
async def _analyze_trend_alignment(self, timeframe_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
"""트렌드 정렬 분석"""
trends = {}
for tf, data in timeframe_data.items():
if len(data) >= 20:
trend = await self._calculate_trend(data, 20)
trends[tf] = trend
# 정렬 점수 계산
if trends:
directions = [t["direction"] for t in trends.values()]
same_direction = sum(1 for d in directions if d == directions[0])
alignment_score = same_direction / len(directions)
else:
alignment_score = 0
return {
"alignment_score": alignment_score,
"timeframe_trends": trends
}
async def _analyze_momentum(self, timeframe_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
"""모멘텀 분석"""
momentum_data = {}
for tf, data in timeframe_data.items():
if len(data) >= 14:
momentum = await self._calculate_momentum(data, 14)
momentum_data[tf] = {
"current": momentum[-1]["value"] if momentum else 0,
"average": sum(m["value"] for m in momentum) / len(momentum) if momentum else 0
}
return momentum_data
async def _analyze_volume_profile(self, timeframe_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
"""볼륨 프로파일 분석"""
total_volume = 0
volume_weighted_price = 0
for tf, data in timeframe_data.items():
for candle in data:
volume = candle.get("volume", 0)
price = candle.get("close", candle.get("price", 0))
total_volume += volume
volume_weighted_price += price * volume
vwap = volume_weighted_price / total_volume if total_volume > 0 else 0
return {
"total_volume": total_volume,
"vwap": vwap
}
async def _calculate_trend(self, data: List[Dict[str, Any]], lookback: int) -> Dict[str, Any]:
"""트렌드 계산"""
if len(data) < lookback:
return {"direction": "neutral", "strength": 0}
# 단순 이동평균 기반 트렌드
recent_data = data[-lookback:]
prices = [d.get("close", d.get("price", 0)) for d in recent_data]
# 선형 회귀
n = len(prices)
if n < 2:
return {"direction": "neutral", "strength": 0}
x_sum = sum(range(n))
y_sum = sum(prices)
xy_sum = sum(i * prices[i] for i in range(n))
x2_sum = sum(i * i for i in range(n))
denominator = n * x2_sum - x_sum * x_sum
if denominator == 0:
return {"direction": "neutral", "strength": 0}
slope = (n * xy_sum - x_sum * y_sum) / denominator
# 방향 결정
if slope > 0.001:
direction = "up"
elif slope < -0.001:
direction = "down"
else:
direction = "neutral"
# 강도 계산 (R-squared)
y_mean = y_sum / n
ss_tot = sum((y - y_mean) ** 2 for y in prices)
if ss_tot > 0:
y_pred = [slope * i + (y_mean - slope * x_sum / n) for i in range(n)]
ss_res = sum((prices[i] - y_pred[i]) ** 2 for i in range(n))
r_squared = 1 - (ss_res / ss_tot)
strength = max(0, min(1, r_squared))
else:
strength = 0
return {"direction": direction, "strength": strength}
async def _calculate_rsi(self, data: List[Dict[str, Any]], period: int = 14) -> List[Dict[str, Any]]:
"""RSI 계산"""
if len(data) < period + 1:
return []
prices = [d.get("close", d.get("price", 0)) for d in data]
# 가격 변화 계산
changes = [prices[i] - prices[i-1] for i in range(1, len(prices))]
# 상승/하락 분리
gains = [c if c > 0 else 0 for c in changes]
losses = [-c if c < 0 else 0 for c in changes]
# 초기 평균
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
rsi_values = []
for i in range(period, len(changes)):
# 평활 이동평균
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
if avg_loss == 0:
rsi = 100
else:
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
rsi_values.append({
"timestamp": data[i+1]["timestamp"],
"value": rsi
})
return rsi_values
async def _calculate_momentum(self, data: List[Dict[str, Any]], period: int = 14) -> List[Dict[str, Any]]:
"""모멘텀 계산"""
if len(data) < period:
return []
momentum_values = []
for i in range(period, len(data)):
current_price = data[i].get("close", data[i].get("price", 0))
past_price = data[i-period].get("close", data[i-period].get("price", 0))
if past_price > 0:
momentum = ((current_price - past_price) / past_price) * 100
else:
momentum = 0
momentum_values.append({
"timestamp": data[i]["timestamp"],
"value": momentum
})
return momentum_values
def _find_peaks(self, data: List[Dict[str, Any]], value_key: str, is_valley: bool = False) -> List[Dict[str, Any]]:
"""피크/밸리 찾기"""
peaks = []
for i in range(1, len(data) - 1):
current = data[i].get(value_key, 0)
prev = data[i-1].get(value_key, 0)
next_val = data[i+1].get(value_key, 0)
if is_valley:
# 밸리 (저점)
if current < prev and current < next_val:
peaks.append({
"index": i,
"value": current,
"timestamp": data[i].get("timestamp")
})
else:
# 피크 (고점)
if current > prev and current > next_val:
peaks.append({
"index": i,
"value": current,
"timestamp": data[i].get("timestamp")
})
return peaks
async def _find_sr_levels(self, data: List[Dict[str, Any]], min_touches: int) -> List[Dict[str, Any]]:
"""지지/저항 레벨 찾기"""
levels = []
# 가격 범위
prices = [d.get("close", d.get("price", 0)) for d in data]
if not prices:
return levels
price_min = min(prices)
price_max = max(prices)
price_range = price_max - price_min
if price_range == 0:
return levels
# 가격 구간 생성 (1% 단위)
step = price_range * 0.01
price_levels = []
current_level = price_min
while current_level <= price_max:
price_levels.append(current_level)
current_level += step
# 각 레벨에서 터치 횟수 계산
for level in price_levels:
touches = 0
level_type = None
for i, candle in enumerate(data):
high = candle.get("high", candle.get("price", 0))
low = candle.get("low", candle.get("price", 0))
# 레벨 터치 확인 (1% 오차 허용)
if abs(high - level) / level < 0.01:
touches += 1
if not level_type:
level_type = "resistance"
elif abs(low - level) / level < 0.01:
touches += 1
if not level_type:
level_type = "support"
if touches >= min_touches and level_type:
levels.append({
"price": level,
"type": level_type,
"touches": touches,
"strength": min(touches / 10, 1.0)
})
return levels
def _calculate_returns(self, data: List[Dict[str, Any]]) -> List[float]:
"""수익률 계산"""
returns = []
for i in range(1, len(data)):
current = data[i].get("close", data[i].get("price", 0))
previous = data[i-1].get("close", data[i-1].get("price", 0))
if previous > 0:
ret = (current - previous) / previous
returns.append(ret)
return returns
def _calculate_correlation(self, returns1: List[float], returns2: List[float]) -> float:
"""상관관계 계산"""
if len(returns1) != len(returns2) or len(returns1) < 2:
return 0.0
# 평균
mean1 = sum(returns1) / len(returns1)
mean2 = sum(returns2) / len(returns2)
# 공분산
covariance = sum((r1 - mean1) * (r2 - mean2) for r1, r2 in zip(returns1, returns2)) / len(returns1)
# 표준편차
std1 = (sum((r - mean1) ** 2 for r in returns1) / len(returns1)) ** 0.5
std2 = (sum((r - mean2) ** 2 for r in returns2) / len(returns2)) ** 0.5
if std1 == 0 or std2 == 0:
return 0.0
# 상관계수
correlation = covariance / (std1 * std2)
return max(-1, min(1, correlation))
async def _calculate_trend_strength(self, data: List[Dict[str, Any]]) -> float:
"""트렌드 강도 계산"""
if len(data) < 20:
return 0.0
trend = await self._calculate_trend(data, min(20, len(data)))
return trend["strength"]
async def _calculate_momentum_strength(self, data: List[Dict[str, Any]]) -> float:
"""모멘텀 강도 계산"""
if len(data) < 14:
return 0.0
momentum = await self._calculate_momentum(data, 14)
if not momentum:
return 0.0
# 모멘텀의 절대값 평균
avg_momentum = sum(abs(m["value"]) for m in momentum) / len(momentum)
# 0-1 범위로 정규화
return min(avg_momentum / 10, 1.0)
async def _calculate_volume_strength(self, data: List[Dict[str, Any]]) -> float:
"""볼륨 강도 계산"""
if len(data) < 20:
return 0.0
volumes = [d.get("volume", 0) for d in data]
if not volumes:
return 0.0
# 최근 볼륨과 평균 볼륨 비교
recent_volume = sum(volumes[-5:]) / 5
avg_volume = sum(volumes) / len(volumes)
if avg_volume == 0:
return 0.0
volume_ratio = recent_volume / avg_volume
# 0-1 범위로 정규화
return min(volume_ratio, 2.0) / 2.0
async def _calculate_volatility(self, data: List[Dict[str, Any]]) -> float:
"""변동성 계산"""
if len(data) < 2:
return 0.0
returns = self._calculate_returns(data)
if not returns:
return 0.0
# 수익률의 표준편차
mean_return = sum(returns) / len(returns)
variance = sum((r - mean_return) ** 2 for r in returns) / len(returns)
volatility = variance ** 0.5
return volatility
async def _calculate_range_ratio(self, data: List[Dict[str, Any]]) -> float:
"""레인지 비율 계산"""
if len(data) < 20:
return 0.0
prices = [d.get("close", d.get("price", 0)) for d in data[-20:]]
if not prices:
return 0.0
price_range = max(prices) - min(prices)
avg_price = sum(prices) / len(prices)
if avg_price == 0:
return 0.0
# 레인지가 작을수록 높은 비율
range_ratio = 1 - (price_range / avg_price)
return max(0, min(1, range_ratio))
async def _generate_simple_signals(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""간단한 거래 신호 생성"""
signals = []
if len(data) < 20:
return signals
# 이동평균 교차 전략
for i in range(20, len(data)):
short_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-5:i]) / 5
long_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-20:i]) / 20
prev_short_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-6:i-1]) / 5
prev_long_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-21:i-1]) / 20
if prev_short_ma <= prev_long_ma and short_ma > long_ma:
signal_type = "buy"
elif prev_short_ma >= prev_long_ma and short_ma < long_ma:
signal_type = "sell"
else:
signal_type = "hold"
signals.append({
"index": i,
"type": signal_type,
"timestamp": data[i]["timestamp"]
})
return signals
async def _generate_strategy_signals(self, data: List[Dict[str, Any]], strategy_type: str) -> List[Dict[str, Any]]:
"""전략별 신호 생성"""
if strategy_type == "momentum":
# 모멘텀 전략
momentum = await self._calculate_momentum(data, 14)
signals = []
for i, mom in enumerate(momentum):
if mom["value"] > 2: # 2% 이상 상승
signal_type = "buy"
elif mom["value"] < -2: # 2% 이상 하락
signal_type = "sell"
else:
signal_type = "hold"
signals.append({
"index": i + 14, # 모멘텀 계산에 사용된 기간 보정
"type": signal_type,
"timestamp": mom["timestamp"]
})
return signals
else:
# 기본 이동평균 전략
return await self._generate_simple_signals(data)
def _find_corresponding_index(self, primary_index: int, primary_tf: str,
target_tf: str, target_length: int) -> int:
"""대응하는 타임프레임 인덱스 찾기"""
if primary_tf not in self.timeframe_minutes or target_tf not in self.timeframe_minutes:
return 0
ratio = self.timeframe_minutes[target_tf] / self.timeframe_minutes[primary_tf]
target_index = int(primary_index / ratio)
return min(target_index, target_length - 1)