"""Tests for the market anomaly detector."""
import pytest
import asyncio
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any
from src.ai.market_anomaly_detector import MarketAnomalyDetector
from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError
class TestMarketAnomalyDetector:
    """Tests for the market anomaly detector."""

    @pytest.fixture
    def detector_config(self):
        """Detector configuration used by every test."""
        return {
            "anomaly_threshold": 2.5,
            "window_size": 20,
            "min_anomaly_duration": 3,
            "algorithms": ["isolation_forest", "statistical", "lstm_autoencoder"],
            "sensitivity": 0.8,
            "ensemble_method": "majority_vote",
            "feature_weights": {
                "price": 0.3,
                "volume": 0.25,
                "volatility": 0.2,
                "sentiment": 0.15,
                "technical_indicators": 0.1
            }
        }

    @pytest.fixture
    def anomaly_detector(self, detector_config):
        """Fresh (untrained) market anomaly detector instance."""
        return MarketAnomalyDetector(detector_config)

    @pytest.fixture
    def normal_market_data(self):
        """Normal (non-anomalous) market data points."""
        return [
            {
                "timestamp": "2024-01-15T09:00:00",
                "symbol": "005930",
                "price": 75000,
                "volume": 1000000,
                "volatility": 0.02,
                "sentiment_score": 0.1,
                "rsi": 55
            },
            {
                "timestamp": "2024-01-15T09:30:00",
                "symbol": "005930",
                "price": 75100,
                "volume": 1050000,
                "volatility": 0.021,
                "sentiment_score": 0.12,
                "rsi": 56
            },
            {
                "timestamp": "2024-01-15T10:00:00",
                "symbol": "005930",
                "price": 75200,
                "volume": 980000,
                "volatility": 0.019,
                "sentiment_score": 0.08,
                "rsi": 57
            }
        ]

    @pytest.fixture
    def anomalous_market_data(self):
        """Anomalous market data points."""
        return [
            {
                "timestamp": "2024-01-15T11:00:00",
                "symbol": "005930",
                "price": 78000,  # price surge
                "volume": 5000000,  # volume spike
                "volatility": 0.08,  # volatility spike
                "sentiment_score": 0.7,  # strongly positive sentiment
                "rsi": 85  # overbought
            },
            {
                "timestamp": "2024-01-15T11:30:00",
                "symbol": "005930",
                "price": 79500,
                "volume": 6000000,
                "volatility": 0.09,
                "sentiment_score": 0.8,
                "rsi": 90
            }
        ]

    def test_detector_initialization(self, anomaly_detector, detector_config):
        """The detector must mirror every config value and start untrained."""
        assert anomaly_detector.anomaly_threshold == detector_config["anomaly_threshold"]
        assert anomaly_detector.window_size == detector_config["window_size"]
        assert anomaly_detector.min_anomaly_duration == detector_config["min_anomaly_duration"]
        assert anomaly_detector.algorithms == detector_config["algorithms"]
        assert anomaly_detector.sensitivity == detector_config["sensitivity"]
        assert anomaly_detector.ensemble_method == detector_config["ensemble_method"]
        assert anomaly_detector.feature_weights == detector_config["feature_weights"]
        assert anomaly_detector.is_trained is False

    @pytest.mark.asyncio
    async def test_model_training(self, anomaly_detector, normal_market_data):
        """Training flips is_trained and returns models + metrics."""
        # State before training
        assert anomaly_detector.is_trained is False
        # Build a sufficiently large training set from normal data
        training_data = normal_market_data * 30  # 90 data points
        # Train the model
        training_result = await anomaly_detector.train(training_data)
        # State after training
        assert anomaly_detector.is_trained is True
        assert "models" in training_result
        assert "training_metrics" in training_result
        assert "validation_score" in training_result

    @pytest.mark.asyncio
    async def test_single_point_anomaly_detection(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Single data-point detection: normal passes, anomalous flags."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Test with normal data
        normal_result = await anomaly_detector.detect_anomaly(normal_market_data[0])
        assert "is_anomaly" in normal_result
        assert "anomaly_score" in normal_result
        assert "anomaly_type" in normal_result
        assert "confidence" in normal_result
        assert normal_result["is_anomaly"] is False
        # Test with anomalous data
        anomalous_result = await anomaly_detector.detect_anomaly(anomalous_market_data[0])
        assert anomalous_result["is_anomaly"] is True
        assert anomalous_result["anomaly_score"] > normal_result["anomaly_score"]

    @pytest.mark.asyncio
    async def test_batch_anomaly_detection(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Batch detection classifies each point of a mixed batch."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Mixed batch: normal points first, anomalous points after
        mixed_data = normal_market_data + anomalous_market_data
        batch_results = await anomaly_detector.detect_batch_anomalies(mixed_data)
        # One result per input point
        assert len(batch_results) == len(mixed_data)
        # Normal data must not be flagged as anomalous
        for i in range(len(normal_market_data)):
            assert batch_results[i]["is_anomaly"] is False
        # Anomalous data must be flagged
        for i in range(len(normal_market_data), len(mixed_data)):
            assert batch_results[i]["is_anomaly"] is True

    @pytest.mark.asyncio
    async def test_time_series_anomaly_detection(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Time-series detection reports anomaly periods with metadata."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Time series: normal -> anomalous -> normal
        time_series_data = normal_market_data + anomalous_market_data + normal_market_data
        time_series_results = await anomaly_detector.detect_time_series_anomalies(
            time_series_data,
            window_size=3
        )
        # Result structure
        assert "anomaly_periods" in time_series_results
        assert "anomaly_summary" in time_series_results
        assert "trend_analysis" in time_series_results
        # At least one anomaly period must be detected
        anomaly_periods = time_series_results["anomaly_periods"]
        assert len(anomaly_periods) > 0
        # Each period carries full metadata
        for period in anomaly_periods:
            assert "start_time" in period
            assert "end_time" in period
            assert "severity" in period
            assert "anomaly_type" in period

    @pytest.mark.asyncio
    async def test_anomaly_type_classification(self, anomaly_detector, normal_market_data):
        """Anomalies driven by one feature are typed accordingly."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Anomalies that each deviate in a single feature
        price_anomaly = {
            "timestamp": "2024-01-15T12:00:00",
            "symbol": "005930",
            "price": 85000,  # price surge
            "volume": 1000000,
            "volatility": 0.02,
            "sentiment_score": 0.1,
            "rsi": 55
        }
        volume_anomaly = {
            "timestamp": "2024-01-15T12:30:00",
            "symbol": "005930",
            "price": 75000,
            "volume": 10000000,  # volume spike
            "volatility": 0.02,
            "sentiment_score": 0.1,
            "rsi": 55
        }
        volatility_anomaly = {
            "timestamp": "2024-01-15T13:00:00",
            "symbol": "005930",
            "price": 75000,
            "volume": 1000000,
            "volatility": 0.15,  # volatility spike
            "sentiment_score": 0.1,
            "rsi": 55
        }
        # Detect each type
        price_result = await anomaly_detector.detect_anomaly(price_anomaly)
        volume_result = await anomaly_detector.detect_anomaly(volume_anomaly)
        volatility_result = await anomaly_detector.detect_anomaly(volatility_anomaly)
        # Each anomaly type should be classified correctly
        assert price_result["anomaly_type"] in ["price_spike", "multi_feature"]
        assert volume_result["anomaly_type"] in ["volume_spike", "multi_feature"]
        assert volatility_result["anomaly_type"] in ["volatility_spike", "multi_feature"]

    @pytest.mark.asyncio
    async def test_anomaly_severity_scoring(self, anomaly_detector, normal_market_data):
        """A severe anomaly must score at least as high as a mild one."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Mild vs. severe deviations from the normal baseline
        mild_anomaly = {
            "timestamp": "2024-01-15T14:00:00",
            "symbol": "005930",
            "price": 76000,  # slight rise
            "volume": 1200000,  # slight increase
            "volatility": 0.025,  # slight increase
            "sentiment_score": 0.2,
            "rsi": 60
        }
        severe_anomaly = {
            "timestamp": "2024-01-15T14:30:00",
            "symbol": "005930",
            "price": 85000,  # surge
            "volume": 8000000,  # spike
            "volatility": 0.12,  # spike
            "sentiment_score": 0.8,
            "rsi": 95
        }
        mild_result = await anomaly_detector.detect_anomaly(mild_anomaly)
        severe_result = await anomaly_detector.detect_anomaly(severe_anomaly)
        # Severity scores must be ordered
        assert severe_result["anomaly_score"] > mild_result["anomaly_score"]
        assert severe_result["confidence"] >= mild_result["confidence"]

    @pytest.mark.asyncio
    async def test_ensemble_method_voting(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Ensemble results expose per-algorithm scores and votes."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Inspect the ensemble breakdown
        result = await anomaly_detector.detect_anomaly(anomalous_market_data[0])
        assert "ensemble_scores" in result
        assert "algorithm_votes" in result
        # Every configured algorithm must contribute a score
        ensemble_scores = result["ensemble_scores"]
        for algorithm in anomaly_detector.algorithms:
            assert algorithm in ensemble_scores

    @pytest.mark.asyncio
    async def test_feature_importance_analysis(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Feature-importance analysis explains which features contributed."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Analyze feature importance for an anomalous point
        importance_result = await anomaly_detector.analyze_feature_importance(anomalous_market_data[0])
        assert "feature_contributions" in importance_result
        assert "top_contributing_features" in importance_result
        assert "anomaly_explanation" in importance_result
        # Core features must appear in the contribution map
        feature_contributions = importance_result["feature_contributions"]
        assert all(feature in feature_contributions for feature in ["price", "volume", "volatility"])

    @pytest.mark.asyncio
    async def test_anomaly_threshold_adjustment(self, anomaly_detector, normal_market_data):
        """Adjusting the threshold is reflected in subsequent detections."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Remember the original threshold, then lower it
        original_threshold = anomaly_detector.anomaly_threshold
        new_threshold = 1.5
        await anomaly_detector.adjust_threshold(new_threshold)
        assert anomaly_detector.anomaly_threshold == new_threshold
        # Detection must report the adjusted threshold
        test_data = {
            "timestamp": "2024-01-15T15:00:00",
            "symbol": "005930",
            "price": 76500,
            "volume": 1500000,
            "volatility": 0.03,
            "sentiment_score": 0.3,
            "rsi": 65
        }
        result = await anomaly_detector.detect_anomaly(test_data)
        assert "threshold_used" in result
        assert result["threshold_used"] == new_threshold

    @pytest.mark.asyncio
    async def test_anomaly_pattern_learning(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Learning new anomaly patterns updates the model."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Feed new anomaly patterns for incremental learning
        new_anomaly_patterns = anomalous_market_data * 5
        learning_result = await anomaly_detector.learn_new_patterns(new_anomaly_patterns)
        assert "patterns_learned" in learning_result
        assert "model_updated" in learning_result
        assert learning_result["model_updated"] is True
        # Detection after learning should be confident
        post_learning_result = await anomaly_detector.detect_anomaly(anomalous_market_data[0])
        assert post_learning_result["confidence"] > 0.7

    @pytest.mark.asyncio
    async def test_real_time_anomaly_detection(self, anomaly_detector, normal_market_data):
        """Real-time streaming: each point processed in under a second."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Simulate a real-time stream of ten gradually drifting points.
        # Minutes advance in 5-minute steps so every timestamp is a valid
        # ISO time (the hour stays at 16; hour arithmetic would overflow 23).
        streaming_data = []
        for i in range(10):
            data_point = {
                "timestamp": f"2024-01-15T16:{i * 5:02d}:00",
                "symbol": "005930",
                "price": 75000 + i * 100,
                "volume": 1000000 + i * 50000,
                "volatility": 0.02 + i * 0.001,
                "sentiment_score": 0.1 + i * 0.05,
                "rsi": 55 + i
            }
            streaming_data.append(data_point)
        # Process the stream point by point
        real_time_results = []
        for data_point in streaming_data:
            result = await anomaly_detector.process_real_time_data(data_point)
            real_time_results.append(result)
        # One result per streamed point, each fast enough
        assert len(real_time_results) == len(streaming_data)
        for result in real_time_results:
            assert "is_anomaly" in result
            assert "processing_time" in result
            assert result["processing_time"] < 1.0  # processed within 1 second

    @pytest.mark.asyncio
    async def test_anomaly_alert_system(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Alerts fire with full details when an anomaly is detected."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Configure alerting
        alert_config = {
            "severity_threshold": "medium",
            "notification_channels": ["email", "webhook"],
            "cooldown_period": 300  # 5 minutes
        }
        await anomaly_detector.configure_alerts(alert_config)
        # Detect and possibly trigger an alert
        alert_result = await anomaly_detector.detect_with_alerts(anomalous_market_data[0])
        assert "alert_triggered" in alert_result
        assert "alert_details" in alert_result
        if alert_result["alert_triggered"]:
            alert_details = alert_result["alert_details"]
            assert "severity" in alert_details
            assert "message" in alert_details
            assert "timestamp" in alert_details

    @pytest.mark.asyncio
    async def test_error_handling(self, anomaly_detector):
        """Untrained, underfed, and malformed inputs are handled safely."""
        # Detection before training must raise
        with pytest.raises(ModelNotTrainedError):
            await anomaly_detector.detect_anomaly({"price": 100})
        # Training on too little data must raise
        insufficient_data = [{"price": 100}]  # far too few points
        with pytest.raises(InsufficientDataError):
            await anomaly_detector.train(insufficient_data)
        # Malformed input falls back to a non-anomaly default
        await anomaly_detector.train([{"price": 100, "volume": 1000}] * 50)
        invalid_data = {"invalid_field": "value"}
        result = await anomaly_detector.detect_anomaly(invalid_data)
        assert result["is_anomaly"] is False  # handled via defaults

    @pytest.mark.asyncio
    async def test_performance_metrics(self, anomaly_detector, normal_market_data, anomalous_market_data):
        """Batch detection exposes metrics and finishes in bounded time."""
        # Train the model
        training_data = normal_market_data * 30
        await anomaly_detector.train(training_data)
        # Performance test data
        test_data = normal_market_data + anomalous_market_data
        start_time = time.time()
        batch_results = await anomaly_detector.detect_batch_anomalies(test_data)
        end_time = time.time()
        processing_time = end_time - start_time
        # Performance metrics must be available
        metrics = anomaly_detector.get_performance_metrics()
        assert "total_detections" in metrics
        assert "average_processing_time" in metrics
        assert "accuracy_metrics" in metrics
        # Processing time must stay reasonable
        assert processing_time < 10.0  # within 10 seconds
        assert len(batch_results) == len(test_data)

    def test_statistical_anomaly_detection(self, anomaly_detector):
        """Z-score helpers: extreme values flag, in-range values do not."""
        # Normal values (mean ~100, std ~10)
        normal_values = [100, 105, 95, 110, 90, 102, 98, 107, 93, 101]
        # Anomalous value
        anomalous_value = 200  # z-score > 3
        # Statistical anomaly detection
        z_score = anomaly_detector._calculate_z_score(anomalous_value, normal_values)
        is_statistical_anomaly = anomaly_detector._is_statistical_anomaly(z_score)
        assert abs(z_score) > 3  # strong outlier
        assert is_statistical_anomaly is True
        # Test a normal value
        normal_z_score = anomaly_detector._calculate_z_score(105, normal_values)
        is_normal = anomaly_detector._is_statistical_anomaly(normal_z_score)
        assert abs(normal_z_score) < 2
        assert is_normal is False