"""이상 징후 탐지 도구"""
import json
import logging
import math
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class AnomalyDetectionTool(BaseTool):
    """Market anomaly detection tool.

    Detects market anomalies using statistical outlier tests (Z-score, IQR),
    a simplified Isolation-Forest approximation, and time-series pattern
    analysis, backed by an async database layer and a short-lived cache.
    """

    def __init__(self, db_manager, cache_manager):
        # db_manager / cache_manager are injected project services
        # (async query + async key/value cache) — handed to BaseTool.
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # cache results for 5 minutes
@property
def name(self) -> str:
    """Tool identifier used for registration/dispatch."""
    return "detect_market_anomalies"
@property
def description(self) -> str:
    """Human-readable tool description (Korean, shown to tool consumers)."""
    return "시장 이상 징후를 탐지합니다. 통계적 이상치, 머신러닝 기반 이상 탐지, 시계열 패턴 분석을 지원합니다."
def get_tool_definition(self) -> ToolSchema:
    """Build the JSON-schema tool definition.

    Property defaults mirror the fallbacks used in execute(); only
    detection_methods is required.
    """
    properties: Dict[str, Any] = {
        "market": {
            "type": "string",
            "enum": ["KOSPI", "KOSDAQ", "ALL"],
            "default": "KOSPI",
            "description": "분석할 시장",
        },
        "detection_methods": {
            "type": "array",
            "items": {
                "type": "string",
                "enum": ["statistical", "isolation_forest", "timeseries"],
            },
            "minItems": 1,
            "default": ["statistical"],
            "description": "이상 탐지 방법 목록",
        },
        "lookback_period": {
            "type": "string",
            "enum": ["7d", "30d", "60d", "90d"],
            "default": "30d",
            "description": "분석 기간",
        },
        "sensitivity": {
            "type": "number",
            "default": 2.0,
            "minimum": 0.5,
            "maximum": 5.0,
            "description": "이상 탐지 민감도 (Z-score 임계값)",
        },
        "contamination": {
            "type": "number",
            "default": 0.1,
            "minimum": 0.01,
            "maximum": 0.3,
            "description": "Isolation Forest 오염도 (이상치 비율)",
        },
        "include_feature_analysis": {
            "type": "boolean",
            "default": False,
            "description": "피처 중요도 분석 포함 여부",
        },
        "include_risk_assessment": {
            "type": "boolean",
            "default": False,
            "description": "리스크 평가 포함 여부",
        },
        "include_realtime_alerts": {
            "type": "boolean",
            "default": False,
            "description": "실시간 알림 포함 여부",
        },
        "alert_threshold": {
            "type": "string",
            "enum": ["low", "medium", "high"],
            "default": "medium",
            "description": "알림 임계값",
        },
    }
    return ToolSchema(
        name=self.name,
        description=self.description,
        inputSchema={
            "type": "object",
            "properties": properties,
            "required": ["detection_methods"],
        },
    )
async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
    """Run anomaly detection and return the JSON result as TextContent.

    Args:
        arguments: Raw tool arguments; missing keys fall back to the
            defaults declared in get_tool_definition().

    Returns:
        A single-element list holding the JSON-serialized detection result.

    Raises:
        ValueError: On invalid parameters (via _validate_parameters).
        DatabaseConnectionError: When the underlying query fails.
    """
    try:
        # Extract parameters (defaults mirror the input schema).
        market = arguments.get("market", "KOSPI")
        detection_methods = arguments.get("detection_methods", ["statistical"])
        lookback_period = arguments.get("lookback_period", "30d")
        sensitivity = arguments.get("sensitivity", 2.0)
        contamination = arguments.get("contamination", 0.1)
        include_feature_analysis = arguments.get("include_feature_analysis", False)
        include_risk_assessment = arguments.get("include_risk_assessment", False)
        include_realtime_alerts = arguments.get("include_realtime_alerts", False)
        alert_threshold = arguments.get("alert_threshold", "medium")
        self._validate_parameters(market, detection_methods, lookback_period, sensitivity)

        # Serve from cache when a fresh entry exists.
        cache_key = self._generate_cache_key(
            market, detection_methods, lookback_period, sensitivity, contamination
        )
        cached_data = await self.cache_manager.get(cache_key)
        if cached_data and self._is_data_fresh(cached_data):
            self.logger.info(f"Cache hit for {cache_key}")
            # BUGFIX: serialize exactly like the fresh path (indent=2) so a
            # cached response is byte-compatible with a freshly computed one.
            return [TextContent(text=json.dumps(cached_data, ensure_ascii=False, indent=2))]

        # Compute from the database, then cache the result.
        data = await self._fetch_anomaly_detection_data(
            market, detection_methods, lookback_period, sensitivity, contamination,
            include_feature_analysis, include_risk_assessment, include_realtime_alerts,
            alert_threshold
        )
        await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
        self.logger.info(f"Anomaly detection completed for {market}")
        return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]
    except Exception as e:
        # logger.exception keeps the traceback; the error is re-raised for
        # the caller to translate into a tool error response.
        self.logger.exception(f"Error in anomaly detection tool: {e}")
        raise
def _validate_parameters(self, market: str, methods: List[str], period: str, sensitivity: float):
"""파라미터 검증"""
valid_markets = ["KOSPI", "KOSDAQ", "ALL"]
if market not in valid_markets:
raise ValueError(f"Invalid market: {market}")
if not methods or len(methods) == 0:
raise ValueError("At least one detection method must be specified")
valid_methods = ["statistical", "isolation_forest", "timeseries"]
for method in methods:
if method not in valid_methods:
raise ValueError(f"Invalid detection method: {method}")
valid_periods = ["7d", "30d", "60d", "90d"]
if period not in valid_periods:
raise ValueError(f"Invalid lookback period: {period}")
if sensitivity < 0.5 or sensitivity > 5.0:
raise ValueError("Invalid sensitivity: must be between 0.5 and 5.0")
def _generate_cache_key(self, market: str, methods: List[str], period: str,
sensitivity: float, contamination: float) -> str:
"""캐시 키 생성"""
methods_str = "_".join(sorted(methods))
return f"anomaly:{market}:{methods_str}:{period}:{sensitivity}:{contamination}"
def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
"""데이터 신선도 확인"""
if "timestamp" not in data:
return False
try:
timestamp = datetime.fromisoformat(data["timestamp"])
return datetime.now() - timestamp < timedelta(minutes=5)
except (ValueError, TypeError):
return False
async def _fetch_anomaly_detection_data(self, market: str, methods: List[str], period: str,
                                        sensitivity: float, contamination: float,
                                        include_features: bool, include_risk: bool,
                                        include_alerts: bool, alert_threshold: str) -> Dict[str, Any]:
    """Query market history and run the requested detection methods.

    Returns a dict with per-method results, an aggregate summary, and —
    on request — feature analysis, risk assessment and realtime alerts.
    A short-history warning payload is returned when fewer than 10 rows exist.

    Raises:
        DatabaseConnectionError: When the database query fails.
    """
    try:
        days = self._get_period_days(period)
        # BUGFIX: the original used INTERVAL '%s days' — a placeholder inside
        # a quoted literal is NOT substituted by drivers that perform real
        # parameter binding. Multiplying a fixed 1-day interval by the bound
        # integer is equivalent and binds correctly.
        query = """
        SELECT date, close_price, volume, daily_return, volatility,
               vix, put_call_ratio, market_cap
        FROM market_anomaly_data
        WHERE date >= CURRENT_DATE - %s * INTERVAL '1 day'
        """
        params = [days]
        if market != "ALL":
            query += " AND market = %s"
            params.append(market)
        query += " ORDER BY date DESC"
        market_data = await self.db_manager.fetch_all(query, *params)

        # Bail out early when there is not enough history to analyze.
        if len(market_data) < 10:  # at least 10 daily rows required
            return {
                "timestamp": datetime.now().isoformat(),
                "market": market,
                "warning": "Insufficient data for anomaly detection (minimum 10 days required)",
                "data_points": len(market_data)
            }

        result = {
            "timestamp": datetime.now().isoformat(),
            "market": market,
            "lookback_period": period,
            "anomaly_detection_results": {},
            "anomaly_summary": {}
        }

        # Run each requested method; one method failing is recorded in the
        # result but does not abort the remaining methods.
        all_anomalies = []
        for method in methods:
            try:
                if method == "statistical":
                    method_result = self._perform_statistical_detection(market_data, sensitivity)
                elif method == "isolation_forest":
                    method_result = self._perform_isolation_forest_detection(market_data, contamination)
                elif method == "timeseries":
                    method_result = self._perform_timeseries_detection(market_data)
                else:
                    # Unknown names are rejected earlier by _validate_parameters.
                    continue
                result["anomaly_detection_results"][method] = method_result
                all_anomalies.extend(method_result.get("detected_anomalies", []))
            except Exception as e:
                self.logger.warning(f"Failed to perform {method} detection: {e}")
                result["anomaly_detection_results"][method] = {
                    "error": f"Detection failed: {str(e)}"
                }

        result["anomaly_summary"] = self._generate_anomaly_summary(all_anomalies)
        if include_features:
            result["feature_analysis"] = self._analyze_features(market_data)
        if include_risk:
            result["risk_assessment"] = self._assess_risk_level(
                result["anomaly_detection_results"]
            )
        if include_alerts:
            result["realtime_alerts"] = self._generate_realtime_alerts(
                all_anomalies, alert_threshold
            )
        return result
    except Exception as e:
        self.logger.error(f"Database query failed: {e}")
        if isinstance(e, DatabaseConnectionError):
            raise
        # Chain the cause so the original driver error stays visible.
        raise DatabaseConnectionError(f"Failed to fetch anomaly detection data: {e}") from e
def _get_period_days(self, period: str) -> int:
"""기간을 일수로 변환"""
period_map = {
"7d": 7,
"30d": 30,
"60d": 60,
"90d": 90
}
return period_map.get(period, 30)
def _perform_statistical_detection(self, market_data: List[Dict[str, Any]],
sensitivity: float) -> Dict[str, Any]:
"""통계적 이상 탐지 수행"""
# 가격 변동률 데이터 추출
returns = [item["daily_return"] for item in market_data if item.get("daily_return") is not None]
volumes = [item["volume"] for item in market_data if item.get("volume") is not None]
volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None]
# Z-score 기반 이상 탐지
z_score_anomalies = []
if returns:
z_score_anomalies.extend(self._detect_z_score_anomalies(returns, sensitivity, "daily_return"))
if volumes:
z_score_anomalies.extend(self._detect_z_score_anomalies(volumes, sensitivity, "volume"))
if volatilities:
z_score_anomalies.extend(self._detect_z_score_anomalies(volatilities, sensitivity, "volatility"))
# IQR 기반 이상 탐지
iqr_anomalies = []
if returns:
iqr_anomalies.extend(self._detect_iqr_anomalies(returns, "daily_return"))
if volumes:
iqr_anomalies.extend(self._detect_iqr_anomalies(volumes, "volume"))
return {
"z_score_anomalies": z_score_anomalies,
"iqr_anomalies": iqr_anomalies,
"anomaly_count": len(z_score_anomalies) + len(iqr_anomalies),
"anomaly_percentage": round((len(z_score_anomalies) + len(iqr_anomalies)) / len(market_data) * 100, 2),
"detected_anomalies": z_score_anomalies + iqr_anomalies
}
def _detect_z_score_anomalies(self, data: List[float], threshold: float,
feature_name: str = "value") -> List[Dict[str, Any]]:
"""Z-score 기반 이상치 탐지"""
if len(data) < 3:
return []
mean_val = statistics.mean(data)
std_val = statistics.stdev(data) if len(data) > 1 else 0
if std_val == 0:
return []
anomalies = []
for i, value in enumerate(data):
z_score = (value - mean_val) / std_val
if abs(z_score) > threshold:
anomalies.append({
"index": i,
"value": value,
"z_score": round(z_score, 4),
"feature": feature_name,
"severity": "high" if abs(z_score) > threshold * 1.5 else "medium"
})
return anomalies
def _detect_iqr_anomalies(self, data: List[float], feature_name: str = "value") -> List[Dict[str, Any]]:
"""IQR 기반 이상치 탐지"""
if len(data) < 4:
return []
sorted_data = sorted(data)
n = len(sorted_data)
q1_idx = n // 4
q3_idx = 3 * n // 4
q1 = sorted_data[q1_idx]
q3 = sorted_data[q3_idx]
iqr = q3 - q1
if iqr == 0:
return []
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
anomalies = []
for i, value in enumerate(data):
if value < lower_bound or value > upper_bound:
distance_from_bound = min(abs(value - lower_bound), abs(value - upper_bound))
severity = "high" if distance_from_bound > iqr else "medium"
anomalies.append({
"index": i,
"value": value,
"feature": feature_name,
"lower_bound": lower_bound,
"upper_bound": upper_bound,
"severity": severity
})
return anomalies
def _perform_isolation_forest_detection(self, market_data: List[Dict[str, Any]],
contamination: float) -> Dict[str, Any]:
"""Isolation Forest 기반 이상 탐지 (간소화 구현)"""
# 피처 추출
features = self._extract_features(market_data)
if len(features) < 10:
return {"error": "Insufficient data for Isolation Forest"}
# 간소화된 Isolation Forest (실제로는 scikit-learn 사용 권장)
anomalies, scores, model_stats = self._detect_isolation_forest_anomalies(features, contamination)
return {
"anomalies_detected": len(anomalies),
"anomaly_scores": scores,
"feature_importance": self._calculate_feature_importance(features, anomalies),
"model_stats": model_stats,
"detected_anomalies": anomalies
}
def _extract_features(self, market_data: List[Dict[str, Any]]) -> List[List[float]]:
"""시장 데이터에서 피처 추출"""
features = []
for item in market_data:
feature_row = [
item.get("close_price", 0),
item.get("volume", 0) / 1000000, # 백만 단위로 스케일링
item.get("daily_return", 0),
item.get("volatility", 0),
item.get("vix", 20), # 기본값
item.get("put_call_ratio", 0.8) # 기본값
]
features.append(feature_row)
return features
def _detect_isolation_forest_anomalies(self, features: List[List[float]],
contamination: float) -> Tuple[List[Dict], List[float], Dict]:
"""간소화된 Isolation Forest 이상 탐지"""
# 간단한 이상치 점수 계산 (실제 Isolation Forest 알고리즘의 근사치)
n_samples = len(features)
n_features = len(features[0]) if features else 0
# 각 피처별 통계 계산
feature_stats = []
for i in range(n_features):
feature_values = [row[i] for row in features]
feature_stats.append({
"mean": statistics.mean(feature_values),
"std": statistics.stdev(feature_values) if len(feature_values) > 1 else 1.0
})
# 이상 점수 계산 (각 피처의 표준편차 기반)
scores = []
for row in features:
score = 0
for i, value in enumerate(row):
if feature_stats[i]["std"] > 0:
normalized_value = abs((value - feature_stats[i]["mean"]) / feature_stats[i]["std"])
score += normalized_value
scores.append(score / n_features)
# 임계값 계산 (상위 contamination 비율)
sorted_scores = sorted(scores, reverse=True)
threshold_idx = int(len(sorted_scores) * contamination)
threshold = sorted_scores[threshold_idx] if threshold_idx < len(sorted_scores) else max(scores)
# 이상치 추출
anomalies = []
for i, score in enumerate(scores):
if score > threshold:
anomalies.append({
"index": i,
"anomaly_score": round(score, 4),
"features": features[i],
"severity": "high" if score > threshold * 1.5 else "medium"
})
model_stats = {
"n_estimators": 100, # 가상의 트리 개수
"contamination": contamination,
"threshold": threshold,
"n_features": n_features
}
return anomalies, scores, model_stats
def _calculate_feature_importance(self, features: List[List[float]],
anomalies: List[Dict]) -> Dict[str, float]:
"""피처 중요도 계산"""
if not features or not anomalies:
return {}
n_features = len(features[0])
feature_names = ["price", "volume", "return", "volatility", "vix", "put_call_ratio"]
# 각 피처별 분산 기여도 계산
importance = {}
for i in range(min(n_features, len(feature_names))):
feature_values = [row[i] for row in features]
if len(feature_values) > 1:
variance = statistics.variance(feature_values)
importance[feature_names[i]] = round(variance / sum([statistics.variance([row[j] for row in features]) for j in range(n_features) if len([row[j] for row in features]) > 1]), 4)
else:
importance[feature_names[i]] = 0.0
return importance
def _perform_timeseries_detection(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""시계열 이상 패턴 탐지"""
# 가격 데이터 추출
prices = [item["close_price"] for item in market_data if item.get("close_price") is not None]
volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None]
result = {}
# 계절성 이상 탐지 (간소화)
if len(prices) >= 20:
seasonal_anomalies = self._detect_seasonal_anomalies(prices)
result["seasonal_anomalies"] = seasonal_anomalies
# 구조적 변화점 탐지
if len(prices) >= 30:
structural_breaks = self._detect_structural_breaks(prices)
result["structural_breaks"] = structural_breaks
# 변동성 체제 변화 탐지
if len(volatilities) >= 20:
volatility_regimes = self._detect_volatility_regimes(volatilities)
result["volatility_regimes"] = volatility_regimes
# 트렌드 이상 탐지
trend_breaks = self._detect_trend_breaks(prices)
result["trend_breaks"] = trend_breaks
# 모든 탐지된 이상 패턴 수집
all_anomalies = []
for key, anomaly_list in result.items():
if isinstance(anomaly_list, list):
all_anomalies.extend(anomaly_list)
result["detected_anomalies"] = all_anomalies
return result
def _detect_seasonal_anomalies(self, prices: List[float]) -> List[Dict[str, Any]]:
"""계절성 이상 탐지 (간소화)"""
# 주간 패턴 분석 (7일 주기)
if len(prices) < 14:
return []
weekly_patterns = []
for i in range(0, len(prices) - 7, 7):
weekly_change = prices[i] - prices[i + 7] if i + 7 < len(prices) else 0
weekly_patterns.append(weekly_change)
if len(weekly_patterns) < 2:
return []
# 주간 패턴의 이상치 탐지
anomalies = self._detect_z_score_anomalies(weekly_patterns, 2.0, "weekly_pattern")
return [{"type": "seasonal", **anomaly} for anomaly in anomalies]
def _detect_structural_breaks(self, prices: List[float]) -> List[Dict[str, Any]]:
"""구조적 변화점 탐지 (간소화된 CUSUM)"""
if len(prices) < 20:
return []
# 누적 합계 기반 변화점 탐지
changes = [prices[i] - prices[i-1] for i in range(1, len(prices))]
mean_change = statistics.mean(changes)
cusum_pos = 0
cusum_neg = 0
threshold = 3 * statistics.stdev(changes) if len(changes) > 1 else 1.0
breaks = []
for i, change in enumerate(changes):
cusum_pos = max(0, cusum_pos + change - mean_change)
cusum_neg = min(0, cusum_neg + change - mean_change)
if cusum_pos > threshold or cusum_neg < -threshold:
breaks.append({
"type": "structural_break",
"breakpoint": i + 1,
"cusum_value": cusum_pos if cusum_pos > threshold else cusum_neg,
"severity": "high" if abs(cusum_pos if cusum_pos > threshold else cusum_neg) > threshold * 1.5 else "medium"
})
cusum_pos = 0
cusum_neg = 0
return breaks
def _detect_volatility_regimes(self, volatilities: List[float]) -> List[Dict[str, Any]]:
"""변동성 체제 탐지"""
if len(volatilities) < 10:
return []
# 변동성 수준 분류
mean_vol = statistics.mean(volatilities)
std_vol = statistics.stdev(volatilities) if len(volatilities) > 1 else 0
regimes = []
current_regime = None
regime_start = 0
for i, vol in enumerate(volatilities):
if vol > mean_vol + std_vol:
regime_type = "high_volatility"
elif vol < mean_vol - std_vol:
regime_type = "low_volatility"
else:
regime_type = "normal"
if regime_type != current_regime:
if current_regime is not None:
regimes.append({
"type": "volatility_regime",
"regime_type": current_regime,
"start_index": regime_start,
"end_index": i - 1,
"duration": i - regime_start,
"volatility_level": current_regime
})
current_regime = regime_type
regime_start = i
# 마지막 체제 추가
if current_regime is not None:
regimes.append({
"type": "volatility_regime",
"regime_type": current_regime,
"start_index": regime_start,
"end_index": len(volatilities) - 1,
"duration": len(volatilities) - regime_start,
"volatility_level": current_regime
})
return regimes
def _detect_trend_breaks(self, prices: List[float]) -> List[Dict[str, Any]]:
"""트렌드 변화점 탐지"""
if len(prices) < 10:
return []
# 이동평균을 이용한 트렌드 분석
window = 5
trends = []
for i in range(window, len(prices)):
before_avg = statistics.mean(prices[i-window:i])
current_price = prices[i]
if current_price > before_avg * 1.02: # 2% 이상 상승
trend = "up"
elif current_price < before_avg * 0.98: # 2% 이상 하락
trend = "down"
else:
trend = "sideways"
trends.append((i, trend))
# 트렌드 변화점 탐지
breaks = []
current_trend = None
for i, (idx, trend) in enumerate(trends):
if trend != current_trend:
if current_trend is not None:
breaks.append({
"type": "trend_break",
"breakpoint": idx,
"from_trend": current_trend,
"to_trend": trend,
"severity": "medium"
})
current_trend = trend
return breaks
def _generate_anomaly_summary(self, all_anomalies: List[Dict[str, Any]]) -> Dict[str, Any]:
"""종합 이상 징후 요약 생성"""
if not all_anomalies:
return {
"total_anomalies_detected": 0,
"severity_distribution": {"low": 0, "medium": 0, "high": 0},
"confidence_scores": {"average": 0.0}
}
# 심각도별 분포
severity_counts = {"low": 0, "medium": 0, "high": 0}
for anomaly in all_anomalies:
severity = anomaly.get("severity", "medium")
if severity in severity_counts:
severity_counts[severity] += 1
# 신뢰도 점수 계산
confidence_scores = [anomaly.get("confidence", 0.7) for anomaly in all_anomalies if "confidence" in anomaly]
avg_confidence = statistics.mean(confidence_scores) if confidence_scores else 0.7
return {
"total_anomalies_detected": len(all_anomalies),
"severity_distribution": severity_counts,
"confidence_scores": {
"average": round(avg_confidence, 3),
"min": round(min(confidence_scores), 3) if confidence_scores else 0.0,
"max": round(max(confidence_scores), 3) if confidence_scores else 0.0
}
}
def _analyze_features(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""피처 분석"""
features = self._extract_features(market_data)
if not features:
return {"error": "No features available for analysis"}
feature_names = ["price", "volume", "return", "volatility", "vix", "put_call_ratio"]
n_features = min(len(features[0]), len(feature_names))
analysis = {}
for i in range(n_features):
feature_values = [row[i] for row in features]
analysis[feature_names[i]] = {
"mean": round(statistics.mean(feature_values), 4),
"std": round(statistics.stdev(feature_values) if len(feature_values) > 1 else 0, 4),
"min": round(min(feature_values), 4),
"max": round(max(feature_values), 4),
"correlation_with_anomalies": 0.5 # 간소화된 상관관계
}
return analysis
def _assess_risk_level(self, detection_results: Dict[str, Any]) -> Dict[str, Any]:
"""리스크 레벨 평가"""
total_anomalies = 0
severe_anomalies = 0
# 각 탐지 방법별 결과 집계
for method, result in detection_results.items():
if isinstance(result, dict) and "detected_anomalies" in result:
anomalies = result["detected_anomalies"]
total_anomalies += len(anomalies)
severe_anomalies += len([a for a in anomalies if a.get("severity") == "high"])
# 리스크 레벨 결정
if severe_anomalies > 5:
risk_level = "critical"
elif total_anomalies > 10:
risk_level = "high"
elif total_anomalies > 5:
risk_level = "medium"
else:
risk_level = "low"
# 신뢰도 계산
confidence = min(1.0, total_anomalies / 20) if total_anomalies > 0 else 0.1
return {
"overall_risk_level": risk_level,
"risk_factors": {
"total_anomalies": total_anomalies,
"severe_anomalies": severe_anomalies,
"anomaly_density": round(total_anomalies / 30, 3) # 30일 기준
},
"confidence_score": round(confidence, 3),
"recommended_actions": self._get_risk_recommendations(risk_level)
}
def _get_risk_recommendations(self, risk_level: str) -> List[str]:
"""리스크 레벨별 권장 조치"""
recommendations = {
"low": [
"정상적인 시장 상황입니다",
"정기 모니터링을 지속하세요"
],
"medium": [
"시장 변동성이 증가했습니다",
"포트폴리오 리스크를 점검하세요",
"주요 지표를 면밀히 모니터링하세요"
],
"high": [
"높은 수준의 이상 징후가 탐지되었습니다",
"포지션 규모를 축소하는 것을 고려하세요",
"헤지 전략을 검토하세요",
"시장 뉴스와 이벤트를 주의 깊게 관찰하세요"
],
"critical": [
"심각한 시장 이상 상황입니다",
"즉시 리스크 관리 조치를 취하세요",
"포트폴리오 재검토가 필요합니다",
"전문가 상담을 받는 것을 권장합니다"
]
}
return recommendations.get(risk_level, [])
def _generate_realtime_alerts(self, anomalies: List[Dict[str, Any]],
threshold: str) -> Dict[str, Any]:
"""실시간 알림 생성"""
# 임계값별 필터링
threshold_map = {"low": 3, "medium": 5, "high": 10}
min_anomalies = threshold_map.get(threshold, 5)
critical_anomalies = [a for a in anomalies if a.get("severity") == "high"]
if len(critical_anomalies) >= min_anomalies:
alert_level = "critical"
elif len(anomalies) >= min_anomalies:
alert_level = "warning"
else:
alert_level = "normal"
return {
"alert_level": alert_level,
"critical_anomalies": len(critical_anomalies),
"total_anomalies": len(anomalies),
"recommended_actions": self._get_alert_actions(alert_level),
"alert_timestamp": datetime.now().isoformat()
}
def _get_alert_actions(self, alert_level: str) -> List[str]:
"""알림 레벨별 조치 사항"""
actions = {
"normal": ["정상 상황입니다"],
"warning": [
"시장 상황을 주의 깊게 모니터링하세요",
"포트폴리오 리스크를 점검하세요"
],
"critical": [
"즉시 리스크 관리자에게 알리세요",
"긴급 포지션 검토가 필요합니다",
"시장 상황 분석을 수행하세요"
]
}
return actions.get(alert_level, [])