MCP Market Statistics Server

by whdghk1907
anomaly_tools.py • 34.4 kB
"""이상 μ§•ν›„ 탐지 도ꡬ""" import json import logging import math import statistics from datetime import datetime, timedelta from typing import Any, Dict, List, Tuple from src.tools.base import BaseTool, ToolSchema, TextContent from src.exceptions import DatabaseConnectionError, DataValidationError class AnomalyDetectionTool(BaseTool): """이상 μ§•ν›„ 탐지 도ꡬ""" def __init__(self, db_manager, cache_manager): super().__init__(db_manager, cache_manager) self.logger = logging.getLogger(__name__) self.cache_ttl = 300 # 5λΆ„ @property def name(self) -> str: return "detect_market_anomalies" @property def description(self) -> str: return "μ‹œμž₯ 이상 μ§•ν›„λ₯Ό νƒμ§€ν•©λ‹ˆλ‹€. 톡계적 μ΄μƒμΉ˜, λ¨Έμ‹ λŸ¬λ‹ 기반 이상 탐지, μ‹œκ³„μ—΄ νŒ¨ν„΄ 뢄석을 μ§€μ›ν•©λ‹ˆλ‹€." def get_tool_definition(self) -> ToolSchema: """도ꡬ μ •μ˜ λ°˜ν™˜""" return ToolSchema( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "market": { "type": "string", "enum": ["KOSPI", "KOSDAQ", "ALL"], "default": "KOSPI", "description": "뢄석할 μ‹œμž₯" }, "detection_methods": { "type": "array", "items": { "type": "string", "enum": [ "statistical", "isolation_forest", "timeseries" ] }, "minItems": 1, "default": ["statistical"], "description": "이상 탐지 방법 λͺ©λ‘" }, "lookback_period": { "type": "string", "enum": ["7d", "30d", "60d", "90d"], "default": "30d", "description": "뢄석 κΈ°κ°„" }, "sensitivity": { "type": "number", "default": 2.0, "minimum": 0.5, "maximum": 5.0, "description": "이상 탐지 민감도 (Z-score μž„κ³„κ°’)" }, "contamination": { "type": "number", "default": 0.1, "minimum": 0.01, "maximum": 0.3, "description": "Isolation Forest μ˜€μ—Όλ„ (μ΄μƒμΉ˜ λΉ„μœ¨)" }, "include_feature_analysis": { "type": "boolean", "default": False, "description": "ν”Όμ²˜ μ€‘μš”λ„ 뢄석 포함 μ—¬λΆ€" }, "include_risk_assessment": { "type": "boolean", "default": False, "description": "리슀크 평가 포함 μ—¬λΆ€" }, "include_realtime_alerts": { "type": "boolean", "default": False, "description": "μ‹€μ‹œκ°„ μ•Œλ¦Ό 포함 μ—¬λΆ€" }, "alert_threshold": { "type": "string", "enum": ["low", "medium", "high"], "default": "medium", "description": "μ•Œλ¦Ό μž„κ³„κ°’" } }, "required": ["detection_methods"] } ) async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]: """이상 μ§•ν›„ 탐지 μ‹€ν–‰""" try: # νŒŒλΌλ―Έν„° μΆ”μΆœ 및 검증 market = arguments.get("market", "KOSPI") detection_methods = arguments.get("detection_methods", ["statistical"]) lookback_period = arguments.get("lookback_period", "30d") sensitivity = arguments.get("sensitivity", 2.0) contamination = arguments.get("contamination", 0.1) include_feature_analysis = arguments.get("include_feature_analysis", False) include_risk_assessment = arguments.get("include_risk_assessment", False) include_realtime_alerts = arguments.get("include_realtime_alerts", False) alert_threshold = arguments.get("alert_threshold", "medium") self._validate_parameters(market, detection_methods, lookback_period, sensitivity) # μΊμ‹œ 확인 cache_key = self._generate_cache_key( market, detection_methods, lookback_period, sensitivity, contamination ) cached_data = await self.cache_manager.get(cache_key) if cached_data and self._is_data_fresh(cached_data): self.logger.info(f"Cache hit for {cache_key}") return [TextContent(text=json.dumps(cached_data, ensure_ascii=False))] # λ°μ΄ν„°λ² μ΄μŠ€μ—μ„œ 데이터 쑰회 data = await self._fetch_anomaly_detection_data( market, detection_methods, lookback_period, sensitivity, contamination, include_feature_analysis, include_risk_assessment, include_realtime_alerts, alert_threshold ) # μΊμ‹œ μ €μž₯ 
await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl) self.logger.info(f"Anomaly detection completed for {market}") return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))] except Exception as e: self.logger.error(f"Error in anomaly detection tool: {e}") raise def _validate_parameters(self, market: str, methods: List[str], period: str, sensitivity: float): """νŒŒλΌλ―Έν„° 검증""" valid_markets = ["KOSPI", "KOSDAQ", "ALL"] if market not in valid_markets: raise ValueError(f"Invalid market: {market}") if not methods or len(methods) == 0: raise ValueError("At least one detection method must be specified") valid_methods = ["statistical", "isolation_forest", "timeseries"] for method in methods: if method not in valid_methods: raise ValueError(f"Invalid detection method: {method}") valid_periods = ["7d", "30d", "60d", "90d"] if period not in valid_periods: raise ValueError(f"Invalid lookback period: {period}") if sensitivity < 0.5 or sensitivity > 5.0: raise ValueError("Invalid sensitivity: must be between 0.5 and 5.0") def _generate_cache_key(self, market: str, methods: List[str], period: str, sensitivity: float, contamination: float) -> str: """μΊμ‹œ ν‚€ 생성""" methods_str = "_".join(sorted(methods)) return f"anomaly:{market}:{methods_str}:{period}:{sensitivity}:{contamination}" def _is_data_fresh(self, data: Dict[str, Any]) -> bool: """데이터 신선도 확인""" if "timestamp" not in data: return False try: timestamp = datetime.fromisoformat(data["timestamp"]) return datetime.now() - timestamp < timedelta(minutes=5) except (ValueError, TypeError): return False async def _fetch_anomaly_detection_data(self, market: str, methods: List[str], period: str, sensitivity: float, contamination: float, include_features: bool, include_risk: bool, include_alerts: bool, alert_threshold: str) -> Dict[str, Any]: """λ°μ΄ν„°λ² μ΄μŠ€μ—μ„œ 이상 μ§•ν›„ 탐지 데이터 쑰회 및 뢄석""" try: days = self._get_period_days(period) # 쿼리 ꡬ성 query = """ SELECT date, close_price, volume, daily_return, volatility, vix, put_call_ratio, market_cap FROM market_anomaly_data WHERE date >= CURRENT_DATE - INTERVAL '%s days' """ params = [days] if market != "ALL": query += " AND market = %s" params.append(market) query += " ORDER BY date DESC" # 데이터 쑰회 market_data = await self.db_manager.fetch_all(query, *params) # 데이터 λΆ€μ‘± 체크 if len(market_data) < 10: # μ΅œμ†Œ 10일 데이터 ν•„μš” return { "timestamp": datetime.now().isoformat(), "market": market, "warning": "Insufficient data for anomaly detection (minimum 10 days required)", "data_points": len(market_data) } # κΈ°λ³Έ κ²°κ³Ό ꡬ성 result = { "timestamp": datetime.now().isoformat(), "market": market, "lookback_period": period, "anomaly_detection_results": {}, "anomaly_summary": {} } # 각 탐지 방법 μ‹€ν–‰ all_anomalies = [] for method in methods: try: if method == "statistical": stat_result = self._perform_statistical_detection(market_data, sensitivity) result["anomaly_detection_results"]["statistical"] = stat_result all_anomalies.extend(stat_result.get("detected_anomalies", [])) elif method == "isolation_forest": if_result = self._perform_isolation_forest_detection(market_data, contamination) result["anomaly_detection_results"]["isolation_forest"] = if_result all_anomalies.extend(if_result.get("detected_anomalies", [])) elif method == "timeseries": ts_result = self._perform_timeseries_detection(market_data) result["anomaly_detection_results"]["timeseries"] = ts_result all_anomalies.extend(ts_result.get("detected_anomalies", [])) except Exception as e: self.logger.warning(f"Failed to perform 
{method} detection: {e}") result["anomaly_detection_results"][method] = { "error": f"Detection failed: {str(e)}" } # μ’…ν•© μš”μ•½ 생성 result["anomaly_summary"] = self._generate_anomaly_summary(all_anomalies) # ν”Όμ²˜ 뢄석 if include_features: result["feature_analysis"] = self._analyze_features(market_data) # 리슀크 평가 if include_risk: result["risk_assessment"] = self._assess_risk_level( result["anomaly_detection_results"] ) # μ‹€μ‹œκ°„ μ•Œλ¦Ό if include_alerts: result["realtime_alerts"] = self._generate_realtime_alerts( all_anomalies, alert_threshold ) return result except Exception as e: self.logger.error(f"Database query failed: {e}") if isinstance(e, DatabaseConnectionError): raise raise DatabaseConnectionError(f"Failed to fetch anomaly detection data: {e}") def _get_period_days(self, period: str) -> int: """기간을 일수둜 λ³€ν™˜""" period_map = { "7d": 7, "30d": 30, "60d": 60, "90d": 90 } return period_map.get(period, 30) def _perform_statistical_detection(self, market_data: List[Dict[str, Any]], sensitivity: float) -> Dict[str, Any]: """톡계적 이상 탐지 μˆ˜ν–‰""" # 가격 변동λ₯  데이터 μΆ”μΆœ returns = [item["daily_return"] for item in market_data if item.get("daily_return") is not None] volumes = [item["volume"] for item in market_data if item.get("volume") is not None] volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None] # Z-score 기반 이상 탐지 z_score_anomalies = [] if returns: z_score_anomalies.extend(self._detect_z_score_anomalies(returns, sensitivity, "daily_return")) if volumes: z_score_anomalies.extend(self._detect_z_score_anomalies(volumes, sensitivity, "volume")) if volatilities: z_score_anomalies.extend(self._detect_z_score_anomalies(volatilities, sensitivity, "volatility")) # IQR 기반 이상 탐지 iqr_anomalies = [] if returns: iqr_anomalies.extend(self._detect_iqr_anomalies(returns, "daily_return")) if volumes: iqr_anomalies.extend(self._detect_iqr_anomalies(volumes, "volume")) return { "z_score_anomalies": z_score_anomalies, "iqr_anomalies": iqr_anomalies, "anomaly_count": len(z_score_anomalies) + len(iqr_anomalies), "anomaly_percentage": round((len(z_score_anomalies) + len(iqr_anomalies)) / len(market_data) * 100, 2), "detected_anomalies": z_score_anomalies + iqr_anomalies } def _detect_z_score_anomalies(self, data: List[float], threshold: float, feature_name: str = "value") -> List[Dict[str, Any]]: """Z-score 기반 μ΄μƒμΉ˜ 탐지""" if len(data) < 3: return [] mean_val = statistics.mean(data) std_val = statistics.stdev(data) if len(data) > 1 else 0 if std_val == 0: return [] anomalies = [] for i, value in enumerate(data): z_score = (value - mean_val) / std_val if abs(z_score) > threshold: anomalies.append({ "index": i, "value": value, "z_score": round(z_score, 4), "feature": feature_name, "severity": "high" if abs(z_score) > threshold * 1.5 else "medium" }) return anomalies def _detect_iqr_anomalies(self, data: List[float], feature_name: str = "value") -> List[Dict[str, Any]]: """IQR 기반 μ΄μƒμΉ˜ 탐지""" if len(data) < 4: return [] sorted_data = sorted(data) n = len(sorted_data) q1_idx = n // 4 q3_idx = 3 * n // 4 q1 = sorted_data[q1_idx] q3 = sorted_data[q3_idx] iqr = q3 - q1 if iqr == 0: return [] lower_bound = q1 - 1.5 * iqr upper_bound = q3 + 1.5 * iqr anomalies = [] for i, value in enumerate(data): if value < lower_bound or value > upper_bound: distance_from_bound = min(abs(value - lower_bound), abs(value - upper_bound)) severity = "high" if distance_from_bound > iqr else "medium" anomalies.append({ "index": i, "value": value, "feature": feature_name, "lower_bound": 
lower_bound, "upper_bound": upper_bound, "severity": severity }) return anomalies def _perform_isolation_forest_detection(self, market_data: List[Dict[str, Any]], contamination: float) -> Dict[str, Any]: """Isolation Forest 기반 이상 탐지 (κ°„μ†Œν™” κ΅¬ν˜„)""" # ν”Όμ²˜ μΆ”μΆœ features = self._extract_features(market_data) if len(features) < 10: return {"error": "Insufficient data for Isolation Forest"} # κ°„μ†Œν™”λœ Isolation Forest (μ‹€μ œλ‘œλŠ” scikit-learn μ‚¬μš© ꢌμž₯) anomalies, scores, model_stats = self._detect_isolation_forest_anomalies(features, contamination) return { "anomalies_detected": len(anomalies), "anomaly_scores": scores, "feature_importance": self._calculate_feature_importance(features, anomalies), "model_stats": model_stats, "detected_anomalies": anomalies } def _extract_features(self, market_data: List[Dict[str, Any]]) -> List[List[float]]: """μ‹œμž₯ λ°μ΄ν„°μ—μ„œ ν”Όμ²˜ μΆ”μΆœ""" features = [] for item in market_data: feature_row = [ item.get("close_price", 0), item.get("volume", 0) / 1000000, # 백만 λ‹¨μœ„λ‘œ μŠ€μΌ€μΌλ§ item.get("daily_return", 0), item.get("volatility", 0), item.get("vix", 20), # κΈ°λ³Έκ°’ item.get("put_call_ratio", 0.8) # κΈ°λ³Έκ°’ ] features.append(feature_row) return features def _detect_isolation_forest_anomalies(self, features: List[List[float]], contamination: float) -> Tuple[List[Dict], List[float], Dict]: """κ°„μ†Œν™”λœ Isolation Forest 이상 탐지""" # κ°„λ‹¨ν•œ μ΄μƒμΉ˜ 점수 계산 (μ‹€μ œ Isolation Forest μ•Œκ³ λ¦¬μ¦˜μ˜ κ·Όμ‚¬μΉ˜) n_samples = len(features) n_features = len(features[0]) if features else 0 # 각 ν”Όμ²˜λ³„ 톡계 계산 feature_stats = [] for i in range(n_features): feature_values = [row[i] for row in features] feature_stats.append({ "mean": statistics.mean(feature_values), "std": statistics.stdev(feature_values) if len(feature_values) > 1 else 1.0 }) # 이상 점수 계산 (각 ν”Όμ²˜μ˜ ν‘œμ€€νŽΈμ°¨ 기반) scores = [] for row in features: score = 0 for i, value in enumerate(row): if feature_stats[i]["std"] > 0: normalized_value = abs((value - feature_stats[i]["mean"]) / feature_stats[i]["std"]) score += normalized_value scores.append(score / n_features) # μž„κ³„κ°’ 계산 (μƒμœ„ contamination λΉ„μœ¨) sorted_scores = sorted(scores, reverse=True) threshold_idx = int(len(sorted_scores) * contamination) threshold = sorted_scores[threshold_idx] if threshold_idx < len(sorted_scores) else max(scores) # μ΄μƒμΉ˜ μΆ”μΆœ anomalies = [] for i, score in enumerate(scores): if score > threshold: anomalies.append({ "index": i, "anomaly_score": round(score, 4), "features": features[i], "severity": "high" if score > threshold * 1.5 else "medium" }) model_stats = { "n_estimators": 100, # κ°€μƒμ˜ 트리 개수 "contamination": contamination, "threshold": threshold, "n_features": n_features } return anomalies, scores, model_stats def _calculate_feature_importance(self, features: List[List[float]], anomalies: List[Dict]) -> Dict[str, float]: """ν”Όμ²˜ μ€‘μš”λ„ 계산""" if not features or not anomalies: return {} n_features = len(features[0]) feature_names = ["price", "volume", "return", "volatility", "vix", "put_call_ratio"] # 각 ν”Όμ²˜λ³„ λΆ„μ‚° 기여도 계산 importance = {} for i in range(min(n_features, len(feature_names))): feature_values = [row[i] for row in features] if len(feature_values) > 1: variance = statistics.variance(feature_values) importance[feature_names[i]] = round(variance / sum([statistics.variance([row[j] for row in features]) for j in range(n_features) if len([row[j] for row in features]) > 1]), 4) else: importance[feature_names[i]] = 0.0 return importance def 
_perform_timeseries_detection(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]: """μ‹œκ³„μ—΄ 이상 νŒ¨ν„΄ 탐지""" # 가격 데이터 μΆ”μΆœ prices = [item["close_price"] for item in market_data if item.get("close_price") is not None] volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None] result = {} # κ³„μ ˆμ„± 이상 탐지 (κ°„μ†Œν™”) if len(prices) >= 20: seasonal_anomalies = self._detect_seasonal_anomalies(prices) result["seasonal_anomalies"] = seasonal_anomalies # ꡬ쑰적 변화점 탐지 if len(prices) >= 30: structural_breaks = self._detect_structural_breaks(prices) result["structural_breaks"] = structural_breaks # 변동성 체제 λ³€ν™” 탐지 if len(volatilities) >= 20: volatility_regimes = self._detect_volatility_regimes(volatilities) result["volatility_regimes"] = volatility_regimes # νŠΈλ Œλ“œ 이상 탐지 trend_breaks = self._detect_trend_breaks(prices) result["trend_breaks"] = trend_breaks # λͺ¨λ“  νƒμ§€λœ 이상 νŒ¨ν„΄ μˆ˜μ§‘ all_anomalies = [] for key, anomaly_list in result.items(): if isinstance(anomaly_list, list): all_anomalies.extend(anomaly_list) result["detected_anomalies"] = all_anomalies return result def _detect_seasonal_anomalies(self, prices: List[float]) -> List[Dict[str, Any]]: """κ³„μ ˆμ„± 이상 탐지 (κ°„μ†Œν™”)""" # μ£Όκ°„ νŒ¨ν„΄ 뢄석 (7일 μ£ΌκΈ°) if len(prices) < 14: return [] weekly_patterns = [] for i in range(0, len(prices) - 7, 7): weekly_change = prices[i] - prices[i + 7] if i + 7 < len(prices) else 0 weekly_patterns.append(weekly_change) if len(weekly_patterns) < 2: return [] # μ£Όκ°„ νŒ¨ν„΄μ˜ μ΄μƒμΉ˜ 탐지 anomalies = self._detect_z_score_anomalies(weekly_patterns, 2.0, "weekly_pattern") return [{"type": "seasonal", **anomaly} for anomaly in anomalies] def _detect_structural_breaks(self, prices: List[float]) -> List[Dict[str, Any]]: """ꡬ쑰적 변화점 탐지 (κ°„μ†Œν™”λœ CUSUM)""" if len(prices) < 20: return [] # λˆ„μ  합계 기반 변화점 탐지 changes = [prices[i] - prices[i-1] for i in range(1, len(prices))] mean_change = statistics.mean(changes) cusum_pos = 0 cusum_neg = 0 threshold = 3 * statistics.stdev(changes) if len(changes) > 1 else 1.0 breaks = [] for i, change in enumerate(changes): cusum_pos = max(0, cusum_pos + change - mean_change) cusum_neg = min(0, cusum_neg + change - mean_change) if cusum_pos > threshold or cusum_neg < -threshold: breaks.append({ "type": "structural_break", "breakpoint": i + 1, "cusum_value": cusum_pos if cusum_pos > threshold else cusum_neg, "severity": "high" if abs(cusum_pos if cusum_pos > threshold else cusum_neg) > threshold * 1.5 else "medium" }) cusum_pos = 0 cusum_neg = 0 return breaks def _detect_volatility_regimes(self, volatilities: List[float]) -> List[Dict[str, Any]]: """변동성 체제 탐지""" if len(volatilities) < 10: return [] # 변동성 μˆ˜μ€€ λΆ„λ₯˜ mean_vol = statistics.mean(volatilities) std_vol = statistics.stdev(volatilities) if len(volatilities) > 1 else 0 regimes = [] current_regime = None regime_start = 0 for i, vol in enumerate(volatilities): if vol > mean_vol + std_vol: regime_type = "high_volatility" elif vol < mean_vol - std_vol: regime_type = "low_volatility" else: regime_type = "normal" if regime_type != current_regime: if current_regime is not None: regimes.append({ "type": "volatility_regime", "regime_type": current_regime, "start_index": regime_start, "end_index": i - 1, "duration": i - regime_start, "volatility_level": current_regime }) current_regime = regime_type regime_start = i # λ§ˆμ§€λ§‰ 체제 μΆ”κ°€ if current_regime is not None: regimes.append({ "type": "volatility_regime", "regime_type": current_regime, "start_index": 
regime_start, "end_index": len(volatilities) - 1, "duration": len(volatilities) - regime_start, "volatility_level": current_regime }) return regimes def _detect_trend_breaks(self, prices: List[float]) -> List[Dict[str, Any]]: """νŠΈλ Œλ“œ 변화점 탐지""" if len(prices) < 10: return [] # 이동평균을 μ΄μš©ν•œ νŠΈλ Œλ“œ 뢄석 window = 5 trends = [] for i in range(window, len(prices)): before_avg = statistics.mean(prices[i-window:i]) current_price = prices[i] if current_price > before_avg * 1.02: # 2% 이상 μƒμŠΉ trend = "up" elif current_price < before_avg * 0.98: # 2% 이상 ν•˜λ½ trend = "down" else: trend = "sideways" trends.append((i, trend)) # νŠΈλ Œλ“œ 변화점 탐지 breaks = [] current_trend = None for i, (idx, trend) in enumerate(trends): if trend != current_trend: if current_trend is not None: breaks.append({ "type": "trend_break", "breakpoint": idx, "from_trend": current_trend, "to_trend": trend, "severity": "medium" }) current_trend = trend return breaks def _generate_anomaly_summary(self, all_anomalies: List[Dict[str, Any]]) -> Dict[str, Any]: """μ’…ν•© 이상 μ§•ν›„ μš”μ•½ 생성""" if not all_anomalies: return { "total_anomalies_detected": 0, "severity_distribution": {"low": 0, "medium": 0, "high": 0}, "confidence_scores": {"average": 0.0} } # 심각도별 뢄포 severity_counts = {"low": 0, "medium": 0, "high": 0} for anomaly in all_anomalies: severity = anomaly.get("severity", "medium") if severity in severity_counts: severity_counts[severity] += 1 # 신뒰도 점수 계산 confidence_scores = [anomaly.get("confidence", 0.7) for anomaly in all_anomalies if "confidence" in anomaly] avg_confidence = statistics.mean(confidence_scores) if confidence_scores else 0.7 return { "total_anomalies_detected": len(all_anomalies), "severity_distribution": severity_counts, "confidence_scores": { "average": round(avg_confidence, 3), "min": round(min(confidence_scores), 3) if confidence_scores else 0.0, "max": round(max(confidence_scores), 3) if confidence_scores else 0.0 } } def _analyze_features(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]: """ν”Όμ²˜ 뢄석""" features = self._extract_features(market_data) if not features: return {"error": "No features available for analysis"} feature_names = ["price", "volume", "return", "volatility", "vix", "put_call_ratio"] n_features = min(len(features[0]), len(feature_names)) analysis = {} for i in range(n_features): feature_values = [row[i] for row in features] analysis[feature_names[i]] = { "mean": round(statistics.mean(feature_values), 4), "std": round(statistics.stdev(feature_values) if len(feature_values) > 1 else 0, 4), "min": round(min(feature_values), 4), "max": round(max(feature_values), 4), "correlation_with_anomalies": 0.5 # κ°„μ†Œν™”λœ 상관관계 } return analysis def _assess_risk_level(self, detection_results: Dict[str, Any]) -> Dict[str, Any]: """리슀크 레벨 평가""" total_anomalies = 0 severe_anomalies = 0 # 각 탐지 방법별 κ²°κ³Ό 집계 for method, result in detection_results.items(): if isinstance(result, dict) and "detected_anomalies" in result: anomalies = result["detected_anomalies"] total_anomalies += len(anomalies) severe_anomalies += len([a for a in anomalies if a.get("severity") == "high"]) # 리슀크 레벨 κ²°μ • if severe_anomalies > 5: risk_level = "critical" elif total_anomalies > 10: risk_level = "high" elif total_anomalies > 5: risk_level = "medium" else: risk_level = "low" # 신뒰도 계산 confidence = min(1.0, total_anomalies / 20) if total_anomalies > 0 else 0.1 return { "overall_risk_level": risk_level, "risk_factors": { "total_anomalies": total_anomalies, "severe_anomalies": severe_anomalies, 
"anomaly_density": round(total_anomalies / 30, 3) # 30일 κΈ°μ€€ }, "confidence_score": round(confidence, 3), "recommended_actions": self._get_risk_recommendations(risk_level) } def _get_risk_recommendations(self, risk_level: str) -> List[str]: """리슀크 λ ˆλ²¨λ³„ ꢌμž₯ 쑰치""" recommendations = { "low": [ "정상적인 μ‹œμž₯ μƒν™©μž…λ‹ˆλ‹€", "μ •κΈ° λͺ¨λ‹ˆν„°λ§μ„ μ§€μ†ν•˜μ„Έμš”" ], "medium": [ "μ‹œμž₯ 변동성이 μ¦κ°€ν–ˆμŠ΅λ‹ˆλ‹€", "포트폴리였 리슀크λ₯Ό μ κ²€ν•˜μ„Έμš”", "μ£Όμš” μ§€ν‘œλ₯Ό λ©΄λ°€νžˆ λͺ¨λ‹ˆν„°λ§ν•˜μ„Έμš”" ], "high": [ "높은 μˆ˜μ€€μ˜ 이상 μ§•ν›„κ°€ νƒμ§€λ˜μ—ˆμŠ΅λ‹ˆλ‹€", "ν¬μ§€μ…˜ 규λͺ¨λ₯Ό μΆ•μ†Œν•˜λŠ” 것을 κ³ λ €ν•˜μ„Έμš”", "ν—€μ§€ μ „λž΅μ„ κ²€ν† ν•˜μ„Έμš”", "μ‹œμž₯ λ‰΄μŠ€μ™€ 이벀트λ₯Ό 주의 깊게 κ΄€μ°°ν•˜μ„Έμš”" ], "critical": [ "μ‹¬κ°ν•œ μ‹œμž₯ 이상 μƒν™©μž…λ‹ˆλ‹€", "μ¦‰μ‹œ 리슀크 관리 쑰치λ₯Ό μ·¨ν•˜μ„Έμš”", "포트폴리였 μž¬κ²€ν† κ°€ ν•„μš”ν•©λ‹ˆλ‹€", "μ „λ¬Έκ°€ 상담을 λ°›λŠ” 것을 ꢌμž₯ν•©λ‹ˆλ‹€" ] } return recommendations.get(risk_level, []) def _generate_realtime_alerts(self, anomalies: List[Dict[str, Any]], threshold: str) -> Dict[str, Any]: """μ‹€μ‹œκ°„ μ•Œλ¦Ό 생성""" # μž„κ³„κ°’λ³„ 필터링 threshold_map = {"low": 3, "medium": 5, "high": 10} min_anomalies = threshold_map.get(threshold, 5) critical_anomalies = [a for a in anomalies if a.get("severity") == "high"] if len(critical_anomalies) >= min_anomalies: alert_level = "critical" elif len(anomalies) >= min_anomalies: alert_level = "warning" else: alert_level = "normal" return { "alert_level": alert_level, "critical_anomalies": len(critical_anomalies), "total_anomalies": len(anomalies), "recommended_actions": self._get_alert_actions(alert_level), "alert_timestamp": datetime.now().isoformat() } def _get_alert_actions(self, alert_level: str) -> List[str]: """μ•Œλ¦Ό λ ˆλ²¨λ³„ 쑰치 사항""" actions = { "normal": ["정상 μƒν™©μž…λ‹ˆλ‹€"], "warning": [ "μ‹œμž₯ 상황을 주의 깊게 λͺ¨λ‹ˆν„°λ§ν•˜μ„Έμš”", "포트폴리였 리슀크λ₯Ό μ κ²€ν•˜μ„Έμš”" ], "critical": [ "μ¦‰μ‹œ 리슀크 κ΄€λ¦¬μžμ—κ²Œ μ•Œλ¦¬μ„Έμš”", "κΈ΄κΈ‰ ν¬μ§€μ…˜ κ²€ν† κ°€ ν•„μš”ν•©λ‹ˆλ‹€", "μ‹œμž₯ 상황 뢄석을 μˆ˜ν–‰ν•˜μ„Έμš”" ] } return actions.get(alert_level, [])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.