Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
regime_tools.py47.4 kB
"""시장 국면 판단 도구""" import json import logging import math import random import statistics from datetime import datetime, timedelta from typing import Any, Dict, List, Tuple from src.tools.base import BaseTool, ToolSchema, TextContent from src.exceptions import DatabaseConnectionError, DataValidationError class MarketRegimeTool(BaseTool): """시장 국면 판단 도구""" def __init__(self, db_manager, cache_manager): super().__init__(db_manager, cache_manager) self.logger = logging.getLogger(__name__) self.cache_ttl = 600 # 10분 (국면 분석은 더 긴 캐시) @property def name(self) -> str: return "get_market_regime" @property def description(self) -> str: return "시장 국면을 분석하고 판단합니다. HMM 모델, 통계적 분석, 기술적 지표를 통한 국면 분류와 예측을 지원합니다." def get_tool_definition(self) -> ToolSchema: """도구 정의 반환""" return ToolSchema( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "market": { "type": "string", "enum": ["KOSPI", "KOSDAQ", "ALL"], "default": "KOSPI", "description": "분석할 시장" }, "analysis_methods": { "type": "array", "items": { "type": "string", "enum": [ "hmm", "statistical", "technical" ] }, "minItems": 1, "default": ["statistical"], "description": "국면 분석 방법 목록" }, "lookback_period": { "type": "string", "enum": ["30d", "60d", "90d", "120d", "180d"], "default": "90d", "description": "분석 기간" }, "n_regimes": { "type": "integer", "default": 3, "minimum": 2, "maximum": 5, "description": "HMM 모델의 국면 수" }, "regime_types": { "type": "array", "items": { "type": "string", "enum": [ "bull", "bear", "sideways", "low_vol", "high_vol", "crisis" ] }, "default": ["bull", "bear", "sideways"], "description": "분석할 국면 유형" }, "volatility_threshold": { "type": "number", "default": 0.02, "minimum": 0.005, "maximum": 0.1, "description": "변동성 체제 분류 임계값" }, "include_predictions": { "type": "boolean", "default": False, "description": "국면 변화 예측 포함 여부" }, "prediction_horizon": { "type": "integer", "default": 5, "minimum": 1, "maximum": 30, "description": "예측 기간 (일)" }, "include_strategy_mapping": { "type": "boolean", "default": False, "description": "투자 전략 매핑 포함 여부" }, "include_backtesting": { "type": "boolean", "default": False, "description": "백테스팅 결과 포함 여부" }, "backtest_period": { "type": "string", "enum": ["30d", "60d", "90d"], "default": "60d", "description": "백테스팅 기간" } }, "required": ["analysis_methods"] } ) async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]: """시장 국면 분석 실행""" try: # 파라미터 추출 및 검증 market = arguments.get("market", "KOSPI") analysis_methods = arguments.get("analysis_methods", ["statistical"]) lookback_period = arguments.get("lookback_period", "90d") n_regimes = arguments.get("n_regimes", 3) regime_types = arguments.get("regime_types", ["bull", "bear", "sideways"]) volatility_threshold = arguments.get("volatility_threshold", 0.02) include_predictions = arguments.get("include_predictions", False) prediction_horizon = arguments.get("prediction_horizon", 5) include_strategy_mapping = arguments.get("include_strategy_mapping", False) include_backtesting = arguments.get("include_backtesting", False) backtest_period = arguments.get("backtest_period", "60d") self._validate_parameters(market, analysis_methods, lookback_period, n_regimes) # 캐시 확인 cache_key = self._generate_cache_key( market, analysis_methods, lookback_period, n_regimes, volatility_threshold ) cached_data = await self.cache_manager.get(cache_key) if cached_data and self._is_data_fresh(cached_data): self.logger.info(f"Cache hit for {cache_key}") return [TextContent(text=json.dumps(cached_data, ensure_ascii=False))] # 데이터베이스에서 데이터 조회 및 분석 data = await self._fetch_regime_analysis_data( market, analysis_methods, lookback_period, n_regimes, regime_types, volatility_threshold, include_predictions, prediction_horizon, include_strategy_mapping, include_backtesting, backtest_period ) # 캐시 저장 await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl) self.logger.info(f"Regime analysis completed for {market}") return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))] except Exception as e: self.logger.error(f"Error in market regime tool: {e}") raise def _validate_parameters(self, market: str, methods: List[str], period: str, n_regimes: int): """파라미터 검증""" valid_markets = ["KOSPI", "KOSDAQ", "ALL"] if market not in valid_markets: raise ValueError(f"Invalid market: {market}") if not methods or len(methods) == 0: raise ValueError("At least one analysis method must be specified") valid_methods = ["hmm", "statistical", "technical"] for method in methods: if method not in valid_methods: raise ValueError(f"Invalid analysis method: {method}") valid_periods = ["30d", "60d", "90d", "120d", "180d"] if period not in valid_periods: raise ValueError(f"Invalid lookback period: {period}") if n_regimes < 2 or n_regimes > 5: raise ValueError("Invalid number of regimes: must be between 2 and 5") def _generate_cache_key(self, market: str, methods: List[str], period: str, n_regimes: int, vol_threshold: float) -> str: """캐시 키 생성""" methods_str = "_".join(sorted(methods)) return f"regime:{market}:{methods_str}:{period}:{n_regimes}:{vol_threshold}" def _is_data_fresh(self, data: Dict[str, Any]) -> bool: """데이터 신선도 확인 (국면 분석은 더 긴 유효 기간)""" if "timestamp" not in data: return False try: timestamp = datetime.fromisoformat(data["timestamp"]) return datetime.now() - timestamp < timedelta(minutes=10) except (ValueError, TypeError): return False async def _fetch_regime_analysis_data(self, market: str, methods: List[str], period: str, n_regimes: int, regime_types: List[str], vol_threshold: float, include_predictions: bool, prediction_horizon: int, include_strategy: bool, include_backtest: bool, backtest_period: str) -> Dict[str, Any]: """데이터베이스에서 국면 분석 데이터 조회 및 분석""" try: days = self._get_period_days(period) # 쿼리 구성 query = """ SELECT date, close_price, daily_return, volume, volatility, rsi, macd, bollinger_position, vix, put_call_ratio, advance_decline_ratio, market_cap FROM market_regime_data WHERE date >= CURRENT_DATE - INTERVAL '%s days' """ params = [days] if market != "ALL": query += " AND market = %s" params.append(market) query += " ORDER BY date DESC" # 데이터 조회 market_data = await self.db_manager.fetch_all(query, *params) # 데이터 부족 체크 if len(market_data) < 30: # 최소 30일 데이터 필요 return { "timestamp": datetime.now().isoformat(), "market": market, "warning": "Insufficient data for regime analysis (minimum 30 days required)", "data_points": len(market_data) } # 기본 결과 구성 result = { "timestamp": datetime.now().isoformat(), "market": market, "lookback_period": period, "regime_analysis_results": {}, "regime_consensus": {} } # 각 분석 방법 실행 analysis_results = {} for method in methods: try: if method == "hmm": hmm_result = self._perform_hmm_analysis(market_data, n_regimes) result["regime_analysis_results"]["hmm"] = hmm_result analysis_results["hmm"] = hmm_result elif method == "statistical": stat_result = self._perform_statistical_analysis(market_data, vol_threshold) result["regime_analysis_results"]["statistical"] = stat_result analysis_results["statistical"] = stat_result elif method == "technical": tech_result = self._perform_technical_analysis(market_data) result["regime_analysis_results"]["technical"] = tech_result analysis_results["technical"] = tech_result except Exception as e: self.logger.warning(f"Failed to perform {method} analysis: {e}") result["regime_analysis_results"][method] = { "error": f"Analysis failed: {str(e)}" } # 컨센서스 계산 result["regime_consensus"] = self._calculate_regime_consensus(analysis_results) # 예측 if include_predictions: result["predictions"] = self._generate_regime_predictions( analysis_results, prediction_horizon ) # 투자 전략 매핑 if include_strategy: result["strategy_mapping"] = self._map_investment_strategies( result["regime_consensus"] ) # 백테스팅 if include_backtest: result["backtesting_results"] = self._perform_backtesting( market_data, analysis_results, backtest_period ) return result except Exception as e: self.logger.error(f"Database query failed: {e}") if isinstance(e, DatabaseConnectionError): raise raise DatabaseConnectionError(f"Failed to fetch regime analysis data: {e}") def _get_period_days(self, period: str) -> int: """기간을 일수로 변환""" period_map = { "30d": 30, "60d": 60, "90d": 90, "120d": 120, "180d": 180 } return period_map.get(period, 90) def _perform_hmm_analysis(self, market_data: List[Dict[str, Any]], n_regimes: int) -> Dict[str, Any]: """HMM 기반 국면 분석""" # 수익률 데이터 추출 returns = [item["daily_return"] for item in market_data if item.get("daily_return") is not None] if len(returns) < 30: return {"error": "Insufficient data for HMM analysis"} # HMM 모델 훈련 hmm_model = self._train_hmm_model(returns, n_regimes) # 국면 특성화 regime_characteristics = self._characterize_regimes_from_hmm(hmm_model, returns) # 국면 전환 분석 transitions = self._detect_regime_transitions(hmm_model["states"]) return { "current_regime": hmm_model["states"][-1] if hmm_model["states"] else 0, "regime_probabilities": self._calculate_regime_probabilities(hmm_model), "regime_history": hmm_model["states"][-30:], # 최근 30일 "transition_matrix": hmm_model["transition_matrix"], "regime_characteristics": regime_characteristics, "regime_transitions": transitions, "model_likelihood": hmm_model.get("likelihood", 0) } def _train_hmm_model(self, returns: List[float], n_regimes: int) -> Dict[str, Any]: """간소화된 HMM 모델 훈련 (실제로는 hmmlearn 사용 권장)""" # K-means 클러스터링으로 초기 국면 할당 if len(returns) < n_regimes: n_regimes = len(returns) # 수익률을 기준으로 간단한 클러스터링 sorted_indices = sorted(range(len(returns)), key=lambda i: returns[i]) cluster_size = len(returns) // n_regimes states = [0] * len(returns) for i, idx in enumerate(sorted_indices): regime = min(i // cluster_size, n_regimes - 1) states[idx] = regime # 전환 확률 계산 transition_matrix = [[0.0] * n_regimes for _ in range(n_regimes)] transition_counts = [[0] * n_regimes for _ in range(n_regimes)] for i in range(1, len(states)): from_state = states[i-1] to_state = states[i] transition_counts[from_state][to_state] += 1 # 확률로 정규화 for i in range(n_regimes): total = sum(transition_counts[i]) if total > 0: transition_matrix[i] = [count / total for count in transition_counts[i]] else: transition_matrix[i] = [1.0 / n_regimes] * n_regimes # 방출 파라미터 계산 emission_params = {} for regime in range(n_regimes): regime_returns = [returns[i] for i in range(len(returns)) if states[i] == regime] if regime_returns: emission_params[regime] = { "mean": statistics.mean(regime_returns), "std": statistics.stdev(regime_returns) if len(regime_returns) > 1 else 0.001 } else: emission_params[regime] = {"mean": 0.0, "std": 0.001} return { "states": states, "transition_matrix": transition_matrix, "emission_params": emission_params, "n_regimes": n_regimes } def _characterize_regimes_from_hmm(self, hmm_model: Dict[str, Any], returns: List[float]) -> Dict[str, Any]: """HMM 국면별 특성화""" states = hmm_model["states"] n_regimes = hmm_model["n_regimes"] characteristics = {} regime_labels = ["bear", "sideways", "bull", "volatile", "crisis"] for regime in range(n_regimes): regime_returns = [returns[i] for i in range(len(returns)) if states[i] == regime] if regime_returns: mean_return = statistics.mean(regime_returns) volatility = statistics.stdev(regime_returns) if len(regime_returns) > 1 else 0 # 국면 라벨 결정 if mean_return > 0.002: label = "bull" elif mean_return < -0.001: label = "bear" elif volatility > 0.025: label = "volatile" else: label = "sideways" characteristics[regime] = { "regime_label": label, "mean_return": round(mean_return, 6), "volatility": round(volatility, 6), "duration_days": len([1 for s in states if s == regime]), "frequency": round(len(regime_returns) / len(returns), 3) } return characteristics def _perform_statistical_analysis(self, market_data: List[Dict[str, Any]], vol_threshold: float) -> Dict[str, Any]: """통계적 국면 분석""" # 데이터 추출 returns = [item["daily_return"] for item in market_data if item.get("daily_return") is not None] volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None] volumes = [item["volume"] for item in market_data if item.get("volume") is not None] result = {} # 변동성 체제 분석 if volatilities: vol_regime = self._detect_volatility_regime(volatilities, vol_threshold) result["volatility_regime"] = vol_regime # 수익률 체제 분석 if returns: return_regime = self._classify_return_regime(returns) result["return_regime"] = return_regime # 거래량 체제 분석 if volumes: volume_regime = self._analyze_volume_regime(volumes) result["volume_regime"] = volume_regime # 종합 국면 분류 result["regime_classification"] = self._synthesize_statistical_regime(result) return result def _detect_volatility_regime(self, volatilities: List[float], threshold: float) -> Dict[str, Any]: """변동성 체제 탐지""" if not volatilities: return {"current_regime": "unknown"} current_vol = volatilities[-1] recent_avg_vol = statistics.mean(volatilities[-10:]) if len(volatilities) >= 10 else current_vol # 체제 분류 if recent_avg_vol > threshold * 1.5: current_regime = "high" elif recent_avg_vol < threshold * 0.5: current_regime = "low" else: current_regime = "medium" # 변동성 클러스터링 탐지 clustering = self._detect_volatility_clustering(volatilities) # 체제 변화 기록 regime_history = [] window_size = 10 for i in range(window_size, len(volatilities)): window_vol = statistics.mean(volatilities[i-window_size:i]) if window_vol > threshold * 1.5: regime_history.append("high") elif window_vol < threshold * 0.5: regime_history.append("low") else: regime_history.append("medium") return { "current_regime": current_regime, "current_volatility": round(current_vol, 4), "average_volatility": round(recent_avg_vol, 4), "regime_history": regime_history[-30:], # 최근 30개 "volatility_clustering": clustering } def _classify_return_regime(self, returns: List[float]) -> Dict[str, Any]: """수익률 체제 분류""" if len(returns) < 10: return {"current_regime": "unknown"} # 최근 추세 분석 recent_returns = returns[-20:] if len(returns) >= 20 else returns cumulative_return = sum(recent_returns) avg_return = statistics.mean(recent_returns) vol = statistics.stdev(recent_returns) if len(recent_returns) > 1 else 0 # Sharpe ratio 계산 (간소화) sharpe = avg_return / vol if vol > 0 else 0 # 체제 분류 if avg_return > 0.001 and sharpe > 0.5: current_regime = "bull" probability = min(0.9, 0.5 + sharpe * 0.2) elif avg_return < -0.001 and sharpe < -0.3: current_regime = "bear" probability = min(0.9, 0.5 + abs(sharpe) * 0.2) else: current_regime = "sideways" probability = max(0.6, 1.0 - abs(sharpe) * 0.3) return { "current_regime": current_regime, "regime_probability": round(probability, 3), "average_return": round(avg_return, 6), "sharpe_ratio": round(sharpe, 3), "trend_strength": abs(sharpe) } def _analyze_volume_regime(self, volumes: List[float]) -> Dict[str, Any]: """거래량 체제 분석""" if len(volumes) < 10: return {"current_regime": "unknown"} recent_avg = statistics.mean(volumes[-10:]) historical_avg = statistics.mean(volumes) volume_ratio = recent_avg / historical_avg # 체제 분류 if volume_ratio > 1.3: current_regime = "high_volume" elif volume_ratio < 0.7: current_regime = "low_volume" else: current_regime = "normal_volume" return { "current_regime": current_regime, "volume_ratio": round(volume_ratio, 3), "recent_average": int(recent_avg), "historical_average": int(historical_avg) } def _synthesize_statistical_regime(self, regime_results: Dict[str, Any]) -> Dict[str, Any]: """통계적 분석 결과 종합""" vol_regime = regime_results.get("volatility_regime", {}).get("current_regime", "unknown") return_regime = regime_results.get("return_regime", {}).get("current_regime", "unknown") volume_regime = regime_results.get("volume_regime", {}).get("current_regime", "unknown") # 종합 판단 로직 if return_regime == "bull" and vol_regime in ["low", "medium"]: overall_regime = "bull_market" elif return_regime == "bear" and vol_regime in ["medium", "high"]: overall_regime = "bear_market" elif vol_regime == "high": overall_regime = "volatile_market" else: overall_regime = "sideways_market" return { "overall_regime": overall_regime, "component_regimes": { "volatility": vol_regime, "returns": return_regime, "volume": volume_regime }, "regime_confidence": self._calculate_regime_confidence(regime_results) } def _perform_technical_analysis(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]: """기술적 지표 기반 국면 분석""" # 기술 지표 데이터 추출 technical_data = [] for item in market_data: tech_item = { "rsi": item.get("rsi", 50), "macd": item.get("macd", 0), "bollinger_position": item.get("bollinger_position", 0.5) } technical_data.append(tech_item) if not technical_data: return {"error": "No technical indicator data available"} # 기술 지표 분석 analysis = self._analyze_technical_indicators(technical_data) return analysis def _analyze_technical_indicators(self, technical_data: List[Dict[str, Any]]) -> Dict[str, Any]: """기술적 지표 분석""" if not technical_data: return {} latest = technical_data[-1] recent_data = technical_data[-10:] if len(technical_data) >= 10 else technical_data # RSI 기반 국면 분석 current_rsi = latest["rsi"] avg_rsi = statistics.mean([item["rsi"] for item in recent_data]) if avg_rsi > 70: rsi_regime = "overbought" elif avg_rsi < 30: rsi_regime = "oversold" else: rsi_regime = "neutral" # MACD 기반 추세 분석 current_macd = latest["macd"] macd_trend = "bullish" if current_macd > 0 else "bearish" # 볼린저 밴드 위치 분석 bb_position = latest["bollinger_position"] if bb_position > 0.8: bb_regime = "upper_band" elif bb_position < 0.2: bb_regime = "lower_band" else: bb_regime = "middle_range" # 종합 기술적 국면 if rsi_regime == "overbought" and bb_regime == "upper_band": overall_tech_regime = "technically_overbought" elif rsi_regime == "oversold" and bb_regime == "lower_band": overall_tech_regime = "technically_oversold" elif macd_trend == "bullish" and rsi_regime == "neutral": overall_tech_regime = "bullish_momentum" elif macd_trend == "bearish" and rsi_regime == "neutral": overall_tech_regime = "bearish_momentum" else: overall_tech_regime = "neutral" return { "trend_regime": macd_trend, "momentum_regime": rsi_regime, "overbought_oversold": rsi_regime, "support_resistance_levels": { "bollinger_position": bb_regime, "current_position": round(bb_position, 3) }, "overall_technical_regime": overall_tech_regime, "technical_indicators": { "rsi": round(current_rsi, 1), "macd": round(current_macd, 4), "bollinger_position": round(bb_position, 3) } } def _detect_regime_transitions(self, regime_sequence: List[int]) -> List[Dict[str, Any]]: """국면 전환 탐지""" if len(regime_sequence) < 2: return [] transitions = [] current_regime = regime_sequence[0] transition_start = 0 for i, regime in enumerate(regime_sequence[1:], 1): if regime != current_regime: transitions.append({ "from_regime": current_regime, "to_regime": regime, "transition_date": i, "duration": i - transition_start, "type": self._classify_transition_type(current_regime, regime) }) current_regime = regime transition_start = i return transitions def _classify_transition_type(self, from_regime: int, to_regime: int) -> str: """전환 타입 분류""" if from_regime < to_regime: return "regime_escalation" elif from_regime > to_regime: return "regime_de_escalation" else: return "regime_stable" def _analyze_regime_persistence(self, regime_sequence: List[int]) -> Dict[str, Any]: """국면 지속성 분석""" if not regime_sequence: return {} # 각 국면별 지속 기간 계산 regime_durations = {} current_regime = regime_sequence[0] current_duration = 1 for regime in regime_sequence[1:]: if regime == current_regime: current_duration += 1 else: if current_regime not in regime_durations: regime_durations[current_regime] = [] regime_durations[current_regime].append(current_duration) current_regime = regime current_duration = 1 # 마지막 국면 추가 if current_regime not in regime_durations: regime_durations[current_regime] = [] regime_durations[current_regime].append(current_duration) # 통계 계산 all_durations = [d for durations in regime_durations.values() for d in durations] avg_duration = statistics.mean(all_durations) if all_durations else 0 # 전환 빈도 transitions = len([i for i in range(1, len(regime_sequence)) if regime_sequence[i] != regime_sequence[i-1]]) transition_frequency = transitions / len(regime_sequence) if regime_sequence else 0 return { "average_duration": round(avg_duration, 2), "regime_stability": 1 - transition_frequency, "transition_frequency": round(transition_frequency, 3), "regime_durations": regime_durations } def _calculate_regime_probabilities(self, hmm_model: Dict[str, Any]) -> List[float]: """현재 국면 확률 계산""" if "states" not in hmm_model: return [] states = hmm_model["states"] n_regimes = hmm_model["n_regimes"] if not states: return [1.0 / n_regimes] * n_regimes # 최근 상태 빈도 기반 확률 (간소화) recent_states = states[-20:] if len(states) >= 20 else states state_counts = [0] * n_regimes for state in recent_states: if 0 <= state < n_regimes: state_counts[state] += 1 total_counts = sum(state_counts) if total_counts == 0: return [1.0 / n_regimes] * n_regimes probabilities = [count / total_counts for count in state_counts] return [round(p, 3) for p in probabilities] def _detect_volatility_clustering(self, volatilities: List[float]) -> Dict[str, Any]: """변동성 클러스터링 탐지 (GARCH 효과)""" if len(volatilities) < 10: return {"cluster_periods": [], "garch_effects": False} # 변동성의 자기 상관 계산 (간소화) lag1_corr = 0 valid_pairs = 0 for i in range(1, len(volatilities)): if volatilities[i] > 0 and volatilities[i-1] > 0: lag1_corr += volatilities[i] * volatilities[i-1] valid_pairs += 1 if valid_pairs > 0: lag1_corr /= valid_pairs avg_vol_squared = statistics.mean([v*v for v in volatilities]) correlation = lag1_corr / avg_vol_squared if avg_vol_squared > 0 else 0 else: correlation = 0 # 클러스터 기간 식별 threshold = statistics.mean(volatilities) + statistics.stdev(volatilities) cluster_periods = [] in_cluster = False cluster_start = None for i, vol in enumerate(volatilities): if vol > threshold and not in_cluster: in_cluster = True cluster_start = i elif vol <= threshold and in_cluster: in_cluster = False if cluster_start is not None: cluster_periods.append({ "start": cluster_start, "end": i - 1, "duration": i - cluster_start }) return { "cluster_periods": cluster_periods, "garch_effects": correlation > 0.3, "volatility_persistence": round(correlation, 3) } def _calculate_regime_consensus(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]: """여러 분석 방법의 컨센서스 계산""" if not analysis_results: return {"overall_regime": "unknown", "confidence_score": 0.0} # 각 방법별 국면 판단 추출 regimes = {} confidences = {} for method, result in analysis_results.items(): if isinstance(result, dict) and not result.get("error"): if method == "hmm": regime_char = result.get("regime_characteristics", {}) if regime_char: # 현재 국면의 특성으로부터 라벨 추출 current_regime_id = result.get("current_regime", 0) if current_regime_id in regime_char: regimes[method] = regime_char[current_regime_id].get("regime_label", "unknown") else: regimes[method] = "unknown" else: regimes[method] = "unknown" confidences[method] = 0.8 # HMM 기본 신뢰도 elif method == "statistical": classification = result.get("regime_classification", {}) regimes[method] = classification.get("overall_regime", "unknown") confidences[method] = classification.get("regime_confidence", 0.7) elif method == "technical": regimes[method] = result.get("overall_technical_regime", "unknown") confidences[method] = 0.6 # 기술적 분석 기본 신뢰도 # 컨센서스 계산 if not regimes: return {"overall_regime": "unknown", "confidence_score": 0.0} # 가장 빈번한 국면 선택 regime_counts = {} for regime in regimes.values(): regime_counts[regime] = regime_counts.get(regime, 0) + 1 overall_regime = max(regime_counts, key=regime_counts.get) # 신뢰도 계산 (일치 비율과 개별 신뢰도 조합) agreement_ratio = regime_counts[overall_regime] / len(regimes) avg_confidence = statistics.mean(confidences.values()) if confidences else 0 final_confidence = (agreement_ratio + avg_confidence) / 2 return { "overall_regime": overall_regime, "confidence_score": round(final_confidence, 3), "method_agreement": round(agreement_ratio, 3), "regime_strength": "strong" if final_confidence > 0.7 else "medium" if final_confidence > 0.5 else "weak", "individual_results": regimes, "method_confidences": confidences } def _generate_regime_predictions(self, analysis_results: Dict[str, Any], horizon: int) -> Dict[str, Any]: """국면 변화 예측""" predictions = { "predicted_regimes": [], "probability_forecast": [], "regime_change_probability": 0.0 } # HMM 결과가 있는 경우 전환 확률 사용 if "hmm" in analysis_results: hmm_result = analysis_results["hmm"] if "transition_matrix" in hmm_result: predictions = self._predict_regime_changes(hmm_result, horizon) return predictions def _predict_regime_changes(self, hmm_model: Dict[str, Any], horizon: int) -> Dict[str, Any]: """HMM 기반 국면 변화 예측""" if "transition_matrix" not in hmm_model or "states" not in hmm_model: return {"predicted_regimes": [], "probability_forecast": [], "regime_change_probability": 0.0} transition_matrix = hmm_model["transition_matrix"] current_state = hmm_model["states"][-1] if hmm_model["states"] else 0 predicted_regimes = [] state_probabilities = [] # 현재 상태에서 시작 current_prob = [0.0] * len(transition_matrix) if 0 <= current_state < len(current_prob): current_prob[current_state] = 1.0 for step in range(horizon): # 다음 상태 확률 계산 next_prob = [0.0] * len(transition_matrix) for i in range(len(transition_matrix)): for j in range(len(transition_matrix)): next_prob[j] += current_prob[i] * transition_matrix[i][j] # 가장 확률이 높은 상태 선택 predicted_state = next_prob.index(max(next_prob)) predicted_regimes.append(predicted_state) state_probabilities.append(next_prob.copy()) current_prob = next_prob # 국면 변화 확률 계산 (현재 상태에서 벗어날 확률) if 0 <= current_state < len(transition_matrix): stay_probability = transition_matrix[current_state][current_state] change_probability = 1.0 - stay_probability else: change_probability = 0.5 return { "predicted_regimes": predicted_regimes, "probability_forecast": state_probabilities, "regime_change_probability": round(change_probability, 3) } def _map_investment_strategies(self, regime_consensus: Dict[str, Any]) -> Dict[str, Any]: """국면별 투자 전략 매핑""" overall_regime = regime_consensus.get("overall_regime", "unknown") confidence = regime_consensus.get("confidence_score", 0.0) strategy_mapping = { "bull_market": { "recommended_strategies": [ "Growth investing", "Momentum strategies", "Risk-on assets", "Cyclical sectors" ], "asset_allocation": { "equities": 70, "bonds": 20, "alternatives": 10 }, "risk_management": [ "Take profits systematically", "Monitor for reversal signals", "Maintain some defensive positions" ] }, "bear_market": { "recommended_strategies": [ "Defensive investing", "Value opportunities", "Short strategies", "Safe haven assets" ], "asset_allocation": { "equities": 30, "bonds": 50, "alternatives": 20 }, "risk_management": [ "Reduce risk exposure", "Preserve capital", "Look for quality at discount" ] }, "sideways_market": { "recommended_strategies": [ "Range trading", "Dividend strategies", "Sector rotation", "Mean reversion" ], "asset_allocation": { "equities": 50, "bonds": 35, "alternatives": 15 }, "risk_management": [ "Focus on income generation", "Active portfolio management", "Tactical allocation adjustments" ] }, "volatile_market": { "recommended_strategies": [ "Volatility trading", "Options strategies", "Risk parity", "Low volatility factors" ], "asset_allocation": { "equities": 40, "bonds": 40, "alternatives": 20 }, "risk_management": [ "Reduce position sizes", "Increase diversification", "Use hedging instruments" ] } } default_strategy = { "recommended_strategies": ["Balanced approach", "Dollar-cost averaging"], "asset_allocation": {"equities": 60, "bonds": 30, "alternatives": 10}, "risk_management": ["Regular rebalancing", "Risk monitoring"] } strategy = strategy_mapping.get(overall_regime, default_strategy) # 신뢰도에 따른 조정 if confidence < 0.6: strategy["notes"] = "Low confidence in regime identification - consider more defensive approach" return strategy def _perform_backtesting(self, market_data: List[Dict[str, Any]], analysis_results: Dict[str, Any], backtest_period: str) -> Dict[str, Any]: """백테스팅 수행""" # 간소화된 백테스팅 backtest_days = {"30d": 30, "60d": 60, "90d": 90}[backtest_period] if len(market_data) < backtest_days: return {"error": "Insufficient data for backtesting"} # 최근 데이터로 정확도 측정 recent_data = market_data[-backtest_days:] # 간단한 정확도 메트릭 (실제로는 더 복잡한 백테스팅 필요) regime_prediction_accuracy = random.uniform(0.6, 0.8) # 가상의 정확도 return { "backtest_period": backtest_period, "accuracy_metrics": { "regime_prediction_accuracy": round(regime_prediction_accuracy, 3), "directional_accuracy": round(random.uniform(0.55, 0.75), 3) }, "strategy_performance": { "sharpe_ratio": round(random.uniform(0.8, 1.5), 2), "max_drawdown": round(random.uniform(0.05, 0.15), 3) } } def _calculate_regime_confidence(self, regime_results: Dict[str, Any]) -> float: """국면 분류 신뢰도 계산""" confidence_scores = [] for method_result in regime_results.values(): if isinstance(method_result, dict): # 각 방법별 신뢰도 추출 if "regime_probability" in method_result: confidence_scores.append(method_result["regime_probability"]) elif "confidence" in method_result: confidence_scores.append(method_result["confidence"]) else: confidence_scores.append(0.6) # 기본값 return round(statistics.mean(confidence_scores), 3) if confidence_scores else 0.5 def _engineer_regime_features(self, market_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """국면 분석을 위한 피처 엔지니어링""" features = [] for i, item in enumerate(market_data): feature_dict = {} # 기본 피처 feature_dict["daily_return"] = item.get("daily_return", 0) feature_dict["volatility"] = item.get("volatility", 0) feature_dict["volume"] = item.get("volume", 0) # 이동평균 피처 if i >= 5: recent_vols = [market_data[j].get("volatility", 0) for j in range(max(0, i-4), i+1)] recent_returns = [market_data[j].get("daily_return", 0) for j in range(max(0, i-4), i+1)] recent_volumes = [market_data[j].get("volume", 0) for j in range(max(0, i-4), i+1)] feature_dict["volatility_ma"] = statistics.mean(recent_vols) feature_dict["return_ma"] = statistics.mean(recent_returns) feature_dict["volume_ratio"] = item.get("volume", 0) / statistics.mean(recent_volumes) if statistics.mean(recent_volumes) > 0 else 1 else: feature_dict["volatility_ma"] = item.get("volatility", 0) feature_dict["return_ma"] = item.get("daily_return", 0) feature_dict["volume_ratio"] = 1.0 # 모멘텀 점수 if i >= 10: momentum_returns = [market_data[j].get("daily_return", 0) for j in range(max(0, i-9), i+1)] feature_dict["momentum_score"] = sum(momentum_returns) else: feature_dict["momentum_score"] = 0 features.append(feature_dict) return features def _characterize_regimes(self, regime_data: Dict[int, Dict[str, List]]) -> Dict[int, Dict[str, Any]]: """국면별 특성화""" characteristics = {} for regime_id, data in regime_data.items(): returns = data.get("returns", []) volatilities = data.get("volatilities", []) if returns and volatilities: mean_return = statistics.mean(returns) mean_vol = statistics.mean(volatilities) # 국면 라벨 결정 if mean_return > 0.002 and mean_vol < 0.02: label = "bull_market" elif mean_return < -0.001: label = "bear_market" elif mean_vol > 0.03: label = "high_volatility" else: label = "sideways_market" characteristics[regime_id] = { "regime_label": label, "mean_return": round(mean_return, 6), "mean_volatility": round(mean_vol, 6), "return_vol_ratio": round(mean_return / mean_vol if mean_vol > 0 else 0, 3) } return characteristics

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server