# test_anomaly_tools.py
"""Tests for the market anomaly detection tool."""
import json
import math
import random
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock

import pytest

from src.exceptions import DataValidationError, DatabaseConnectionError
from src.tools.anomaly_tools import AnomalyDetectionTool
class TestAnomalyDetectionTool:
    """Tests for the market anomaly detection tool."""

    @pytest.fixture
    def mock_db_manager(self):
        """Mock database manager."""
        return AsyncMock()

    @pytest.fixture
    def mock_cache_manager(self):
        """Mock cache manager."""
        return AsyncMock()

    @pytest.fixture
    def anomaly_tool(self, mock_db_manager, mock_cache_manager):
        """Anomaly detection tool instance under test."""
        return AnomalyDetectionTool(mock_db_manager, mock_cache_manager)

    @pytest.fixture
    def sample_market_data(self):
        """Sample market data: 90 days of normal values plus 10 days of outliers."""
        base_date = datetime.now().date()
        data = []
        for i in range(100):  # 100 days of data
            date = base_date - timedelta(days=i)
            if i < 90:
                # Mostly normal data
                price = 2650 + random.gauss(0, 30)  # normal fluctuation
                volume = 450000000 + random.gauss(0, 50000000)  # normal volume
                volatility = 0.015 + random.uniform(-0.005, 0.005)  # normal volatility
            else:
                # Outlier data (last 10 days of the range)
                price = 2650 + random.gauss(0, 150)  # large swings
                volume = 450000000 + random.gauss(0, 200000000)  # volume spike
                volatility = 0.035 + random.uniform(-0.015, 0.015)  # high volatility
            data.append({
                "date": date,
                "market": "KOSPI",
                "close_price": max(price, 100),  # guard against non-positive prices
                "volume": max(int(volume), 1000000),  # enforce a minimum volume
                "daily_return": random.gauss(0.001, 0.02),
                "volatility": max(volatility, 0.001),  # keep strictly positive
                "vix": 20 + random.gauss(0, 5),
                "put_call_ratio": 0.8 + random.uniform(-0.3, 0.3)
            })
        return data

    def test_tool_initialization(self, anomaly_tool, mock_db_manager, mock_cache_manager):
        """Tool initialization wires up name, description, and managers."""
        assert anomaly_tool.name == "detect_market_anomalies"
        assert anomaly_tool.description is not None
        assert "์ด์ ์งํ" in anomaly_tool.description or "anomaly" in anomaly_tool.description.lower()
        assert anomaly_tool.db_manager == mock_db_manager
        assert anomaly_tool.cache_manager == mock_cache_manager

    def test_tool_definition(self, anomaly_tool):
        """Tool definition exposes the expected input schema."""
        definition = anomaly_tool.get_tool_definition()
        assert definition.name == "detect_market_anomalies"
        assert definition.description is not None
        assert definition.inputSchema is not None

        # Validate the input schema
        schema = definition.inputSchema
        assert schema["type"] == "object"
        assert "properties" in schema

        properties = schema["properties"]
        assert "market" in properties
        assert "detection_methods" in properties
        assert "lookback_period" in properties
        assert "sensitivity" in properties
        assert "include_realtime_alerts" in properties

        # Validate the detection_methods parameter
        methods_prop = properties["detection_methods"]
        assert methods_prop["type"] == "array"
        assert "statistical" in str(methods_prop)
        assert "isolation_forest" in str(methods_prop)
        assert "timeseries" in str(methods_prop)

    @pytest.mark.asyncio
    async def test_execute_statistical_anomaly_detection(self, anomaly_tool, sample_market_data):
        """Statistical anomaly detection returns the expected result structure."""
        # Cache miss
        anomaly_tool.cache_manager.get.return_value = None
        # Stub the database response
        anomaly_tool.db_manager.fetch_all.return_value = sample_market_data

        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["statistical"],
            "lookback_period": "30d",
            "sensitivity": 2.0
        })

        # Validate the result envelope
        assert len(result) == 1
        content = result[0]
        assert content.type == "text"

        # Parse the JSON payload
        data = json.loads(content.text)
        assert "timestamp" in data
        assert "market" in data
        assert "anomaly_detection_results" in data

        # Validate the statistical detection results
        results = data["anomaly_detection_results"]
        assert "statistical" in results
        stat_results = results["statistical"]
        assert "z_score_anomalies" in stat_results
        assert "iqr_anomalies" in stat_results
        assert "anomaly_count" in stat_results
        assert "anomaly_percentage" in stat_results

    @pytest.mark.asyncio
    async def test_execute_isolation_forest_detection(self, anomaly_tool, sample_market_data):
        """Isolation Forest detection returns the expected result structure."""
        anomaly_tool.cache_manager.get.return_value = None
        anomaly_tool.db_manager.fetch_all.return_value = sample_market_data

        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["isolation_forest"],
            "lookback_period": "60d",
            "contamination": 0.1
        })

        content = result[0]
        data = json.loads(content.text)
        results = data["anomaly_detection_results"]
        assert "isolation_forest" in results
        if_results = results["isolation_forest"]
        assert "anomaly_scores" in if_results
        assert "anomalies_detected" in if_results
        assert "feature_importance" in if_results
        assert "model_stats" in if_results

    @pytest.mark.asyncio
    async def test_execute_timeseries_anomaly_detection(self, anomaly_tool, sample_market_data):
        """Time-series anomaly pattern detection returns the expected structure."""
        anomaly_tool.cache_manager.get.return_value = None
        anomaly_tool.db_manager.fetch_all.return_value = sample_market_data

        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["timeseries"],
            "lookback_period": "90d"
        })

        content = result[0]
        data = json.loads(content.text)
        results = data["anomaly_detection_results"]
        assert "timeseries" in results
        ts_results = results["timeseries"]
        assert "seasonal_anomalies" in ts_results
        assert "trend_breaks" in ts_results
        assert "volatility_regimes" in ts_results
        assert "structural_breaks" in ts_results

    @pytest.mark.asyncio
    async def test_comprehensive_anomaly_detection(self, anomaly_tool, sample_market_data):
        """Comprehensive detection (all methods) returns all sections."""
        anomaly_tool.cache_manager.get.return_value = None
        anomaly_tool.db_manager.fetch_all.return_value = sample_market_data

        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["statistical", "isolation_forest", "timeseries"],
            "lookback_period": "60d",
            "sensitivity": 2.5,
            "contamination": 0.05,
            "include_feature_analysis": True,
            "include_risk_assessment": True
        })

        content = result[0]
        data = json.loads(content.text)
        assert "anomaly_detection_results" in data
        assert "anomaly_summary" in data
        assert "risk_assessment" in data
        assert "feature_analysis" in data

        # Every detection method must be present in the results
        results = data["anomaly_detection_results"]
        assert "statistical" in results
        assert "isolation_forest" in results
        assert "timeseries" in results

        # Validate the aggregate summary
        summary = data["anomaly_summary"]
        assert "total_anomalies_detected" in summary
        assert "severity_distribution" in summary
        assert "confidence_scores" in summary

    def test_z_score_calculation(self, anomaly_tool):
        """Z-score anomaly calculation flags values beyond the threshold."""
        # Normal data plus one outlier
        data = [1, 2, 3, 2, 1, 2, 3, 50, 2, 1]  # 50 is the outlier
        anomalies = anomaly_tool._detect_z_score_anomalies(data, threshold=2.0)
        assert len(anomalies) > 0
        assert any(item["value"] == 50 for item in anomalies)
        # BUG FIX: the original `-1 <= z or z >= 1` was vacuously true for any
        # real z-score; every flagged anomaly must exceed the threshold used.
        assert all(abs(item["z_score"]) >= 2.0 for item in anomalies)

    def test_iqr_anomaly_detection(self, anomaly_tool):
        """IQR-based anomaly detection flags values outside the fences."""
        # Data containing outliers
        data = list(range(10, 20)) + [100, 105]  # 100 and 105 are outliers
        anomalies = anomaly_tool._detect_iqr_anomalies(data)
        assert len(anomalies) >= 2
        assert any(item["value"] == 100 for item in anomalies)
        assert any(item["value"] == 105 for item in anomalies)

    def test_isolation_forest_model(self, anomaly_tool):
        """Isolation Forest model detects multivariate outliers."""
        # Multidimensional feature data
        features = [
            [1, 2, 3],
            [2, 3, 4],
            [1, 1, 2],
            [2, 2, 3],
            [10, 15, 20],  # outlier
            [1, 2, 2],
            [2, 3, 3]
        ]
        anomalies, scores, model_stats = anomaly_tool._detect_isolation_forest_anomalies(
            features, contamination=0.2
        )
        assert len(anomalies) > 0
        assert len(scores) == len(features)
        assert "n_estimators" in model_stats
        assert "contamination" in model_stats

    def test_volatility_regime_detection(self, anomaly_tool):
        """Volatility regime detection finds regime changes."""
        # Volatility series: low -> high -> low
        volatility_data = ([0.01] * 30 + [0.05] * 20 + [0.01] * 30)
        regimes = anomaly_tool._detect_volatility_regimes(volatility_data)
        assert len(regimes) >= 2  # at least two regime changes
        assert all("start_index" in regime for regime in regimes)
        assert all("regime_type" in regime for regime in regimes)
        assert all("volatility_level" in regime for regime in regimes)

    def test_structural_break_detection(self, anomaly_tool):
        """Structural break detection locates a mean shift."""
        # Series with a structural break (mean shift after day 50)
        data = [10] * 50 + [20] * 50
        breaks = anomaly_tool._detect_structural_breaks(data)
        assert len(breaks) > 0
        # The breakpoint should be found near index 50
        assert any(40 < break_info["breakpoint"] < 60 for break_info in breaks)

    @pytest.mark.asyncio
    async def test_realtime_alert_system(self, anomaly_tool):
        """Real-time alert system reports severe anomalies when enabled."""
        # Severe anomaly data
        severe_anomaly_data = [{
            "date": datetime.now().date(),
            "market": "KOSPI",
            "close_price": 1000,  # crash
            "volume": 1500000000,  # volume spike
            "volatility": 0.08,  # high volatility
            "daily_return": -0.15,  # large drop
            "vix": 45,  # fear index spike
            "put_call_ratio": 2.0  # put surge
        }]
        anomaly_tool.cache_manager.get.return_value = None
        anomaly_tool.db_manager.fetch_all.return_value = severe_anomaly_data

        # Execute with real-time alerts enabled
        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["statistical"],
            "include_realtime_alerts": True,
            "alert_threshold": "high"
        })

        content = result[0]
        data = json.loads(content.text)
        # Validate the real-time alert payload when present
        if "realtime_alerts" in data:
            alerts = data["realtime_alerts"]
            assert "critical_anomalies" in alerts
            assert "alert_level" in alerts
            assert "recommended_actions" in alerts

    @pytest.mark.asyncio
    async def test_cache_functionality(self, anomaly_tool):
        """A cache hit returns cached data without touching the database."""
        # Cache-hit scenario
        cached_data = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "anomaly_detection_results": {
                "statistical": {"anomaly_count": 5}
            }
        }
        anomaly_tool.cache_manager.get.return_value = cached_data

        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["statistical"],
            "lookback_period": "30d"
        })

        # The cached payload must be returned verbatim
        content = result[0]
        data = json.loads(content.text)
        assert data == cached_data
        # And the database must not have been queried
        anomaly_tool.db_manager.fetch_all.assert_not_called()

    @pytest.mark.asyncio
    async def test_error_handling(self, anomaly_tool):
        """Database failures propagate as DatabaseConnectionError."""
        anomaly_tool.cache_manager.get.return_value = None
        anomaly_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB 연결 실패")
        with pytest.raises(DatabaseConnectionError):
            await anomaly_tool.execute({
                "market": "KOSPI",
                "detection_methods": ["statistical"]
            })

    @pytest.mark.asyncio
    async def test_invalid_parameters(self, anomaly_tool):
        """Invalid parameters raise ValueError with specific messages."""
        # Invalid market
        with pytest.raises(ValueError, match="Invalid market"):
            await anomaly_tool.execute({
                "market": "INVALID",
                "detection_methods": ["statistical"]
            })

        # Empty detection method list
        with pytest.raises(ValueError, match="At least one detection method"):
            await anomaly_tool.execute({
                "market": "KOSPI",
                "detection_methods": []
            })

        # Invalid sensitivity value
        with pytest.raises(ValueError, match="Invalid sensitivity"):
            await anomaly_tool.execute({
                "market": "KOSPI",
                "detection_methods": ["statistical"],
                "sensitivity": -1.0
            })

    @pytest.mark.asyncio
    async def test_insufficient_data_handling(self, anomaly_tool):
        """Insufficient history produces a warning rather than a failure."""
        # Only 5 days of data available
        insufficient_data = [
            {
                "date": datetime.now().date() - timedelta(days=i),
                "market": "KOSPI",
                "close_price": 2650,
                "volume": 450000000,
                "volatility": 0.015
            }
            for i in range(5)
        ]
        anomaly_tool.cache_manager.get.return_value = None
        anomaly_tool.db_manager.fetch_all.return_value = insufficient_data

        result = await anomaly_tool.execute({
            "market": "KOSPI",
            "detection_methods": ["statistical"],
            "lookback_period": "30d"
        })

        content = result[0]
        data = json.loads(content.text)
        assert "warning" in data or "insufficient data" in str(data).lower()

    def test_feature_extraction(self, anomaly_tool, sample_market_data):
        """Feature extraction yields numeric rows with enough features."""
        features = anomaly_tool._extract_features(sample_market_data[:30])
        assert len(features) == 30
        assert len(features[0]) > 5  # at least 5 features per row
        assert all(isinstance(row, list) for row in features)
        assert all(isinstance(val, (int, float)) for row in features for val in row)

    def test_risk_scoring(self, anomaly_tool):
        """Risk scoring aggregates detection results into a risk level."""
        # Detection results from multiple methods
        detection_results = {
            "statistical": {
                "anomaly_count": 10,
                "severe_anomalies": 3
            },
            "isolation_forest": {
                "anomalies_detected": 8,
                "avg_anomaly_score": -0.15
            }
        }
        risk_assessment = anomaly_tool._assess_risk_level(detection_results)
        assert "overall_risk_level" in risk_assessment
        assert "risk_factors" in risk_assessment
        assert "confidence_score" in risk_assessment
        assert risk_assessment["overall_risk_level"] in ["low", "medium", "high", "critical"]