# anomaly_tools.py
"""μ΄μ μ§ν νμ§ λꡬ"""
import json
import logging
import math
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class AnomalyDetectionTool(BaseTool):
"""μ΄μ μ§ν νμ§ λꡬ"""
def __init__(self, db_manager, cache_manager):
super().__init__(db_manager, cache_manager)
self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # 5 minutes
@property
def name(self) -> str:
return "detect_market_anomalies"
@property
def description(self) -> str:
return "μμ₯ μ΄μ μ§νλ₯Ό νμ§ν©λλ€. ν΅κ³μ μ΄μμΉ, λ¨Έμ λ¬λ κΈ°λ° μ΄μ νμ§, μκ³μ΄ ν¨ν΄ λΆμμ μ§μν©λλ€."
def get_tool_definition(self) -> ToolSchema:
"""λꡬ μ μ λ°ν"""
return ToolSchema(
name=self.name,
description=self.description,
inputSchema={
"type": "object",
"properties": {
"market": {
"type": "string",
"enum": ["KOSPI", "KOSDAQ", "ALL"],
"default": "KOSPI",
"description": "λΆμν μμ₯"
},
"detection_methods": {
"type": "array",
"items": {
"type": "string",
"enum": [
"statistical",
"isolation_forest",
"timeseries"
]
},
"minItems": 1,
"default": ["statistical"],
"description": "μ΄μ νμ§ λ°©λ² λͺ©λ‘"
},
"lookback_period": {
"type": "string",
"enum": ["7d", "30d", "60d", "90d"],
"default": "30d",
"description": "λΆμ κΈ°κ°"
},
"sensitivity": {
"type": "number",
"default": 2.0,
"minimum": 0.5,
"maximum": 5.0,
"description": "μ΄μ νμ§ λ―Όκ°λ (Z-score μκ³κ°)"
},
"contamination": {
"type": "number",
"default": 0.1,
"minimum": 0.01,
"maximum": 0.3,
"description": "Isolation Forest μ€μΌλ (μ΄μμΉ λΉμ¨)"
},
"include_feature_analysis": {
"type": "boolean",
"default": False,
"description": "νΌμ² μ€μλ λΆμ ν¬ν¨ μ¬λΆ"
},
"include_risk_assessment": {
"type": "boolean",
"default": False,
"description": "리μ€ν¬ νκ° ν¬ν¨ μ¬λΆ"
},
"include_realtime_alerts": {
"type": "boolean",
"default": False,
"description": "μ€μκ° μλ¦Ό ν¬ν¨ μ¬λΆ"
},
"alert_threshold": {
"type": "string",
"enum": ["low", "medium", "high"],
"default": "medium",
"description": "μλ¦Ό μκ³κ°"
}
},
"required": ["detection_methods"]
}
)
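    # Example of an arguments payload accepted by the schema above
    # (illustrative values only):
    #   {"market": "KOSDAQ", "detection_methods": ["statistical", "isolation_forest"],
    #    "lookback_period": "60d", "sensitivity": 2.5, "include_risk_assessment": True}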
async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""μ΄μ μ§ν νμ§ μ€ν"""
try:
            # Extract and validate parameters
market = arguments.get("market", "KOSPI")
detection_methods = arguments.get("detection_methods", ["statistical"])
lookback_period = arguments.get("lookback_period", "30d")
sensitivity = arguments.get("sensitivity", 2.0)
contamination = arguments.get("contamination", 0.1)
include_feature_analysis = arguments.get("include_feature_analysis", False)
include_risk_assessment = arguments.get("include_risk_assessment", False)
include_realtime_alerts = arguments.get("include_realtime_alerts", False)
alert_threshold = arguments.get("alert_threshold", "medium")
self._validate_parameters(market, detection_methods, lookback_period, sensitivity)
            # Check the cache
cache_key = self._generate_cache_key(
market, detection_methods, lookback_period, sensitivity, contamination
)
cached_data = await self.cache_manager.get(cache_key)
if cached_data and self._is_data_fresh(cached_data):
self.logger.info(f"Cache hit for {cache_key}")
return [TextContent(text=json.dumps(cached_data, ensure_ascii=False))]
            # Fetch data from the database
data = await self._fetch_anomaly_detection_data(
market, detection_methods, lookback_period, sensitivity, contamination,
include_feature_analysis, include_risk_assessment, include_realtime_alerts,
alert_threshold
)
            # Store the result in the cache
await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
self.logger.info(f"Anomaly detection completed for {market}")
return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]
except Exception as e:
self.logger.error(f"Error in anomaly detection tool: {e}")
raise
def _validate_parameters(self, market: str, methods: List[str], period: str, sensitivity: float):
"""νλΌλ―Έν° κ²μ¦"""
valid_markets = ["KOSPI", "KOSDAQ", "ALL"]
if market not in valid_markets:
raise ValueError(f"Invalid market: {market}")
        if not methods:
raise ValueError("At least one detection method must be specified")
valid_methods = ["statistical", "isolation_forest", "timeseries"]
for method in methods:
if method not in valid_methods:
raise ValueError(f"Invalid detection method: {method}")
valid_periods = ["7d", "30d", "60d", "90d"]
if period not in valid_periods:
raise ValueError(f"Invalid lookback period: {period}")
if sensitivity < 0.5 or sensitivity > 5.0:
raise ValueError("Invalid sensitivity: must be between 0.5 and 5.0")
def _generate_cache_key(self, market: str, methods: List[str], period: str,
sensitivity: float, contamination: float) -> str:
"""μΊμ ν€ μμ±"""
methods_str = "_".join(sorted(methods))
return f"anomaly:{market}:{methods_str}:{period}:{sensitivity}:{contamination}"
def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
"""λ°μ΄ν° μ μ λ νμΈ"""
if "timestamp" not in data:
return False
try:
timestamp = datetime.fromisoformat(data["timestamp"])
return datetime.now() - timestamp < timedelta(minutes=5)
except (ValueError, TypeError):
return False
async def _fetch_anomaly_detection_data(self, market: str, methods: List[str], period: str,
sensitivity: float, contamination: float,
include_features: bool, include_risk: bool,
include_alerts: bool, alert_threshold: str) -> Dict[str, Any]:
"""λ°μ΄ν°λ² μ΄μ€μμ μ΄μ μ§ν νμ§ λ°μ΄ν° μ‘°ν λ° λΆμ"""
try:
days = self._get_period_days(period)
            # Build the query; pass the day count as an integer parameter
            # multiplied by a one-day interval so the driver can bind it safely
            query = """
                SELECT date, close_price, volume, daily_return, volatility,
                       vix, put_call_ratio, market_cap
                FROM market_anomaly_data
                WHERE date >= CURRENT_DATE - %s * INTERVAL '1 day'
            """
params = [days]
if market != "ALL":
query += " AND market = %s"
params.append(market)
query += " ORDER BY date DESC"
            # Fetch the rows
market_data = await self.db_manager.fetch_all(query, *params)
            # Bail out early if there is not enough data
            if len(market_data) < 10:  # at least 10 days of data required
return {
"timestamp": datetime.now().isoformat(),
"market": market,
"warning": "Insufficient data for anomaly detection (minimum 10 days required)",
"data_points": len(market_data)
}
            # Build the base result
result = {
"timestamp": datetime.now().isoformat(),
"market": market,
"lookback_period": period,
"anomaly_detection_results": {},
"anomaly_summary": {}
}
            # Run each detection method
all_anomalies = []
for method in methods:
try:
if method == "statistical":
stat_result = self._perform_statistical_detection(market_data, sensitivity)
result["anomaly_detection_results"]["statistical"] = stat_result
all_anomalies.extend(stat_result.get("detected_anomalies", []))
elif method == "isolation_forest":
if_result = self._perform_isolation_forest_detection(market_data, contamination)
result["anomaly_detection_results"]["isolation_forest"] = if_result
all_anomalies.extend(if_result.get("detected_anomalies", []))
elif method == "timeseries":
ts_result = self._perform_timeseries_detection(market_data)
result["anomaly_detection_results"]["timeseries"] = ts_result
all_anomalies.extend(ts_result.get("detected_anomalies", []))
except Exception as e:
self.logger.warning(f"Failed to perform {method} detection: {e}")
result["anomaly_detection_results"][method] = {
"error": f"Detection failed: {str(e)}"
}
            # Build the combined anomaly summary
result["anomaly_summary"] = self._generate_anomaly_summary(all_anomalies)
            # Feature analysis
if include_features:
result["feature_analysis"] = self._analyze_features(market_data)
            # Risk assessment
if include_risk:
result["risk_assessment"] = self._assess_risk_level(
result["anomaly_detection_results"]
)
            # Real-time alerts
if include_alerts:
result["realtime_alerts"] = self._generate_realtime_alerts(
all_anomalies, alert_threshold
)
return result
except Exception as e:
self.logger.error(f"Database query failed: {e}")
if isinstance(e, DatabaseConnectionError):
raise
raise DatabaseConnectionError(f"Failed to fetch anomaly detection data: {e}")
def _get_period_days(self, period: str) -> int:
"""κΈ°κ°μ μΌμλ‘ λ³ν"""
period_map = {
"7d": 7,
"30d": 30,
"60d": 60,
"90d": 90
}
return period_map.get(period, 30)
def _perform_statistical_detection(self, market_data: List[Dict[str, Any]],
sensitivity: float) -> Dict[str, Any]:
"""ν΅κ³μ μ΄μ νμ§ μν"""
# κ°κ²© λ³λλ₯ λ°μ΄ν° μΆμΆ
returns = [item["daily_return"] for item in market_data if item.get("daily_return") is not None]
volumes = [item["volume"] for item in market_data if item.get("volume") is not None]
volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None]
        # Z-score-based detection
z_score_anomalies = []
if returns:
z_score_anomalies.extend(self._detect_z_score_anomalies(returns, sensitivity, "daily_return"))
if volumes:
z_score_anomalies.extend(self._detect_z_score_anomalies(volumes, sensitivity, "volume"))
if volatilities:
z_score_anomalies.extend(self._detect_z_score_anomalies(volatilities, sensitivity, "volatility"))
        # IQR-based detection
iqr_anomalies = []
if returns:
iqr_anomalies.extend(self._detect_iqr_anomalies(returns, "daily_return"))
if volumes:
iqr_anomalies.extend(self._detect_iqr_anomalies(volumes, "volume"))
return {
"z_score_anomalies": z_score_anomalies,
"iqr_anomalies": iqr_anomalies,
"anomaly_count": len(z_score_anomalies) + len(iqr_anomalies),
"anomaly_percentage": round((len(z_score_anomalies) + len(iqr_anomalies)) / len(market_data) * 100, 2),
"detected_anomalies": z_score_anomalies + iqr_anomalies
}
def _detect_z_score_anomalies(self, data: List[float], threshold: float,
feature_name: str = "value") -> List[Dict[str, Any]]:
"""Z-score κΈ°λ° μ΄μμΉ νμ§"""
if len(data) < 3:
return []
mean_val = statistics.mean(data)
        std_val = statistics.stdev(data)  # len(data) >= 3 is guaranteed above
if std_val == 0:
return []
anomalies = []
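        # Standard score of each point: z = (x - mean) / std;
        # |z| > threshold marks the point as an outlier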
for i, value in enumerate(data):
z_score = (value - mean_val) / std_val
if abs(z_score) > threshold:
anomalies.append({
"index": i,
"value": value,
"z_score": round(z_score, 4),
"feature": feature_name,
"severity": "high" if abs(z_score) > threshold * 1.5 else "medium"
})
return anomalies
def _detect_iqr_anomalies(self, data: List[float], feature_name: str = "value") -> List[Dict[str, Any]]:
"""IQR κΈ°λ° μ΄μμΉ νμ§"""
if len(data) < 4:
return []
sorted_data = sorted(data)
n = len(sorted_data)
q1_idx = n // 4
q3_idx = 3 * n // 4
q1 = sorted_data[q1_idx]
q3 = sorted_data[q3_idx]
iqr = q3 - q1
if iqr == 0:
return []
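        # Tukey's fences: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are outliers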
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
anomalies = []
for i, value in enumerate(data):
if value < lower_bound or value > upper_bound:
distance_from_bound = min(abs(value - lower_bound), abs(value - upper_bound))
severity = "high" if distance_from_bound > iqr else "medium"
anomalies.append({
"index": i,
"value": value,
"feature": feature_name,
"lower_bound": lower_bound,
"upper_bound": upper_bound,
"severity": severity
})
return anomalies
def _perform_isolation_forest_detection(self, market_data: List[Dict[str, Any]],
contamination: float) -> Dict[str, Any]:
"""Isolation Forest κΈ°λ° μ΄μ νμ§ (κ°μν ꡬν)"""
# νΌμ² μΆμΆ
features = self._extract_features(market_data)
if len(features) < 10:
return {"error": "Insufficient data for Isolation Forest"}
        # Simplified Isolation Forest; for production use, scikit-learn's implementation is recommended
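        # A minimal sketch of the scikit-learn equivalent (assumes scikit-learn is
        # installed; it is not a dependency of this module):
        #     from sklearn.ensemble import IsolationForest
        #     clf = IsolationForest(n_estimators=100, contamination=contamination)
        #     labels = clf.fit_predict(features)  # -1 marks outliers, 1 marks inliers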
anomalies, scores, model_stats = self._detect_isolation_forest_anomalies(features, contamination)
return {
"anomalies_detected": len(anomalies),
"anomaly_scores": scores,
"feature_importance": self._calculate_feature_importance(features, anomalies),
"model_stats": model_stats,
"detected_anomalies": anomalies
}
def _extract_features(self, market_data: List[Dict[str, Any]]) -> List[List[float]]:
"""μμ₯ λ°μ΄ν°μμ νΌμ² μΆμΆ"""
features = []
for item in market_data:
feature_row = [
item.get("close_price", 0),
item.get("volume", 0) / 1000000, # λ°±λ§ λ¨μλ‘ μ€μΌμΌλ§
item.get("daily_return", 0),
item.get("volatility", 0),
item.get("vix", 20), # κΈ°λ³Έκ°
item.get("put_call_ratio", 0.8) # κΈ°λ³Έκ°
]
features.append(feature_row)
return features
def _detect_isolation_forest_anomalies(self, features: List[List[float]],
contamination: float) -> Tuple[List[Dict], List[float], Dict]:
"""κ°μνλ Isolation Forest μ΄μ νμ§"""
# κ°λ¨ν μ΄μμΉ μ μ κ³μ° (μ€μ Isolation Forest μκ³ λ¦¬μ¦μ κ·Όμ¬μΉ)
n_samples = len(features)
n_features = len(features[0]) if features else 0
        # Per-feature statistics
feature_stats = []
for i in range(n_features):
feature_values = [row[i] for row in features]
feature_stats.append({
"mean": statistics.mean(feature_values),
"std": statistics.stdev(feature_values) if len(feature_values) > 1 else 1.0
})
        # Anomaly score: mean absolute z-score across features
scores = []
for row in features:
score = 0
for i, value in enumerate(row):
if feature_stats[i]["std"] > 0:
normalized_value = abs((value - feature_stats[i]["mean"]) / feature_stats[i]["std"])
score += normalized_value
scores.append(score / n_features)
        # Threshold chosen so the top `contamination` fraction of scores is flagged
sorted_scores = sorted(scores, reverse=True)
threshold_idx = int(len(sorted_scores) * contamination)
threshold = sorted_scores[threshold_idx] if threshold_idx < len(sorted_scores) else max(scores)
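        # e.g. with 100 samples and contamination=0.1, threshold_idx is 10, so
        # roughly the highest-scoring 10% of rows end up above the threshold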
        # Collect the outliers
anomalies = []
for i, score in enumerate(scores):
if score > threshold:
anomalies.append({
"index": i,
"anomaly_score": round(score, 4),
"features": features[i],
"severity": "high" if score > threshold * 1.5 else "medium"
})
model_stats = {
"n_estimators": 100, # κ°μμ νΈλ¦¬ κ°μ
"contamination": contamination,
"threshold": threshold,
"n_features": n_features
}
return anomalies, scores, model_stats
def _calculate_feature_importance(self, features: List[List[float]],
anomalies: List[Dict]) -> Dict[str, float]:
"""νΌμ² μ€μλ κ³μ°"""
if not features or not anomalies:
return {}
n_features = len(features[0])
feature_names = ["price", "volume", "return", "volatility", "vix", "put_call_ratio"]
        # Importance as each feature's share of the total variance
        if len(features) < 2:
            return {feature_names[i]: 0.0 for i in range(min(n_features, len(feature_names)))}
        total_variance = sum(
            statistics.variance([row[j] for row in features]) for j in range(n_features)
        )
        importance = {}
        for i in range(min(n_features, len(feature_names))):
            if total_variance > 0:
                feature_values = [row[i] for row in features]
                importance[feature_names[i]] = round(
                    statistics.variance(feature_values) / total_variance, 4
                )
            else:
                importance[feature_names[i]] = 0.0
        return importance
def _perform_timeseries_detection(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""μκ³μ΄ μ΄μ ν¨ν΄ νμ§"""
# κ°κ²© λ°μ΄ν° μΆμΆ
prices = [item["close_price"] for item in market_data if item.get("close_price") is not None]
volatilities = [item["volatility"] for item in market_data if item.get("volatility") is not None]
result = {}
        # Seasonality anomalies (simplified)
if len(prices) >= 20:
seasonal_anomalies = self._detect_seasonal_anomalies(prices)
result["seasonal_anomalies"] = seasonal_anomalies
        # Structural break detection
if len(prices) >= 30:
structural_breaks = self._detect_structural_breaks(prices)
result["structural_breaks"] = structural_breaks
        # Volatility regime change detection
if len(volatilities) >= 20:
volatility_regimes = self._detect_volatility_regimes(volatilities)
result["volatility_regimes"] = volatility_regimes
        # Trend break detection
trend_breaks = self._detect_trend_breaks(prices)
result["trend_breaks"] = trend_breaks
        # Collect every detected anomaly pattern
all_anomalies = []
for key, anomaly_list in result.items():
if isinstance(anomaly_list, list):
all_anomalies.extend(anomaly_list)
result["detected_anomalies"] = all_anomalies
return result
def _detect_seasonal_anomalies(self, prices: List[float]) -> List[Dict[str, Any]]:
"""κ³μ μ± μ΄μ νμ§ (κ°μν)"""
# μ£Όκ° ν¨ν΄ λΆμ (7μΌ μ£ΌκΈ°)
if len(prices) < 14:
return []
weekly_patterns = []
for i in range(0, len(prices) - 7, 7):
            weekly_change = prices[i] - prices[i + 7]  # the range above keeps i + 7 in bounds
weekly_patterns.append(weekly_change)
if len(weekly_patterns) < 2:
return []
        # Flag outliers in the weekly pattern
anomalies = self._detect_z_score_anomalies(weekly_patterns, 2.0, "weekly_pattern")
return [{"type": "seasonal", **anomaly} for anomaly in anomalies]
def _detect_structural_breaks(self, prices: List[float]) -> List[Dict[str, Any]]:
"""ꡬ쑰μ λ³νμ νμ§ (κ°μνλ CUSUM)"""
if len(prices) < 20:
return []
        # Cumulative-sum based changepoint detection
changes = [prices[i] - prices[i-1] for i in range(1, len(prices))]
mean_change = statistics.mean(changes)
cusum_pos = 0
cusum_neg = 0
threshold = 3 * statistics.stdev(changes) if len(changes) > 1 else 1.0
breaks = []
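        # CUSUM recursion: S+_t = max(0, S+_{t-1} + (x_t - mean)),
        #                  S-_t = min(0, S-_{t-1} + (x_t - mean));
        # a break is flagged (and both sums reset) when either crosses the threshold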
for i, change in enumerate(changes):
cusum_pos = max(0, cusum_pos + change - mean_change)
cusum_neg = min(0, cusum_neg + change - mean_change)
if cusum_pos > threshold or cusum_neg < -threshold:
breaks.append({
"type": "structural_break",
"breakpoint": i + 1,
"cusum_value": cusum_pos if cusum_pos > threshold else cusum_neg,
"severity": "high" if abs(cusum_pos if cusum_pos > threshold else cusum_neg) > threshold * 1.5 else "medium"
})
cusum_pos = 0
cusum_neg = 0
return breaks
def _detect_volatility_regimes(self, volatilities: List[float]) -> List[Dict[str, Any]]:
"""λ³λμ± μ²΄μ νμ§"""
if len(volatilities) < 10:
return []
        # Classify each observation's volatility level (mean ± 1 std bands)
mean_vol = statistics.mean(volatilities)
std_vol = statistics.stdev(volatilities) if len(volatilities) > 1 else 0
regimes = []
current_regime = None
regime_start = 0
for i, vol in enumerate(volatilities):
if vol > mean_vol + std_vol:
regime_type = "high_volatility"
elif vol < mean_vol - std_vol:
regime_type = "low_volatility"
else:
regime_type = "normal"
if regime_type != current_regime:
if current_regime is not None:
regimes.append({
"type": "volatility_regime",
"regime_type": current_regime,
"start_index": regime_start,
"end_index": i - 1,
"duration": i - regime_start,
"volatility_level": current_regime
})
current_regime = regime_type
regime_start = i
        # Append the final regime
if current_regime is not None:
regimes.append({
"type": "volatility_regime",
"regime_type": current_regime,
"start_index": regime_start,
"end_index": len(volatilities) - 1,
"duration": len(volatilities) - regime_start,
"volatility_level": current_regime
})
return regimes
def _detect_trend_breaks(self, prices: List[float]) -> List[Dict[str, Any]]:
"""νΈλ λ λ³νμ νμ§"""
if len(prices) < 10:
return []
        # Moving-average-based trend analysis
window = 5
trends = []
for i in range(window, len(prices)):
before_avg = statistics.mean(prices[i-window:i])
current_price = prices[i]
            if current_price > before_avg * 1.02:  # up more than 2%
                trend = "up"
            elif current_price < before_avg * 0.98:  # down more than 2%
                trend = "down"
else:
trend = "sideways"
trends.append((i, trend))
        # Detect trend changepoints
breaks = []
current_trend = None
for i, (idx, trend) in enumerate(trends):
if trend != current_trend:
if current_trend is not None:
breaks.append({
"type": "trend_break",
"breakpoint": idx,
"from_trend": current_trend,
"to_trend": trend,
"severity": "medium"
})
current_trend = trend
return breaks
def _generate_anomaly_summary(self, all_anomalies: List[Dict[str, Any]]) -> Dict[str, Any]:
"""μ’
ν© μ΄μ μ§ν μμ½ μμ±"""
if not all_anomalies:
return {
"total_anomalies_detected": 0,
"severity_distribution": {"low": 0, "medium": 0, "high": 0},
"confidence_scores": {"average": 0.0}
}
        # Severity distribution
severity_counts = {"low": 0, "medium": 0, "high": 0}
for anomaly in all_anomalies:
severity = anomaly.get("severity", "medium")
if severity in severity_counts:
severity_counts[severity] += 1
        # Confidence scores
        confidence_scores = [a["confidence"] for a in all_anomalies if "confidence" in a]
avg_confidence = statistics.mean(confidence_scores) if confidence_scores else 0.7
return {
"total_anomalies_detected": len(all_anomalies),
"severity_distribution": severity_counts,
"confidence_scores": {
"average": round(avg_confidence, 3),
"min": round(min(confidence_scores), 3) if confidence_scores else 0.0,
"max": round(max(confidence_scores), 3) if confidence_scores else 0.0
}
}
def _analyze_features(self, market_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""νΌμ² λΆμ"""
features = self._extract_features(market_data)
if not features:
return {"error": "No features available for analysis"}
feature_names = ["price", "volume", "return", "volatility", "vix", "put_call_ratio"]
n_features = min(len(features[0]), len(feature_names))
analysis = {}
for i in range(n_features):
feature_values = [row[i] for row in features]
analysis[feature_names[i]] = {
"mean": round(statistics.mean(feature_values), 4),
"std": round(statistics.stdev(feature_values) if len(feature_values) > 1 else 0, 4),
"min": round(min(feature_values), 4),
"max": round(max(feature_values), 4),
"correlation_with_anomalies": 0.5 # κ°μνλ μκ΄κ΄κ³
}
return analysis
def _assess_risk_level(self, detection_results: Dict[str, Any]) -> Dict[str, Any]:
"""리μ€ν¬ λ 벨 νκ°"""
total_anomalies = 0
severe_anomalies = 0
        # Aggregate results across detection methods
for method, result in detection_results.items():
if isinstance(result, dict) and "detected_anomalies" in result:
anomalies = result["detected_anomalies"]
total_anomalies += len(anomalies)
severe_anomalies += len([a for a in anomalies if a.get("severity") == "high"])
        # Decide the risk level
if severe_anomalies > 5:
risk_level = "critical"
elif total_anomalies > 10:
risk_level = "high"
elif total_anomalies > 5:
risk_level = "medium"
else:
risk_level = "low"
        # Confidence score
confidence = min(1.0, total_anomalies / 20) if total_anomalies > 0 else 0.1
return {
"overall_risk_level": risk_level,
"risk_factors": {
"total_anomalies": total_anomalies,
"severe_anomalies": severe_anomalies,
"anomaly_density": round(total_anomalies / 30, 3) # 30μΌ κΈ°μ€
},
"confidence_score": round(confidence, 3),
"recommended_actions": self._get_risk_recommendations(risk_level)
}
def _get_risk_recommendations(self, risk_level: str) -> List[str]:
"""리μ€ν¬ λ λ²¨λ³ κΆμ₯ μ‘°μΉ"""
recommendations = {
"low": [
"μ μμ μΈ μμ₯ μν©μ
λλ€",
"μ κΈ° λͺ¨λν°λ§μ μ§μνμΈμ"
],
"medium": [
"μμ₯ λ³λμ±μ΄ μ¦κ°νμ΅λλ€",
"ν¬νΈν΄λ¦¬μ€ 리μ€ν¬λ₯Ό μ κ²νμΈμ",
"μ£Όμ μ§νλ₯Ό λ©΄λ°ν λͺ¨λν°λ§νμΈμ"
],
"high": [
"λμ μμ€μ μ΄μ μ§νκ° νμ§λμμ΅λλ€",
"ν¬μ§μ
κ·λͺ¨λ₯Ό μΆμνλ κ²μ κ³ λ €νμΈμ",
"ν€μ§ μ λ΅μ κ²ν νμΈμ",
"μμ₯ λ΄μ€μ μ΄λ²€νΈλ₯Ό μ£Όμ κΉκ² κ΄μ°°νμΈμ"
],
"critical": [
"μ¬κ°ν μμ₯ μ΄μ μν©μ
λλ€",
"μ¦μ 리μ€ν¬ κ΄λ¦¬ μ‘°μΉλ₯Ό μ·¨νμΈμ",
"ν¬νΈν΄λ¦¬μ€ μ¬κ²ν κ° νμν©λλ€",
"μ λ¬Έκ° μλ΄μ λ°λ κ²μ κΆμ₯ν©λλ€"
]
}
return recommendations.get(risk_level, [])
def _generate_realtime_alerts(self, anomalies: List[Dict[str, Any]],
threshold: str) -> Dict[str, Any]:
"""μ€μκ° μλ¦Ό μμ±"""
        # Filter by threshold level
threshold_map = {"low": 3, "medium": 5, "high": 10}
min_anomalies = threshold_map.get(threshold, 5)
critical_anomalies = [a for a in anomalies if a.get("severity") == "high"]
if len(critical_anomalies) >= min_anomalies:
alert_level = "critical"
elif len(anomalies) >= min_anomalies:
alert_level = "warning"
else:
alert_level = "normal"
return {
"alert_level": alert_level,
"critical_anomalies": len(critical_anomalies),
"total_anomalies": len(anomalies),
"recommended_actions": self._get_alert_actions(alert_level),
"alert_timestamp": datetime.now().isoformat()
}
def _get_alert_actions(self, alert_level: str) -> List[str]:
"""μλ¦Ό λ λ²¨λ³ μ‘°μΉ μ¬ν"""
actions = {
"normal": ["μ μ μν©μ
λλ€"],
"warning": [
"μμ₯ μν©μ μ£Όμ κΉκ² λͺ¨λν°λ§νμΈμ",
"ν¬νΈν΄λ¦¬μ€ 리μ€ν¬λ₯Ό μ κ²νμΈμ"
],
"critical": [
"μ¦μ 리μ€ν¬ κ΄λ¦¬μμκ² μ리μΈμ",
"κΈ΄κΈ ν¬μ§μ
κ²ν κ° νμν©λλ€",
"μμ₯ μν© λΆμμ μννμΈμ"
]
}
return actions.get(alert_level, [])
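
# Usage sketch (illustrative only). The constructor needs concrete db_manager /
# cache_manager objects; the import paths below are hypothetical placeholders,
# not confirmed locations in this codebase.
#
#     import asyncio
#     from src.db import DatabaseManager      # hypothetical
#     from src.cache import CacheManager      # hypothetical
#
#     async def main():
#         tool = AnomalyDetectionTool(DatabaseManager(), CacheManager())
#         contents = await tool.execute({
#             "market": "KOSPI",
#             "detection_methods": ["statistical", "timeseries"],
#             "sensitivity": 2.5,
#         })
#         print(contents[0].text)
#
#     asyncio.run(main())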