Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
multi_timeframe_analyzer.py66.2 kB
"""멀티 타임프레임 분석기""" import asyncio import statistics from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple from src.exceptions import TimeframeError, AnalysisError, InsufficientDataError class MultiTimeframeAnalyzer: """멀티 타임프레임 분석기 클래스""" def __init__(self, config: Dict[str, Any]): """ Args: config: 분석기 설정 딕셔너리 """ self.config = config self.timeframes = config.get("timeframes", ["1m", "5m", "15m", "1h", "4h", "1d"]) self.aggregation_methods = config.get("aggregation_methods", {}) self.analysis_types = config.get("analysis_types", []) self.correlation_thresholds = config.get("correlation_thresholds", {}) # 타임프레임 변환 테이블 (분 단위) self.timeframe_minutes = { "1m": 1, "5m": 5, "15m": 15, "30m": 30, "1h": 60, "4h": 240, "1d": 1440, "1w": 10080 } # 캐시 self.cache = {} # 성능 메트릭 self.performance_metrics = { "analysis_count": 0, "total_processing_time": 0.0, "cache_hits": 0, "cache_misses": 0 } async def aggregate_to_timeframe(self, data: List[Dict[str, Any]], target_timeframe: str) -> List[Dict[str, Any]]: """데이터를 목표 타임프레임으로 집계""" if target_timeframe not in self.timeframe_minutes: raise TimeframeError(f"Invalid timeframe: {target_timeframe}") if not data: raise InsufficientDataError("No data to aggregate") try: # 타임프레임 분 단위 변환 target_minutes = self.timeframe_minutes[target_timeframe] # 데이터를 시간순으로 정렬 sorted_data = sorted(data, key=lambda x: x["timestamp"]) # 집계된 데이터 aggregated = [] current_bucket = [] bucket_start = None for item in sorted_data: timestamp = datetime.fromisoformat(item["timestamp"]) if bucket_start is None: # 첫 번째 버킷 시작 bucket_start = self._align_timestamp(timestamp, target_minutes) current_bucket = [item] else: # 현재 버킷의 종료 시간 bucket_end = bucket_start + timedelta(minutes=target_minutes) if timestamp < bucket_end: # 같은 버킷에 추가 current_bucket.append(item) else: # 현재 버킷 집계 및 저장 if current_bucket: aggregated_candle = self._aggregate_candle(current_bucket) aggregated_candle["timestamp"] = bucket_start.isoformat() aggregated.append(aggregated_candle) # 새 버킷 시작 bucket_start = self._align_timestamp(timestamp, target_minutes) current_bucket = [item] # 마지막 버킷 처리 if current_bucket: aggregated_candle = self._aggregate_candle(current_bucket) aggregated_candle["timestamp"] = bucket_start.isoformat() aggregated.append(aggregated_candle) return aggregated except Exception as e: raise AnalysisError(f"Aggregation failed: {str(e)}") async def analyze_timeframes(self, data: List[Dict[str, Any]], timeframes: List[str]) -> Dict[str, Any]: """여러 타임프레임 분석""" if not data: raise InsufficientDataError("No data to analyze") # 멀티 타임프레임 분석을 위한 최소 데이터 요구사항 min_required_points = max([self.timeframe_minutes.get(tf, 1) for tf in timeframes]) * 2 if len(data) < min_required_points: raise InsufficientDataError(f"Insufficient data for multi-timeframe analysis. Need at least {min_required_points} data points, got {len(data)}") try: self.performance_metrics["analysis_count"] += 1 start_time = datetime.now() # 각 타임프레임별 데이터 집계 timeframe_data = {} for tf in timeframes: if tf == "1m": # 1분봉은 원본 데이터 사용 timeframe_data[tf] = data else: # 다른 타임프레임은 집계 timeframe_data[tf] = await self.aggregate_to_timeframe(data, tf) # 트렌드 정렬 분석 trend_alignment = await self._analyze_trend_alignment(timeframe_data) # 모멘텀 분석 momentum_analysis = await self._analyze_momentum(timeframe_data) # 볼륨 프로파일 분석 volume_profile = await self._analyze_volume_profile(timeframe_data) # 처리 시간 기록 processing_time = (datetime.now() - start_time).total_seconds() self.performance_metrics["total_processing_time"] += processing_time return { "timeframe_data": timeframe_data, "trend_alignment": trend_alignment, "momentum_analysis": momentum_analysis, "volume_profile": volume_profile } except Exception as e: raise AnalysisError(f"Multi-timeframe analysis failed: {str(e)}") async def detect_trend_alignment(self, data: List[Dict[str, Any]], timeframes: List[str], lookback_periods: Dict[str, int] = None) -> Dict[str, Any]: """트렌드 정렬 감지""" if data is None: raise AnalysisError("Invalid data for trend analysis") if not data: raise InsufficientDataError("No data for trend analysis") try: # 기본 룩백 기간 설정 if lookback_periods is None: lookback_periods = {tf: 20 for tf in timeframes} # 각 타임프레임별 트렌드 분석 timeframe_trends = {} for tf in timeframes: # 타임프레임별 데이터 준비 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 트렌드 계산 lookback = lookback_periods.get(tf, 20) trend = await self._calculate_trend(tf_data, lookback) timeframe_trends[tf] = trend # 정렬 여부 확인 directions = [t["direction"] for t in timeframe_trends.values()] unique_directions = set(directions) aligned = len(unique_directions) == 1 and "neutral" not in unique_directions # 전체 방향 및 강도 계산 if aligned: direction = directions[0] strengths = [t["strength"] for t in timeframe_trends.values()] strength = sum(strengths) / len(strengths) else: # 가장 많은 방향 direction_counts = {} for d in directions: direction_counts[d] = direction_counts.get(d, 0) + 1 direction = max(direction_counts, key=direction_counts.get) # 평균 강도 strengths = [t["strength"] for t in timeframe_trends.values()] strength = sum(strengths) / len(strengths) * 0.5 # 정렬되지 않았으므로 감소 return { "aligned": aligned, "direction": direction, "strength": strength, "timeframe_trends": timeframe_trends } except Exception as e: raise AnalysisError(f"Trend alignment detection failed: {str(e)}") async def analyze_momentum_divergence(self, data: List[Dict[str, Any]], timeframes: List[str], momentum_indicator: str = "rsi") -> Dict[str, Any]: """모멘텀 다이버전스 분석""" try: divergences = [] momentum_values = {} for tf in timeframes: # 타임프레임별 데이터 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 모멘텀 계산 if momentum_indicator == "rsi": momentum = await self._calculate_rsi(tf_data, period=14) else: momentum = await self._calculate_momentum(tf_data, period=14) momentum_values[tf] = momentum # 다이버전스 검색 price_highs = self._find_peaks(tf_data, "high") price_lows = self._find_peaks(tf_data, "low", is_valley=True) momentum_highs = self._find_peaks(momentum, "value") momentum_lows = self._find_peaks(momentum, "value", is_valley=True) # Bearish divergence: 가격은 상승하지만 모멘텀은 하락 for i in range(1, len(price_highs)): if (price_highs[i]["value"] > price_highs[i-1]["value"] and i < len(momentum_highs) and i-1 < len(momentum_highs) and momentum_highs[i]["value"] < momentum_highs[i-1]["value"]): divergences.append({ "type": "bearish_divergence", "timeframe": tf, "index": price_highs[i]["index"], "strength": abs(momentum_highs[i]["value"] - momentum_highs[i-1]["value"]) / 100 }) # Bullish divergence: 가격은 하락하지만 모멘텀은 상승 for i in range(1, len(price_lows)): if (price_lows[i]["value"] < price_lows[i-1]["value"] and i < len(momentum_lows) and i-1 < len(momentum_lows) and momentum_lows[i]["value"] > momentum_lows[i-1]["value"]): divergences.append({ "type": "bullish_divergence", "timeframe": tf, "index": price_lows[i]["index"], "strength": abs(momentum_lows[i]["value"] - momentum_lows[i-1]["value"]) / 100 }) # 다이버전스 강도 계산 total_strength = sum(d["strength"] for d in divergences) if divergences else 0 return { "divergences": divergences, "momentum_values": momentum_values, "divergence_strength": min(total_strength, 1.0) } except Exception as e: raise AnalysisError(f"Momentum divergence analysis failed: {str(e)}") async def analyze_volume_profile(self, data: List[Dict[str, Any]], timeframes: List[str], profile_bins: int = 20) -> Dict[str, Any]: """볼륨 프로파일 분석""" try: all_prices = [] all_volumes = [] # 모든 타임프레임 데이터 수집 for tf in timeframes: if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) for candle in tf_data: if "close" in candle and "volume" in candle: all_prices.append(candle["close"]) all_volumes.append(candle["volume"]) if not all_prices: raise AnalysisError("No price data available") # 가격 범위 계산 price_min = min(all_prices) price_max = max(all_prices) price_range = price_max - price_min if price_range == 0: raise AnalysisError("Price range is zero") # 가격 구간별 볼륨 집계 bin_size = price_range / profile_bins volume_distribution = [0] * profile_bins price_levels = [] for i in range(profile_bins): price_level = price_min + i * bin_size price_levels.append(price_level) # 볼륨 분배 for price, volume in zip(all_prices, all_volumes): bin_index = min(int((price - price_min) / bin_size), profile_bins - 1) volume_distribution[bin_index] += volume # POC (Point of Control) 찾기 max_volume_index = volume_distribution.index(max(volume_distribution)) poc_price = price_levels[max_volume_index] poc_volume = volume_distribution[max_volume_index] # 가치 영역 계산 (70% 볼륨) total_volume = sum(volume_distribution) value_area_volume = total_volume * 0.7 # 중심에서 확장하며 가치 영역 찾기 accumulated_volume = poc_volume low_index = high_index = max_volume_index while accumulated_volume < value_area_volume: # 위아래 중 볼륨이 큰 쪽으로 확장 expand_up = high_index < profile_bins - 1 expand_down = low_index > 0 if expand_up and expand_down: if volume_distribution[high_index + 1] > volume_distribution[low_index - 1]: high_index += 1 accumulated_volume += volume_distribution[high_index] else: low_index -= 1 accumulated_volume += volume_distribution[low_index] elif expand_up: high_index += 1 accumulated_volume += volume_distribution[high_index] elif expand_down: low_index -= 1 accumulated_volume += volume_distribution[low_index] else: break value_area_low = price_levels[low_index] value_area_high = price_levels[high_index] return { "price_levels": price_levels, "volume_distribution": volume_distribution, "poc": { "price": poc_price, "volume": poc_volume }, "value_area": { "high": value_area_high, "low": value_area_low, "volume_percentage": accumulated_volume / total_volume if total_volume > 0 else 0 } } except Exception as e: raise AnalysisError(f"Volume profile analysis failed: {str(e)}") async def find_support_resistance_levels(self, data: List[Dict[str, Any]], timeframes: List[str], min_touches: int = 2) -> Dict[str, Any]: """지지/저항 레벨 찾기""" try: all_levels = [] timeframe_levels = {} for tf in timeframes: # 타임프레임별 데이터 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 지지/저항 레벨 찾기 tf_levels = await self._find_sr_levels(tf_data, min_touches) timeframe_levels[tf] = tf_levels # 전체 레벨에 추가 for level in tf_levels: # 비슷한 레벨이 이미 있는지 확인 merged = False for existing in all_levels: if abs(existing["price"] - level["price"]) / level["price"] < 0.01: # 1% 이내 # 기존 레벨과 병합 existing["strength"] = max(existing["strength"], level["strength"]) existing["timeframes"].append(tf) merged = True break if not merged: all_levels.append({ "price": level["price"], "type": level["type"], "strength": level["strength"], "timeframes": [tf] }) # 강도순으로 정렬 all_levels.sort(key=lambda x: x["strength"], reverse=True) # 컨플루언스 존 찾기 confluence_zones = [] for level in all_levels: if len(level["timeframes"]) >= 2: confluence_zones.append({ "price": level["price"], "timeframe_count": len(level["timeframes"]), "strength": level["strength"] * len(level["timeframes"]) }) return { "levels": all_levels, "timeframe_levels": timeframe_levels, "confluence_zones": confluence_zones } except Exception as e: raise AnalysisError(f"Support/resistance level detection failed: {str(e)}") async def calculate_correlation_matrix(self, multi_symbol_data: Dict[str, List[Dict[str, Any]]], timeframe: str, correlation_window: int) -> Dict[str, Any]: """상관관계 매트릭스 계산""" try: symbols = list(multi_symbol_data.keys()) n_symbols = len(symbols) # 상관관계 매트릭스 초기화 matrix = [[0.0 for _ in range(n_symbols)] for _ in range(n_symbols)] # 각 심볼 쌍에 대해 상관관계 계산 for i in range(n_symbols): for j in range(n_symbols): if i == j: matrix[i][j] = 1.0 else: # 두 심볼의 수익률 계산 returns_i = self._calculate_returns(multi_symbol_data[symbols[i]]) returns_j = self._calculate_returns(multi_symbol_data[symbols[j]]) # 상관관계 계산 if len(returns_i) >= correlation_window and len(returns_j) >= correlation_window: corr = self._calculate_correlation( returns_i[-correlation_window:], returns_j[-correlation_window:] ) matrix[i][j] = corr else: matrix[i][j] = 0.0 # 유의미한 상관관계 찾기 significant_correlations = [] for i in range(n_symbols): for j in range(i+1, n_symbols): corr = matrix[i][j] if abs(corr) >= self.correlation_thresholds.get("moderate", 0.6): significant_correlations.append({ "symbol1": symbols[i], "symbol2": symbols[j], "correlation": corr, "strength": "strong" if abs(corr) >= self.correlation_thresholds.get("strong", 0.8) else "moderate" }) return { "matrix": matrix, "symbols": symbols, "significant_correlations": significant_correlations } except Exception as e: raise AnalysisError(f"Correlation matrix calculation failed: {str(e)}") async def rank_timeframe_strength(self, data: List[Dict[str, Any]], timeframes: List[str], criteria: List[str]) -> Dict[str, Any]: """타임프레임 강도 순위""" try: scores = {} rankings = [] for tf in timeframes: # 타임프레임별 데이터 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 각 기준별 점수 계산 breakdown = {} total_score = 0 if "trend" in criteria: trend_score = await self._calculate_trend_strength(tf_data) breakdown["trend"] = trend_score total_score += trend_score if "momentum" in criteria: momentum_score = await self._calculate_momentum_strength(tf_data) breakdown["momentum"] = momentum_score total_score += momentum_score if "volume" in criteria: volume_score = await self._calculate_volume_strength(tf_data) breakdown["volume"] = volume_score total_score += volume_score # 평균 점수 avg_score = total_score / len(criteria) scores[tf] = avg_score rankings.append({ "timeframe": tf, "score": avg_score, "breakdown": breakdown }) # 점수순으로 정렬 rankings.sort(key=lambda x: x["score"], reverse=True) return { "rankings": rankings, "scores": scores, "strongest_timeframe": rankings[0]["timeframe"] if rankings else None } except Exception as e: raise AnalysisError(f"Timeframe strength ranking failed: {str(e)}") async def identify_market_regime(self, data: List[Dict[str, Any]], timeframes: List[str]) -> Dict[str, Any]: """시장 체제 식별""" try: regime_votes = {"trending": 0, "ranging": 0, "volatile": 0, "quiet": 0} timeframe_regimes = {} characteristics = {} for tf in timeframes: # 타임프레임별 데이터 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 체제 특성 분석 volatility = await self._calculate_volatility(tf_data) trend_strength = await self._calculate_trend_strength(tf_data) range_ratio = await self._calculate_range_ratio(tf_data) # 체제 결정 if trend_strength > 0.7: regime = "trending" elif range_ratio > 0.8: regime = "ranging" elif volatility > 0.02: # 2% 이상 regime = "volatile" else: regime = "quiet" regime_votes[regime] += 1 timeframe_regimes[tf] = regime # 전체 체제 결정 overall_regime = max(regime_votes, key=regime_votes.get) confidence = regime_votes[overall_regime] / len(timeframes) # 특성 계산 all_volatilities = [] all_trend_strengths = [] for tf in timeframes: if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) all_volatilities.append(await self._calculate_volatility(tf_data)) all_trend_strengths.append(await self._calculate_trend_strength(tf_data)) characteristics = { "volatility": sum(all_volatilities) / len(all_volatilities), "trend_strength": sum(all_trend_strengths) / len(all_trend_strengths), "volume_profile": "normal" # 단순화 } return { "regime": overall_regime, "confidence": confidence, "characteristics": characteristics, "timeframe_regimes": timeframe_regimes } except Exception as e: raise AnalysisError(f"Market regime identification failed: {str(e)}") async def detect_divergence_confluence(self, data: List[Dict[str, Any]], timeframes: List[str], indicators: List[str]) -> Dict[str, Any]: """다이버전스 컨플루언스 감지""" try: all_divergences = [] # 각 타임프레임과 지표에 대해 다이버전스 분석 for tf in timeframes: for indicator in indicators: divergence_result = await self.analyze_momentum_divergence( data, [tf], indicator ) for div in divergence_result["divergences"]: div["indicator"] = indicator all_divergences.append(div) # 컨플루언스 존 찾기 confluence_zones = [] processed_indices = set() for i, div1 in enumerate(all_divergences): if i in processed_indices: continue # 근처의 다이버전스 찾기 zone_divergences = [div1] zone_indices = {i} for j, div2 in enumerate(all_divergences[i+1:], i+1): if abs(div1["index"] - div2["index"]) <= 5: # 5 캔들 이내 zone_divergences.append(div2) zone_indices.add(j) if len(zone_divergences) >= 2: # 컨플루언스 존 생성 processed_indices.update(zone_indices) timeframes_involved = list(set(d["timeframe"] for d in zone_divergences)) indicators_involved = list(set(d["indicator"] for d in zone_divergences)) confluence_zones.append({ "start_index": min(d["index"] for d in zone_divergences), "end_index": max(d["index"] for d in zone_divergences), "timeframes_involved": timeframes_involved, "indicators_involved": indicators_involved, "strength": sum(d["strength"] for d in zone_divergences) / len(zone_divergences) }) # 강도 분포 strength_distribution = { "strong": len([z for z in confluence_zones if z["strength"] > 0.7]), "moderate": len([z for z in confluence_zones if 0.3 <= z["strength"] <= 0.7]), "weak": len([z for z in confluence_zones if z["strength"] < 0.3]) } return { "confluence_zones": confluence_zones, "divergence_count": len(all_divergences), "strength_distribution": strength_distribution } except Exception as e: raise AnalysisError(f"Divergence confluence detection failed: {str(e)}") async def select_optimal_timeframe(self, data: List[Dict[str, Any]], trading_style: str, market_conditions: Dict[str, Any]) -> Dict[str, Any]: """최적 타임프레임 선택""" try: # 거래 스타일별 선호 타임프레임 style_preferences = { "scalping": ["1m", "5m"], "day_trading": ["5m", "15m", "1h"], "swing_trading": ["1h", "4h", "1d"] } preferred_timeframes = style_preferences.get(trading_style, ["15m", "1h"]) # 각 타임프레임 평가 scores = {} for tf in preferred_timeframes: score = 0.0 # 시장 조건에 따른 점수 조정 if market_conditions.get("volatility") == "high": # 높은 변동성에서는 짧은 타임프레임 선호 if tf in ["1m", "5m"]: score += 0.3 elif market_conditions.get("volatility") == "low": # 낮은 변동성에서는 긴 타임프레임 선호 if tf in ["1h", "4h", "1d"]: score += 0.3 if market_conditions.get("trend") == "strong": # 강한 트렌드에서는 중간 타임프레임 선호 if tf in ["15m", "1h"]: score += 0.4 # 타임프레임별 기본 점수 base_scores = { "1m": 0.5, "5m": 0.6, "15m": 0.7, "1h": 0.8, "4h": 0.7, "1d": 0.6 } score += base_scores.get(tf, 0.5) scores[tf] = min(score, 1.0) # 최적 타임프레임 선택 optimal_tf = max(scores, key=scores.get) # 대안 타임프레임 alternatives = [] for tf, score in scores.items(): if tf != optimal_tf: alternatives.append({"timeframe": tf, "score": score}) alternatives.sort(key=lambda x: x["score"], reverse=True) # 추천 이유 생성 reasoning = [] if trading_style == "scalping": reasoning.append("짧은 타임프레임이 빠른 진입/청산에 적합") elif trading_style == "swing_trading": reasoning.append("긴 타임프레임이 큰 추세 포착에 유리") if market_conditions.get("volatility") == "high": reasoning.append("높은 변동성으로 인해 짧은 타임프레임 권장") return { "recommended_timeframe": optimal_tf, "reasoning": reasoning, "alternative_timeframes": alternatives, "confidence_score": scores[optimal_tf] } except Exception as e: raise AnalysisError(f"Optimal timeframe selection failed: {str(e)}") async def analyze_fractals(self, data: List[Dict[str, Any]], timeframes: List[str], fractal_period: int = 5) -> Dict[str, Any]: """프랙탈 분석""" try: fractals = {} for tf in timeframes: # 타임프레임별 데이터 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 프랙탈 찾기 tf_fractals = [] for i in range(fractal_period//2, len(tf_data) - fractal_period//2): # Up fractal is_up_fractal = True center_high = tf_data[i].get("high", tf_data[i].get("price", 0)) for j in range(i - fractal_period//2, i + fractal_period//2 + 1): if j != i: compare_high = tf_data[j].get("high", tf_data[j].get("price", 0)) if compare_high >= center_high: is_up_fractal = False break if is_up_fractal: tf_fractals.append({ "type": "up", "index": i, "price": center_high, "strength": 0.8 }) # Down fractal is_down_fractal = True center_low = tf_data[i].get("low", tf_data[i].get("price", 0)) for j in range(i - fractal_period//2, i + fractal_period//2 + 1): if j != i: compare_low = tf_data[j].get("low", tf_data[j].get("price", 0)) if compare_low <= center_low: is_down_fractal = False break if is_down_fractal: tf_fractals.append({ "type": "down", "index": i, "price": center_low, "strength": 0.8 }) fractals[tf] = tf_fractals # 프랙탈 차원 계산 (간단화) fractal_dimensions = {} for tf, tf_fractals in fractals.items(): if len(tf_fractals) > 1: # 프랙탈 간격의 변동성으로 차원 추정 intervals = [] for i in range(1, len(tf_fractals)): intervals.append(tf_fractals[i]["index"] - tf_fractals[i-1]["index"]) if intervals: avg_interval = sum(intervals) / len(intervals) std_interval = statistics.stdev(intervals) if len(intervals) > 1 else 0 dimension = 1 + (std_interval / avg_interval if avg_interval > 0 else 0) fractal_dimensions[tf] = min(dimension, 2.0) else: fractal_dimensions[tf] = 1.0 else: fractal_dimensions[tf] = 1.0 # 자기 유사성 점수 self_similarity_score = 0.7 # 단순화된 점수 return { "fractals": fractals, "fractal_dimensions": fractal_dimensions, "self_similarity_score": self_similarity_score } except Exception as e: raise AnalysisError(f"Fractal analysis failed: {str(e)}") async def analyze_timeframe_transitions(self, data: List[Dict[str, Any]], from_timeframe: str, to_timeframe: str) -> Dict[str, Any]: """타임프레임 전환 분석""" try: # 두 타임프레임 데이터 준비 if from_timeframe == "1m": from_data = data else: from_data = await self.aggregate_to_timeframe(data, from_timeframe) if to_timeframe == "1m": to_data = data else: to_data = await self.aggregate_to_timeframe(data, to_timeframe) # 전환점 찾기 transition_points = [] confirmed_count = 0 # 단순화된 신호 생성 from_signals = await self._generate_simple_signals(from_data) to_signals = await self._generate_simple_signals(to_data) # 타임프레임 비율 계산 tf_ratio = self.timeframe_minutes[to_timeframe] / self.timeframe_minutes[from_timeframe] for i, signal in enumerate(from_signals): if signal["type"] != "hold": # 대응하는 상위 타임프레임 인덱스 to_index = int(i / tf_ratio) if to_index < len(to_signals): # 신호 확인 confirmed = signal["type"] == to_signals[to_index]["type"] transition_points.append({ f"index_{from_timeframe}": i, f"index_{to_timeframe}": to_index, "signal_type": signal["type"], "confirmed": confirmed }) if confirmed: confirmed_count += 1 # 신호 품질 및 확인률 계산 total_signals = len(transition_points) confirmation_rate = confirmed_count / total_signals if total_signals > 0 else 0 signal_quality = confirmation_rate * 0.8 + 0.2 # 기본 품질 20% return { "transition_points": transition_points, "signal_quality": signal_quality, "confirmation_rate": confirmation_rate } except Exception as e: raise AnalysisError(f"Timeframe transition analysis failed: {str(e)}") async def analyze_volatility_profile(self, data: List[Dict[str, Any]], timeframes: List[str], volatility_window: int = 20) -> Dict[str, Any]: """변동성 프로파일 분석""" try: volatility_by_timeframe = {} for tf in timeframes: # 타임프레임별 데이터 if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) # 변동성 계산 volatilities = [] for i in range(volatility_window, len(tf_data)): window_data = tf_data[i-volatility_window:i] vol = await self._calculate_volatility(window_data) volatilities.append(vol) if volatilities: current_vol = volatilities[-1] avg_vol = sum(volatilities) / len(volatilities) # 백분위수 계산 sorted_vols = sorted(volatilities) percentile_index = next(i for i, v in enumerate(sorted_vols) if v >= current_vol) percentile = (percentile_index / len(sorted_vols)) * 100 volatility_by_timeframe[tf] = { "average": avg_vol, "current": current_vol, "percentile": percentile } # 변동성 비율 계산 volatility_ratio = {} tf_list = list(volatility_by_timeframe.keys()) for i in range(len(tf_list) - 1): tf1, tf2 = tf_list[i], tf_list[i+1] if tf1 in volatility_by_timeframe and tf2 in volatility_by_timeframe: ratio = volatility_by_timeframe[tf1]["current"] / volatility_by_timeframe[tf2]["current"] volatility_ratio[f"{tf1}/{tf2}"] = ratio # 확장/수축 판단 avg_current = sum(v["current"] for v in volatility_by_timeframe.values()) / len(volatility_by_timeframe) avg_average = sum(v["average"] for v in volatility_by_timeframe.values()) / len(volatility_by_timeframe) if avg_current > avg_average * 1.2: expansion_contraction = "expansion" elif avg_current < avg_average * 0.8: expansion_contraction = "contraction" else: expansion_contraction = "normal" return { "volatility_by_timeframe": volatility_by_timeframe, "volatility_ratio": volatility_ratio, "expansion_contraction": expansion_contraction } except Exception as e: raise AnalysisError(f"Volatility profile analysis failed: {str(e)}") async def optimize_entry_exit_points(self, data: List[Dict[str, Any]], primary_timeframe: str, confirmation_timeframes: List[str], strategy_type: str) -> Dict[str, Any]: """진입/청산 포인트 최적화""" try: # 주 타임프레임 신호 if primary_timeframe == "1m": primary_data = data else: primary_data = await self.aggregate_to_timeframe(data, primary_timeframe) primary_signals = await self._generate_strategy_signals(primary_data, strategy_type) # 확인 타임프레임 신호 confirmation_signals = {} for tf in confirmation_timeframes: if tf == "1m": tf_data = data else: tf_data = await self.aggregate_to_timeframe(data, tf) confirmation_signals[tf] = await self._generate_strategy_signals(tf_data, strategy_type) # 최적화된 진입/청산 포인트 entry_points = [] exit_points = [] for i, signal in enumerate(primary_signals): if signal["type"] == "buy": # 확인 신호 체크 confirmations = [] for tf, tf_signals in confirmation_signals.items(): # 대응하는 인덱스 찾기 tf_index = self._find_corresponding_index(i, primary_timeframe, tf, len(tf_signals)) if tf_index < len(tf_signals) and tf_signals[tf_index]["type"] == "buy": confirmations.append(tf) if confirmations: entry_points.append({ "index": i, "price": primary_data[i].get("close", primary_data[i].get("price", 0)), "confidence": len(confirmations) / len(confirmation_timeframes), "confirmations": confirmations }) elif signal["type"] == "sell" and entry_points: # 청산 포인트 exit_points.append({ "index": i, "price": primary_data[i].get("close", primary_data[i].get("price", 0)), "reason": "signal" }) # 리스크/리워드 비율 계산 risk_reward_ratios = [] for entry, exit in zip(entry_points[:len(exit_points)], exit_points): reward = exit["price"] - entry["price"] risk = entry["price"] * 0.02 # 2% 손절 if risk > 0: risk_reward_ratios.append(abs(reward / risk)) avg_rr = sum(risk_reward_ratios) / len(risk_reward_ratios) if risk_reward_ratios else 0 # 승률 추정 profitable_trades = sum(1 for rr in risk_reward_ratios if rr > 0) win_rate = profitable_trades / len(risk_reward_ratios) if risk_reward_ratios else 0 return { "entry_points": entry_points, "exit_points": exit_points, "risk_reward_ratio": avg_rr, "win_rate_estimate": win_rate } except Exception as e: raise AnalysisError(f"Entry/exit optimization failed: {str(e)}") async def synchronize_timeframe_data(self, multi_tf_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: """타임프레임 데이터 동기화""" try: # 모든 타임스탬프 수집 all_timestamps = set() for tf_data in multi_tf_data.values(): for item in tf_data: all_timestamps.add(item["timestamp"]) # 타임스탬프 정렬 sorted_timestamps = sorted(all_timestamps) # 동기화된 데이터 aligned_data = {} for tf, tf_data in multi_tf_data.items(): # 타임스탬프를 키로 하는 딕셔너리 생성 tf_dict = {item["timestamp"]: item for item in tf_data} # 정렬된 타임스탬프에 맞춰 데이터 정렬 aligned_tf_data = [] for ts in sorted_timestamps: if ts in tf_dict: aligned_tf_data.append(tf_dict[ts]) else: # 누락된 데이터는 이전 값으로 채우기 if aligned_tf_data: filled_data = aligned_tf_data[-1].copy() filled_data["timestamp"] = ts aligned_tf_data.append(filled_data) aligned_data[tf] = aligned_tf_data # 정렬 품질 평가 (더 현실적인 계산) # 각 타임프레임별 예상 데이터 포인트 수 계산 if sorted_timestamps: start_time = datetime.fromisoformat(sorted_timestamps[0]) end_time = datetime.fromisoformat(sorted_timestamps[-1]) total_minutes = (end_time - start_time).total_seconds() / 60 expected_quality = 0 for tf in multi_tf_data.keys(): tf_minutes = self.timeframe_minutes.get(tf, 1) expected_points = max(1, int(total_minutes / tf_minutes)) actual_points = len(multi_tf_data[tf]) tf_quality = min(1.0, actual_points / expected_points) if expected_points > 0 else 1.0 expected_quality += tf_quality alignment_quality = expected_quality / len(multi_tf_data) if multi_tf_data else 0 else: alignment_quality = 0 return { "synchronized_timestamps": sorted_timestamps, "aligned_data": aligned_data, "alignment_quality": alignment_quality } except Exception as e: raise AnalysisError(f"Data synchronization failed: {str(e)}") def get_performance_metrics(self) -> Dict[str, Any]: """성능 메트릭 조회""" cache_total = self.performance_metrics["cache_hits"] + self.performance_metrics["cache_misses"] cache_hit_rate = self.performance_metrics["cache_hits"] / cache_total if cache_total > 0 else 0 avg_processing_time = (self.performance_metrics["total_processing_time"] / self.performance_metrics["analysis_count"] if self.performance_metrics["analysis_count"] > 0 else 0) return { "analysis_count": self.performance_metrics["analysis_count"], "average_processing_time": avg_processing_time, "cache_hit_rate": cache_hit_rate } # === 내부 헬퍼 메서드들 === def _align_timestamp(self, timestamp: datetime, minutes: int) -> datetime: """타임스탬프를 타임프레임에 맞춰 정렬""" # 분 단위로 내림 aligned_minute = (timestamp.minute // minutes) * minutes return timestamp.replace(minute=aligned_minute, second=0, microsecond=0) def _aggregate_candle(self, candles: List[Dict[str, Any]]) -> Dict[str, Any]: """캔들 데이터 집계""" if not candles: return {} # OHLCV 집계 opens = [c.get("open", c.get("price", 0)) for c in candles] highs = [c.get("high", c.get("price", 0)) for c in candles] lows = [c.get("low", c.get("price", 0)) for c in candles] closes = [c.get("close", c.get("price", 0)) for c in candles] volumes = [c.get("volume", 0) for c in candles] return { "open": opens[0] if opens else 0, "high": max(highs) if highs else 0, "low": min(lows) if lows else 0, "close": closes[-1] if closes else 0, "volume": sum(volumes) } async def _analyze_trend_alignment(self, timeframe_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: """트렌드 정렬 분석""" trends = {} for tf, data in timeframe_data.items(): if len(data) >= 20: trend = await self._calculate_trend(data, 20) trends[tf] = trend # 정렬 점수 계산 if trends: directions = [t["direction"] for t in trends.values()] same_direction = sum(1 for d in directions if d == directions[0]) alignment_score = same_direction / len(directions) else: alignment_score = 0 return { "alignment_score": alignment_score, "timeframe_trends": trends } async def _analyze_momentum(self, timeframe_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: """모멘텀 분석""" momentum_data = {} for tf, data in timeframe_data.items(): if len(data) >= 14: momentum = await self._calculate_momentum(data, 14) momentum_data[tf] = { "current": momentum[-1]["value"] if momentum else 0, "average": sum(m["value"] for m in momentum) / len(momentum) if momentum else 0 } return momentum_data async def _analyze_volume_profile(self, timeframe_data: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]: """볼륨 프로파일 분석""" total_volume = 0 volume_weighted_price = 0 for tf, data in timeframe_data.items(): for candle in data: volume = candle.get("volume", 0) price = candle.get("close", candle.get("price", 0)) total_volume += volume volume_weighted_price += price * volume vwap = volume_weighted_price / total_volume if total_volume > 0 else 0 return { "total_volume": total_volume, "vwap": vwap } async def _calculate_trend(self, data: List[Dict[str, Any]], lookback: int) -> Dict[str, Any]: """트렌드 계산""" if len(data) < lookback: return {"direction": "neutral", "strength": 0} # 단순 이동평균 기반 트렌드 recent_data = data[-lookback:] prices = [d.get("close", d.get("price", 0)) for d in recent_data] # 선형 회귀 n = len(prices) if n < 2: return {"direction": "neutral", "strength": 0} x_sum = sum(range(n)) y_sum = sum(prices) xy_sum = sum(i * prices[i] for i in range(n)) x2_sum = sum(i * i for i in range(n)) denominator = n * x2_sum - x_sum * x_sum if denominator == 0: return {"direction": "neutral", "strength": 0} slope = (n * xy_sum - x_sum * y_sum) / denominator # 방향 결정 if slope > 0.001: direction = "up" elif slope < -0.001: direction = "down" else: direction = "neutral" # 강도 계산 (R-squared) y_mean = y_sum / n ss_tot = sum((y - y_mean) ** 2 for y in prices) if ss_tot > 0: y_pred = [slope * i + (y_mean - slope * x_sum / n) for i in range(n)] ss_res = sum((prices[i] - y_pred[i]) ** 2 for i in range(n)) r_squared = 1 - (ss_res / ss_tot) strength = max(0, min(1, r_squared)) else: strength = 0 return {"direction": direction, "strength": strength} async def _calculate_rsi(self, data: List[Dict[str, Any]], period: int = 14) -> List[Dict[str, Any]]: """RSI 계산""" if len(data) < period + 1: return [] prices = [d.get("close", d.get("price", 0)) for d in data] # 가격 변화 계산 changes = [prices[i] - prices[i-1] for i in range(1, len(prices))] # 상승/하락 분리 gains = [c if c > 0 else 0 for c in changes] losses = [-c if c < 0 else 0 for c in changes] # 초기 평균 avg_gain = sum(gains[:period]) / period avg_loss = sum(losses[:period]) / period rsi_values = [] for i in range(period, len(changes)): # 평활 이동평균 avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi_values.append({ "timestamp": data[i+1]["timestamp"], "value": rsi }) return rsi_values async def _calculate_momentum(self, data: List[Dict[str, Any]], period: int = 14) -> List[Dict[str, Any]]: """모멘텀 계산""" if len(data) < period: return [] momentum_values = [] for i in range(period, len(data)): current_price = data[i].get("close", data[i].get("price", 0)) past_price = data[i-period].get("close", data[i-period].get("price", 0)) if past_price > 0: momentum = ((current_price - past_price) / past_price) * 100 else: momentum = 0 momentum_values.append({ "timestamp": data[i]["timestamp"], "value": momentum }) return momentum_values def _find_peaks(self, data: List[Dict[str, Any]], value_key: str, is_valley: bool = False) -> List[Dict[str, Any]]: """피크/밸리 찾기""" peaks = [] for i in range(1, len(data) - 1): current = data[i].get(value_key, 0) prev = data[i-1].get(value_key, 0) next_val = data[i+1].get(value_key, 0) if is_valley: # 밸리 (저점) if current < prev and current < next_val: peaks.append({ "index": i, "value": current, "timestamp": data[i].get("timestamp") }) else: # 피크 (고점) if current > prev and current > next_val: peaks.append({ "index": i, "value": current, "timestamp": data[i].get("timestamp") }) return peaks async def _find_sr_levels(self, data: List[Dict[str, Any]], min_touches: int) -> List[Dict[str, Any]]: """지지/저항 레벨 찾기""" levels = [] # 가격 범위 prices = [d.get("close", d.get("price", 0)) for d in data] if not prices: return levels price_min = min(prices) price_max = max(prices) price_range = price_max - price_min if price_range == 0: return levels # 가격 구간 생성 (1% 단위) step = price_range * 0.01 price_levels = [] current_level = price_min while current_level <= price_max: price_levels.append(current_level) current_level += step # 각 레벨에서 터치 횟수 계산 for level in price_levels: touches = 0 level_type = None for i, candle in enumerate(data): high = candle.get("high", candle.get("price", 0)) low = candle.get("low", candle.get("price", 0)) # 레벨 터치 확인 (1% 오차 허용) if abs(high - level) / level < 0.01: touches += 1 if not level_type: level_type = "resistance" elif abs(low - level) / level < 0.01: touches += 1 if not level_type: level_type = "support" if touches >= min_touches and level_type: levels.append({ "price": level, "type": level_type, "touches": touches, "strength": min(touches / 10, 1.0) }) return levels def _calculate_returns(self, data: List[Dict[str, Any]]) -> List[float]: """수익률 계산""" returns = [] for i in range(1, len(data)): current = data[i].get("close", data[i].get("price", 0)) previous = data[i-1].get("close", data[i-1].get("price", 0)) if previous > 0: ret = (current - previous) / previous returns.append(ret) return returns def _calculate_correlation(self, returns1: List[float], returns2: List[float]) -> float: """상관관계 계산""" if len(returns1) != len(returns2) or len(returns1) < 2: return 0.0 # 평균 mean1 = sum(returns1) / len(returns1) mean2 = sum(returns2) / len(returns2) # 공분산 covariance = sum((r1 - mean1) * (r2 - mean2) for r1, r2 in zip(returns1, returns2)) / len(returns1) # 표준편차 std1 = (sum((r - mean1) ** 2 for r in returns1) / len(returns1)) ** 0.5 std2 = (sum((r - mean2) ** 2 for r in returns2) / len(returns2)) ** 0.5 if std1 == 0 or std2 == 0: return 0.0 # 상관계수 correlation = covariance / (std1 * std2) return max(-1, min(1, correlation)) async def _calculate_trend_strength(self, data: List[Dict[str, Any]]) -> float: """트렌드 강도 계산""" if len(data) < 20: return 0.0 trend = await self._calculate_trend(data, min(20, len(data))) return trend["strength"] async def _calculate_momentum_strength(self, data: List[Dict[str, Any]]) -> float: """모멘텀 강도 계산""" if len(data) < 14: return 0.0 momentum = await self._calculate_momentum(data, 14) if not momentum: return 0.0 # 모멘텀의 절대값 평균 avg_momentum = sum(abs(m["value"]) for m in momentum) / len(momentum) # 0-1 범위로 정규화 return min(avg_momentum / 10, 1.0) async def _calculate_volume_strength(self, data: List[Dict[str, Any]]) -> float: """볼륨 강도 계산""" if len(data) < 20: return 0.0 volumes = [d.get("volume", 0) for d in data] if not volumes: return 0.0 # 최근 볼륨과 평균 볼륨 비교 recent_volume = sum(volumes[-5:]) / 5 avg_volume = sum(volumes) / len(volumes) if avg_volume == 0: return 0.0 volume_ratio = recent_volume / avg_volume # 0-1 범위로 정규화 return min(volume_ratio, 2.0) / 2.0 async def _calculate_volatility(self, data: List[Dict[str, Any]]) -> float: """변동성 계산""" if len(data) < 2: return 0.0 returns = self._calculate_returns(data) if not returns: return 0.0 # 수익률의 표준편차 mean_return = sum(returns) / len(returns) variance = sum((r - mean_return) ** 2 for r in returns) / len(returns) volatility = variance ** 0.5 return volatility async def _calculate_range_ratio(self, data: List[Dict[str, Any]]) -> float: """레인지 비율 계산""" if len(data) < 20: return 0.0 prices = [d.get("close", d.get("price", 0)) for d in data[-20:]] if not prices: return 0.0 price_range = max(prices) - min(prices) avg_price = sum(prices) / len(prices) if avg_price == 0: return 0.0 # 레인지가 작을수록 높은 비율 range_ratio = 1 - (price_range / avg_price) return max(0, min(1, range_ratio)) async def _generate_simple_signals(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """간단한 거래 신호 생성""" signals = [] if len(data) < 20: return signals # 이동평균 교차 전략 for i in range(20, len(data)): short_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-5:i]) / 5 long_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-20:i]) / 20 prev_short_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-6:i-1]) / 5 prev_long_ma = sum(d.get("close", d.get("price", 0)) for d in data[i-21:i-1]) / 20 if prev_short_ma <= prev_long_ma and short_ma > long_ma: signal_type = "buy" elif prev_short_ma >= prev_long_ma and short_ma < long_ma: signal_type = "sell" else: signal_type = "hold" signals.append({ "index": i, "type": signal_type, "timestamp": data[i]["timestamp"] }) return signals async def _generate_strategy_signals(self, data: List[Dict[str, Any]], strategy_type: str) -> List[Dict[str, Any]]: """전략별 신호 생성""" if strategy_type == "momentum": # 모멘텀 전략 momentum = await self._calculate_momentum(data, 14) signals = [] for i, mom in enumerate(momentum): if mom["value"] > 2: # 2% 이상 상승 signal_type = "buy" elif mom["value"] < -2: # 2% 이상 하락 signal_type = "sell" else: signal_type = "hold" signals.append({ "index": i + 14, # 모멘텀 계산에 사용된 기간 보정 "type": signal_type, "timestamp": mom["timestamp"] }) return signals else: # 기본 이동평균 전략 return await self._generate_simple_signals(data) def _find_corresponding_index(self, primary_index: int, primary_tf: str, target_tf: str, target_length: int) -> int: """대응하는 타임프레임 인덱스 찾기""" if primary_tf not in self.timeframe_minutes or target_tf not in self.timeframe_minutes: return 0 ratio = self.timeframe_minutes[target_tf] / self.timeframe_minutes[primary_tf] target_index = int(primary_index / ratio) return min(target_index, target_length - 1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server