# test_ai_prediction_system.py
"""AI 예측 시스템 및 고급 데이터 분석 테스트"""
import pytest
import numpy as np
import pandas as pd
import asyncio
import json
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any, Optional
from src.ai.price_predictor import PricePredictor
from src.ai.sentiment_analyzer import SentimentAnalyzer
from src.ai.market_anomaly_detector import MarketAnomalyDetector
from src.ai.risk_assessment_engine import RiskAssessmentEngine
from src.analytics.advanced_pattern_recognition import AdvancedPatternRecognition
from src.analytics.multi_timeframe_analyzer import MultiTimeframeAnalyzer
from src.analytics.correlation_engine import CorrelationEngine
from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError
class TestPricePredictor:
    """Tests for the price-prediction model (PricePredictor)."""

    @pytest.fixture
    def model_config(self):
        """Model configuration shared by all tests in this class."""
        return {
            "model_type": "lstm",
            "sequence_length": 60,
            "prediction_horizon": 5,
            "features": ["price", "volume", "rsi", "macd", "bollinger_bands"],
            "epochs": 100,
            "batch_size": 32,
            "validation_split": 0.2,
            "early_stopping": True,
            "learning_rate": 0.001
        }

    @pytest.fixture
    def price_predictor(self, model_config):
        """PricePredictor instance built from the shared config."""
        return PricePredictor(model_config)

    @pytest.fixture
    def sample_market_data(self):
        """Synthetic daily OHLCV market data for one symbol (one year)."""
        dates = pd.date_range(start='2023-01-01', end='2024-01-01', freq='D')
        np.random.seed(42)
        # Price series: linear upward trend plus Gaussian noise.
        trend = np.linspace(70000, 85000, len(dates))
        noise = np.random.normal(0, 2000, len(dates))
        prices = trend + noise
        # Volume series drawn from a log-normal distribution.
        volumes = np.random.lognormal(15, 0.5, len(dates))
        return pd.DataFrame({
            'timestamp': dates,
            'symbol': '005930',  # Samsung Electronics
            'price': prices,
            'volume': volumes,
            'high': prices * 1.02,
            'low': prices * 0.98,
            'open': prices * 1.001,
            'close': prices
        })

    def test_predictor_initialization(self, price_predictor, model_config):
        """Constructor copies config fields and starts untrained with no model."""
        assert price_predictor.model_type == model_config["model_type"]
        assert price_predictor.sequence_length == model_config["sequence_length"]
        assert price_predictor.prediction_horizon == model_config["prediction_horizon"]
        assert price_predictor.features == model_config["features"]
        assert price_predictor.is_trained == False
        assert price_predictor.model is None

    @pytest.mark.asyncio
    async def test_feature_engineering(self, price_predictor, sample_market_data):
        """engineer_features produces base columns plus technical indicators."""
        features = await price_predictor.engineer_features(sample_market_data)
        # Base features carried over from the raw data.
        assert 'price' in features.columns
        assert 'volume' in features.columns
        # Technical-indicator columns added by feature engineering.
        assert 'rsi' in features.columns
        assert 'macd' in features.columns
        assert 'bollinger_upper' in features.columns
        assert 'bollinger_lower' in features.columns
        assert 'sma_20' in features.columns
        assert 'ema_12' in features.columns
        # Data-quality checks.
        assert len(features) > 0
        assert not features.isnull().all().any()  # no column is entirely null

    @pytest.mark.asyncio
    async def test_data_preprocessing(self, price_predictor, sample_market_data):
        """preprocess_data yields standardized sliding-window sequences and targets."""
        features = await price_predictor.engineer_features(sample_market_data)
        X, y = await price_predictor.preprocess_data(features)
        # Sequence tensor shape checks.
        assert len(X.shape) == 3  # (samples, sequence_length, features)
        assert X.shape[1] == price_predictor.sequence_length
        assert len(y.shape) == 2  # (samples, prediction_horizon)
        assert y.shape[1] == price_predictor.prediction_horizon
        # Value-range check for standardized data.
        # NOTE(review): standardized values CAN exceed |3| for outliers; this
        # bound assumes clipping or benign data — confirm against preprocess_data.
        assert X.min() >= -3 and X.max() <= 3

    @pytest.mark.asyncio
    async def test_model_training(self, price_predictor, sample_market_data):
        """train() flips is_trained, builds a model, and reduces training loss."""
        features = await price_predictor.engineer_features(sample_market_data)
        X, y = await price_predictor.preprocess_data(features)
        # Pre-training state.
        assert price_predictor.is_trained == False
        # Train the model.
        training_history = await price_predictor.train(X, y)
        # Post-training state.
        assert price_predictor.is_trained == True
        assert price_predictor.model is not None
        assert 'loss' in training_history.history
        assert 'val_loss' in training_history.history
        # Training quality: final loss below initial loss.
        final_loss = training_history.history['loss'][-1]
        assert final_loss < training_history.history['loss'][0]

    @pytest.mark.asyncio
    async def test_price_prediction(self, price_predictor, sample_market_data):
        """predict() returns horizon-length forecasts with confidence intervals."""
        # Train the model first.
        features = await price_predictor.engineer_features(sample_market_data)
        X, y = await price_predictor.preprocess_data(features)
        await price_predictor.train(X, y)
        # Predict from the most recent window of data.
        recent_data = features.tail(price_predictor.sequence_length)
        predictions = await price_predictor.predict(recent_data)
        # Result structure.
        assert 'predictions' in predictions
        assert 'confidence_intervals' in predictions
        assert 'probability_distribution' in predictions
        pred_values = predictions['predictions']
        assert len(pred_values) == price_predictor.prediction_horizon
        assert all(isinstance(p, (int, float)) for p in pred_values)
        # Confidence intervals must bracket each point forecast.
        ci = predictions['confidence_intervals']
        assert 'lower' in ci and 'upper' in ci
        assert len(ci['lower']) == price_predictor.prediction_horizon
        assert all(ci['lower'][i] <= pred_values[i] <= ci['upper'][i] for i in range(len(pred_values)))

    @pytest.mark.asyncio
    async def test_batch_prediction(self, price_predictor, sample_market_data):
        """predict_batch returns one forecast dict per input symbol."""
        # Train the model first.
        features = await price_predictor.engineer_features(sample_market_data)
        X, y = await price_predictor.preprocess_data(features)
        await price_predictor.train(X, y)
        # Build data for multiple symbols by rescaling the base fixture.
        symbols = ['005930', '000660', '035420']
        batch_data = []
        for symbol in symbols:
            symbol_data = sample_market_data.copy()
            symbol_data['symbol'] = symbol
            symbol_data['price'] = symbol_data['price'] * np.random.uniform(0.8, 1.2)
            batch_data.append(symbol_data)
        # Batch prediction.
        batch_predictions = await price_predictor.predict_batch(batch_data)
        # One result per symbol, each with a full-horizon forecast.
        assert len(batch_predictions) == len(symbols)
        for symbol, prediction in batch_predictions.items():
            assert symbol in symbols
            assert 'predictions' in prediction
            assert len(prediction['predictions']) == price_predictor.prediction_horizon

    @pytest.mark.asyncio
    async def test_model_evaluation(self, price_predictor, sample_market_data):
        """evaluate() reports standard regression metrics on a held-out split."""
        features = await price_predictor.engineer_features(sample_market_data)
        X, y = await price_predictor.preprocess_data(features)
        # 80/20 train/test split by time order.
        split_idx = int(len(X) * 0.8)
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]
        # Train on the first 80%.
        await price_predictor.train(X_train, y_train)
        # Evaluate on the remaining 20%.
        evaluation_metrics = await price_predictor.evaluate(X_test, y_test)
        # Expected metric keys.
        assert 'mse' in evaluation_metrics
        assert 'mae' in evaluation_metrics
        assert 'rmse' in evaluation_metrics
        assert 'mape' in evaluation_metrics
        assert 'directional_accuracy' in evaluation_metrics
        # Metric-range sanity checks.
        assert evaluation_metrics['mse'] >= 0
        assert evaluation_metrics['mae'] >= 0
        assert evaluation_metrics['rmse'] >= 0
        assert 0 <= evaluation_metrics['directional_accuracy'] <= 1

    @pytest.mark.asyncio
    async def test_model_save_load(self, price_predictor, sample_market_data, tmp_path):
        """A saved model reloads into a fresh predictor and reproduces forecasts."""
        # Train the model.
        features = await price_predictor.engineer_features(sample_market_data)
        X, y = await price_predictor.preprocess_data(features)
        await price_predictor.train(X, y)
        # Save to a temporary path.
        model_path = tmp_path / "test_model.h5"
        await price_predictor.save_model(str(model_path))
        assert model_path.exists()
        # A brand-new predictor starts untrained.
        new_predictor = PricePredictor(price_predictor.config)
        assert new_predictor.is_trained == False
        # Loading restores the trained state.
        await new_predictor.load_model(str(model_path))
        assert new_predictor.is_trained == True
        # Original and loaded models must agree (to 4 decimal places).
        recent_data = features.tail(price_predictor.sequence_length)
        original_pred = await price_predictor.predict(recent_data)
        loaded_pred = await new_predictor.predict(recent_data)
        np.testing.assert_array_almost_equal(
            original_pred['predictions'],
            loaded_pred['predictions'],
            decimal=4
        )

    @pytest.mark.asyncio
    async def test_insufficient_data_error(self, price_predictor):
        """Too little history raises InsufficientDataError during preparation."""
        # Ten rows is far below the 60-step sequence length.
        small_data = pd.DataFrame({
            'timestamp': pd.date_range('2023-01-01', periods=10, freq='D'),
            'price': [100] * 10,
            'volume': [1000] * 10
        })
        with pytest.raises(InsufficientDataError):
            features = await price_predictor.engineer_features(small_data)
            X, y = await price_predictor.preprocess_data(features)

    @pytest.mark.asyncio
    async def test_untrained_model_error(self, price_predictor, sample_market_data):
        """Calling predict() before train() raises ModelNotTrainedError."""
        features = await price_predictor.engineer_features(sample_market_data)
        recent_data = features.tail(price_predictor.sequence_length)
        with pytest.raises(ModelNotTrainedError):
            await price_predictor.predict(recent_data)

    def test_model_architecture_creation(self, price_predictor):
        """_create_model builds a network matching the configured I/O shapes."""
        input_shape = (price_predictor.sequence_length, len(price_predictor.features))
        output_size = price_predictor.prediction_horizon
        model = price_predictor._create_model(input_shape, output_size)
        # Structural checks on the built model.
        assert model is not None
        assert len(model.layers) > 0
        assert model.input_shape[1:] == input_shape
        assert model.output_shape[1] == output_size
class TestSentimentAnalyzer:
    """Tests for the news sentiment analyzer (SentimentAnalyzer)."""

    @pytest.fixture
    def analyzer_config(self):
        """Analyzer configuration shared by all tests in this class."""
        return {
            "model_name": "klue/bert-base",
            "max_length": 512,
            "batch_size": 16,
            "confidence_threshold": 0.7,
            "sentiment_labels": ["negative", "neutral", "positive"],
            "language": "ko"
        }

    @pytest.fixture
    def sentiment_analyzer(self, analyzer_config):
        """SentimentAnalyzer instance built from the shared config."""
        return SentimentAnalyzer(analyzer_config)

    @pytest.fixture
    def sample_news_data(self):
        """Sample Korean news items with symbols and timestamps."""
        return [
            {
                "title": "삼성전자 3분기 실적 예상치 상회, 주가 상승 기대",
                "content": "삼성전자가 3분기 실적에서 시장 예상치를 크게 상회하며 주가 상승 모멘텀을 확보했다.",
                "timestamp": "2024-01-15T10:30:00",
                "source": "경제신문",
                "symbols": ["005930"]
            },
            {
                "title": "반도체 업계 전망 부정적, 메모리 가격 하락 우려",
                "content": "글로벌 반도체 수요 둔화로 메모리 반도체 가격 하락이 예상된다는 분석이 나왔다.",
                "timestamp": "2024-01-15T14:20:00",
                "source": "IT뉴스",
                "symbols": ["005930", "000660"]
            },
            {
                "title": "코스피 지수 2500선 회복, 외국인 순매수 지속",
                "content": "외국인 투자자들의 순매수세가 이어지면서 코스피가 2500선을 회복했다.",
                "timestamp": "2024-01-15T16:00:00",
                "source": "증권뉴스",
                "symbols": ["KOSPI"]
            }
        ]

    def test_analyzer_initialization(self, sentiment_analyzer, analyzer_config):
        """Constructor copies every config field onto the instance."""
        assert sentiment_analyzer.model_name == analyzer_config["model_name"]
        assert sentiment_analyzer.max_length == analyzer_config["max_length"]
        assert sentiment_analyzer.batch_size == analyzer_config["batch_size"]
        assert sentiment_analyzer.confidence_threshold == analyzer_config["confidence_threshold"]
        assert sentiment_analyzer.sentiment_labels == analyzer_config["sentiment_labels"]
        assert sentiment_analyzer.language == analyzer_config["language"]

    @pytest.mark.asyncio
    async def test_single_text_analysis(self, sentiment_analyzer):
        """analyze_text returns a label, a confidence, and normalized scores."""
        positive_text = "삼성전자 주가가 크게 상승하며 투자자들의 기대감이 높아지고 있다."
        result = await sentiment_analyzer.analyze_text(positive_text)
        # Result structure.
        assert 'sentiment' in result
        assert 'confidence' in result
        assert 'scores' in result
        # The label must be one of the configured sentiment labels.
        assert result['sentiment'] in sentiment_analyzer.sentiment_labels
        # Confidence is a probability.
        assert 0 <= result['confidence'] <= 1
        # Per-label scores form (approximately) a probability distribution.
        scores = result['scores']
        assert len(scores) == len(sentiment_analyzer.sentiment_labels)
        assert abs(sum(scores.values()) - 1.0) < 0.01  # sums to ~1

    @pytest.mark.asyncio
    async def test_batch_text_analysis(self, sentiment_analyzer, sample_news_data):
        """analyze_batch returns one well-formed result per input text."""
        texts = [news['title'] + ' ' + news['content'] for news in sample_news_data]
        results = await sentiment_analyzer.analyze_batch(texts)
        # One result per text.
        assert len(results) == len(texts)
        # Each result has the expected structure.
        for result in results:
            assert 'sentiment' in result
            assert 'confidence' in result
            assert 'scores' in result
            assert result['sentiment'] in sentiment_analyzer.sentiment_labels

    @pytest.mark.asyncio
    async def test_news_sentiment_analysis(self, sentiment_analyzer, sample_news_data):
        """analyze_news enriches each news item and preserves its metadata."""
        results = await sentiment_analyzer.analyze_news(sample_news_data)
        # One result per news item.
        assert len(results) == len(sample_news_data)
        for i, result in enumerate(results):
            assert 'news_id' in result
            assert 'sentiment' in result
            assert 'confidence' in result
            assert 'symbols' in result
            assert 'timestamp' in result
            assert 'source' in result
            # Metadata must match the corresponding input item.
            assert result['symbols'] == sample_news_data[i]['symbols']
            assert result['timestamp'] == sample_news_data[i]['timestamp']

    @pytest.mark.asyncio
    async def test_symbol_sentiment_aggregation(self, sentiment_analyzer, sample_news_data):
        """aggregate_sentiment_by_symbol rolls news results up per symbol."""
        news_results = await sentiment_analyzer.analyze_news(sample_news_data)
        symbol_sentiments = await sentiment_analyzer.aggregate_sentiment_by_symbol(news_results)
        # Symbols present in the fixture must appear in the aggregation.
        assert '005930' in symbol_sentiments
        assert 'KOSPI' in symbol_sentiments
        # Per-symbol aggregate structure.
        for symbol, sentiment_data in symbol_sentiments.items():
            assert 'overall_sentiment' in sentiment_data
            assert 'sentiment_score' in sentiment_data
            assert 'news_count' in sentiment_data
            assert 'confidence_avg' in sentiment_data
            assert 'sentiment_distribution' in sentiment_data
            # Score is in [-1, 1] and at least one news item contributed.
            assert -1 <= sentiment_data['sentiment_score'] <= 1
            assert sentiment_data['news_count'] > 0

    @pytest.mark.asyncio
    async def test_time_series_sentiment(self, sentiment_analyzer, sample_news_data):
        """get_sentiment_time_series buckets news sentiment by time interval."""
        # Replicate the fixture news across all 24 hours of one day.
        extended_news = []
        for i in range(24):  # 24 hours
            for news in sample_news_data:
                news_copy = news.copy()
                news_copy['timestamp'] = f"2024-01-15T{i:02d}:00:00"
                extended_news.append(news_copy)
        news_results = await sentiment_analyzer.analyze_news(extended_news)
        time_series = await sentiment_analyzer.get_sentiment_time_series(news_results, interval='1H')
        # Each time bucket carries score, count, and dominant label.
        assert len(time_series) > 0
        for timestamp, sentiment_data in time_series.items():
            assert 'sentiment_score' in sentiment_data
            assert 'news_count' in sentiment_data
            assert 'dominant_sentiment' in sentiment_data

    @pytest.mark.asyncio
    async def test_sentiment_impact_analysis(self, sentiment_analyzer, sample_news_data):
        """analyze_sentiment_impact correlates news sentiment with price moves."""
        # Intraday price points aligned with the fixture news timestamps.
        price_data = {
            '005930': {
                '2024-01-15T09:00:00': 75000,
                '2024-01-15T15:00:00': 76500,
                '2024-01-15T16:30:00': 77200
            }
        }
        news_results = await sentiment_analyzer.analyze_news(sample_news_data)
        impact_analysis = await sentiment_analyzer.analyze_sentiment_impact(news_results, price_data)
        # Result structure.
        assert 'correlation_analysis' in impact_analysis
        assert 'impact_scores' in impact_analysis
        assert 'significant_events' in impact_analysis
        # Correlation coefficient must be a valid value in [-1, 1].
        correlation = impact_analysis['correlation_analysis']
        assert 'sentiment_price_correlation' in correlation
        assert -1 <= correlation['sentiment_price_correlation'] <= 1

    def test_preprocessing_methods(self, sentiment_analyzer):
        """Private text-preprocessing helpers: clean, tokenize, prepare input."""
        raw_text = " 삼성전자!!! 주가가 @#$% 상승했습니다... "
        # Cleaning strips whitespace and special characters.
        cleaned = sentiment_analyzer._clean_text(raw_text)
        assert cleaned.strip() == cleaned
        assert not any(char in cleaned for char in ['@', '#', '$', '%'])
        # Tokenization yields a non-empty token list.
        tokens = sentiment_analyzer._tokenize(cleaned)
        assert isinstance(tokens, list)
        assert len(tokens) > 0
        # Input preparation returns a BERT-style encoded dict.
        processed = sentiment_analyzer._prepare_input(cleaned)
        assert isinstance(processed, dict)
        assert 'input_ids' in processed
        assert 'attention_mask' in processed
class TestMarketAnomalyDetector:
    """Tests for the market anomaly detector (MarketAnomalyDetector)."""

    @pytest.fixture
    def detector_config(self):
        """Detector configuration shared by all tests in this class."""
        return {
            "algorithm": "isolation_forest",
            "contamination": 0.1,
            "window_size": 50,
            "features": ["price_change", "volume_change", "volatility", "trading_intensity"],
            "anomaly_threshold": 0.05,
            "min_samples": 100
        }

    @pytest.fixture
    def anomaly_detector(self, detector_config):
        """MarketAnomalyDetector instance built from the shared config."""
        return MarketAnomalyDetector(detector_config)

    @pytest.fixture
    def sample_trading_data(self):
        """Synthetic 5-minute trading data with injected price/volume anomalies."""
        np.random.seed(42)
        dates = pd.date_range('2023-01-01', '2024-01-01', freq='5min')
        # Baseline (normal) price and volume series.
        normal_prices = np.random.normal(75000, 1000, len(dates))
        normal_volumes = np.random.lognormal(12, 0.5, len(dates))
        # Choose ~2% of rows to receive price anomalies.
        anomaly_indices = np.random.choice(len(dates), size=int(len(dates) * 0.02), replace=False)
        prices = normal_prices.copy()
        volumes = normal_volumes.copy()
        # First half of anomaly rows: price spikes; second half: price drops.
        for idx in anomaly_indices[:len(anomaly_indices)//2]:
            prices[idx] = normal_prices[idx] * np.random.uniform(1.1, 1.3)  # spike
        for idx in anomaly_indices[len(anomaly_indices)//2:]:
            prices[idx] = normal_prices[idx] * np.random.uniform(0.7, 0.9)  # drop
        # ~1% of rows receive volume-surge anomalies (may overlap price rows).
        volume_anomaly_indices = np.random.choice(len(dates), size=int(len(dates) * 0.01), replace=False)
        for idx in volume_anomaly_indices:
            volumes[idx] = normal_volumes[idx] * np.random.uniform(5, 10)
        return pd.DataFrame({
            'timestamp': dates,
            'symbol': '005930',
            'price': prices,
            'volume': volumes,
            'high': prices * 1.01,
            'low': prices * 0.99,
            'open': np.roll(prices, 1),  # previous bar's price as open
            'close': prices
        })

    def test_detector_initialization(self, anomaly_detector, detector_config):
        """Constructor copies config fields and starts untrained."""
        assert anomaly_detector.algorithm == detector_config["algorithm"]
        assert anomaly_detector.contamination == detector_config["contamination"]
        assert anomaly_detector.window_size == detector_config["window_size"]
        assert anomaly_detector.features == detector_config["features"]
        assert anomaly_detector.is_trained == False

    @pytest.mark.asyncio
    async def test_feature_extraction(self, anomaly_detector, sample_trading_data):
        """extract_features yields configured features plus derived columns."""
        features = await anomaly_detector.extract_features(sample_trading_data)
        # All configured features must be present.
        expected_features = anomaly_detector.features
        for feature in expected_features:
            assert feature in features.columns
        # Additional derived features.
        assert 'price_change_rate' in features.columns
        assert 'volume_ma_ratio' in features.columns
        assert 'price_volatility' in features.columns
        # Data-quality checks.
        assert len(features) > 0
        assert not features.isnull().all().any()

    @pytest.mark.asyncio
    async def test_model_training(self, anomaly_detector, sample_trading_data):
        """train() flips is_trained and reports training diagnostics."""
        features = await anomaly_detector.extract_features(sample_trading_data)
        # Pre-training state.
        assert anomaly_detector.is_trained == False
        # Train the model.
        training_result = await anomaly_detector.train(features)
        # Post-training state.
        assert anomaly_detector.is_trained == True
        assert anomaly_detector.model is not None
        # Training diagnostics.
        assert 'training_score' in training_result
        assert 'contamination_detected' in training_result
        assert 'feature_importance' in training_result

    @pytest.mark.asyncio
    async def test_anomaly_detection(self, anomaly_detector, sample_trading_data):
        """detect_anomalies flags the anomalies injected by the fixture."""
        # Train the model.
        features = await anomaly_detector.extract_features(sample_trading_data)
        await anomaly_detector.train(features)
        # Run detection over the full dataset.
        anomalies = await anomaly_detector.detect_anomalies(sample_trading_data)
        # Result structure.
        assert 'anomaly_scores' in anomalies
        assert 'is_anomaly' in anomalies
        assert 'anomaly_details' in anomalies
        assert 'severity_levels' in anomalies
        # One flag per input row.
        anomaly_flags = anomalies['is_anomaly']
        assert isinstance(anomaly_flags, (list, np.ndarray))
        assert len(anomaly_flags) == len(sample_trading_data)
        # Anomalies were injected, so at least one must be detected.
        assert sum(anomaly_flags) > 0

    @pytest.mark.asyncio
    async def test_real_time_detection(self, anomaly_detector, sample_trading_data):
        """detect_real_time scores individual data points after training."""
        # Train on all but the last 100 rows.
        training_data = sample_trading_data.iloc[:-100]
        features = await anomaly_detector.extract_features(training_data)
        await anomaly_detector.train(features)
        # Score the most recent 10 rows one at a time.
        real_time_data = sample_trading_data.iloc[-10:]
        for _, data_point in real_time_data.iterrows():
            anomaly_result = await anomaly_detector.detect_real_time(data_point.to_dict())
            assert 'is_anomaly' in anomaly_result
            assert 'anomaly_score' in anomaly_result
            assert 'severity' in anomaly_result
            assert 'explanation' in anomaly_result
            # Score is bounded to [-1, 1].
            assert -1 <= anomaly_result['anomaly_score'] <= 1

    @pytest.mark.asyncio
    async def test_anomaly_clustering(self, anomaly_detector, sample_trading_data):
        """cluster_anomalies groups detected anomalies into labeled clusters."""
        # Train the model.
        features = await anomaly_detector.extract_features(sample_trading_data)
        await anomaly_detector.train(features)
        # Detect anomalies.
        anomalies = await anomaly_detector.detect_anomalies(sample_trading_data)
        # Cluster the detected anomalies.
        clusters = await anomaly_detector.cluster_anomalies(anomalies)
        # Cluster-result structure.
        assert 'cluster_labels' in clusters
        assert 'cluster_centers' in clusters
        assert 'cluster_descriptions' in clusters
        # At least one cluster must exist.
        unique_clusters = len(set(clusters['cluster_labels']))
        assert unique_clusters > 0

    @pytest.mark.asyncio
    async def test_anomaly_explanation(self, anomaly_detector, sample_trading_data):
        """explain_anomaly returns a structured explanation for one anomaly."""
        # Train the model.
        features = await anomaly_detector.extract_features(sample_trading_data)
        await anomaly_detector.train(features)
        # Pick the first detected anomaly, if any.
        anomalies = await anomaly_detector.detect_anomalies(sample_trading_data)
        anomaly_indices = np.where(anomalies['is_anomaly'])[0]
        # NOTE(review): if no anomaly is detected the assertions are skipped
        # silently — the test then passes vacuously.
        if len(anomaly_indices) > 0:
            anomaly_idx = anomaly_indices[0]
            explanation = await anomaly_detector.explain_anomaly(
                sample_trading_data.iloc[anomaly_idx].to_dict()
            )
            # Explanation structure.
            assert 'primary_reason' in explanation
            assert 'contributing_factors' in explanation
            assert 'feature_impacts' in explanation
            assert 'severity_breakdown' in explanation

    def test_algorithm_comparison(self, detector_config):
        """Each supported algorithm name can construct a detector."""
        algorithms = ['isolation_forest', 'one_class_svm', 'local_outlier_factor']
        for algorithm in algorithms:
            config = detector_config.copy()
            config['algorithm'] = algorithm
            detector = MarketAnomalyDetector(config)
            assert detector.algorithm == algorithm

    @pytest.mark.asyncio
    async def test_insufficient_data_handling(self, anomaly_detector):
        """Training on too few rows raises InsufficientDataError."""
        # Ten rows is below the configured min_samples of 100.
        small_data = pd.DataFrame({
            'timestamp': pd.date_range('2023-01-01', periods=10, freq='5min'),
            'price': [75000] * 10,
            'volume': [1000] * 10
        })
        with pytest.raises(InsufficientDataError):
            features = await anomaly_detector.extract_features(small_data)
            await anomaly_detector.train(features)
class TestRiskAssessmentEngine:
    """Tests for the portfolio risk-assessment engine (RiskAssessmentEngine)."""

    @pytest.fixture
    def engine_config(self):
        """Engine configuration shared by all tests in this class."""
        return {
            "risk_models": ["var", "expected_shortfall", "max_drawdown"],
            "confidence_levels": [0.95, 0.99],
            "time_horizons": [1, 5, 22],  # 1 day, 1 week, 1 month (trading days)
            "portfolio_optimization": True,
            "stress_testing": True,
            "monte_carlo_simulations": 10000
        }

    @pytest.fixture
    def risk_engine(self, engine_config):
        """RiskAssessmentEngine instance built from the shared config."""
        return RiskAssessmentEngine(engine_config)

    @pytest.fixture
    def portfolio_data(self):
        """Four-asset KRW portfolio with weights, shares, and average prices."""
        return {
            "assets": {
                "005930": {"weight": 0.4, "shares": 100, "avg_price": 75000},
                "000660": {"weight": 0.3, "shares": 200, "avg_price": 45000},
                "035420": {"weight": 0.2, "shares": 50, "avg_price": 180000},
                "207940": {"weight": 0.1, "shares": 150, "avg_price": 35000}
            },
            "total_value": 20000000,  # 20 million KRW
            "currency": "KRW",
            "rebalancing_frequency": "monthly"
        }

    def test_engine_initialization(self, risk_engine, engine_config):
        """Constructor copies every config field onto the instance."""
        assert risk_engine.risk_models == engine_config["risk_models"]
        assert risk_engine.confidence_levels == engine_config["confidence_levels"]
        assert risk_engine.time_horizons == engine_config["time_horizons"]
        assert risk_engine.portfolio_optimization == engine_config["portfolio_optimization"]
        assert risk_engine.stress_testing == engine_config["stress_testing"]

    @pytest.mark.asyncio
    async def test_var_calculation(self, risk_engine, portfolio_data):
        """calculate_var returns a negative VaR per confidence level and horizon."""
        # Simulated daily portfolio returns.
        np.random.seed(42)
        returns = np.random.normal(0.001, 0.02, 1000)
        var_results = await risk_engine.calculate_var(returns, portfolio_data)
        # Nested structure: {confidence: {horizon: var_value}}.
        for confidence in risk_engine.confidence_levels:
            assert confidence in var_results
            for horizon in risk_engine.time_horizons:
                assert horizon in var_results[confidence]
                var_value = var_results[confidence][horizon]
                assert isinstance(var_value, (int, float))
                assert var_value < 0  # VaR is reported as a (negative) loss

    @pytest.mark.asyncio
    async def test_expected_shortfall(self, risk_engine, portfolio_data):
        """calculate_expected_shortfall mirrors the VaR result structure."""
        np.random.seed(42)
        returns = np.random.normal(0.001, 0.02, 1000)
        es_results = await risk_engine.calculate_expected_shortfall(returns, portfolio_data)
        # Nested structure: {confidence: {horizon: es_value}}.
        for confidence in risk_engine.confidence_levels:
            assert confidence in es_results
            for horizon in risk_engine.time_horizons:
                assert horizon in es_results[confidence]
                es_value = es_results[confidence][horizon]
                assert isinstance(es_value, (int, float))
                assert es_value < 0  # ES is reported as a (negative) loss

    @pytest.mark.asyncio
    async def test_stress_testing(self, risk_engine, portfolio_data):
        """run_stress_tests evaluates the portfolio under each named scenario."""
        # Stress scenarios with shock parameters.
        stress_scenarios = {
            "market_crash": {"market_shock": -0.3, "volatility_increase": 2.0},
            "interest_rate_shock": {"rate_change": 0.02, "duration_impact": -0.1},
            "sector_rotation": {"tech_shock": -0.2, "finance_boost": 0.1}
        }
        stress_results = await risk_engine.run_stress_tests(portfolio_data, stress_scenarios)
        # One result per scenario, each with PnL, per-asset impact, and metrics.
        assert len(stress_results) == len(stress_scenarios)
        for scenario_name in stress_scenarios:
            assert scenario_name in stress_results
            scenario_result = stress_results[scenario_name]
            assert 'portfolio_pnl' in scenario_result
            assert 'asset_impacts' in scenario_result
            assert 'risk_metrics' in scenario_result

    @pytest.mark.asyncio
    async def test_monte_carlo_simulation(self, risk_engine, portfolio_data):
        """monte_carlo_simulation produces one return sample per simulation."""
        # Per-asset return-distribution parameters.
        asset_params = {
            "005930": {"mean": 0.0008, "std": 0.025, "skew": -0.2},
            "000660": {"mean": 0.0006, "std": 0.030, "skew": 0.1},
            "035420": {"mean": 0.0012, "std": 0.035, "skew": -0.1},
            "207940": {"mean": 0.0010, "std": 0.028, "skew": 0.0}
        }
        mc_results = await risk_engine.monte_carlo_simulation(
            portfolio_data,
            asset_params,
            time_horizon=22,  # one month of trading days
            num_simulations=1000
        )
        # Result structure.
        assert 'portfolio_returns' in mc_results
        assert 'var_estimates' in mc_results
        assert 'expected_return' in mc_results
        assert 'return_distribution' in mc_results
        # One portfolio return per simulation path.
        portfolio_returns = mc_results['portfolio_returns']
        assert len(portfolio_returns) == 1000
        assert isinstance(portfolio_returns, (list, np.ndarray))

    @pytest.mark.asyncio
    async def test_portfolio_optimization(self, risk_engine, portfolio_data):
        """optimize_portfolio returns long-only weights summing to one."""
        # Annualized expected returns and covariance matrix for 4 assets.
        expected_returns = np.array([0.08, 0.06, 0.12, 0.10])
        cov_matrix = np.array([
            [0.04, 0.012, 0.008, 0.006],
            [0.012, 0.09, 0.015, 0.009],
            [0.008, 0.015, 0.16, 0.012],
            [0.006, 0.009, 0.012, 0.12]
        ])
        optimization_result = await risk_engine.optimize_portfolio(
            expected_returns,
            cov_matrix,
            portfolio_data
        )
        # Result structure.
        assert 'optimal_weights' in optimization_result
        assert 'expected_return' in optimization_result
        assert 'portfolio_risk' in optimization_result
        assert 'sharpe_ratio' in optimization_result
        # Weights sum to 1 and are non-negative (long-only).
        weights = optimization_result['optimal_weights']
        assert abs(sum(weights) - 1.0) < 0.01
        assert all(w >= 0 for w in weights)

    @pytest.mark.asyncio
    async def test_risk_attribution(self, risk_engine, portfolio_data):
        """analyze_risk_attribution decomposes total risk by asset."""
        # Per-asset daily return series (250 trading days).
        # NOTE(review): no np.random.seed here — the fixture data differs on
        # every run, making this test nondeterministic; consider seeding.
        asset_returns = {
            "005930": np.random.normal(0.0008, 0.025, 250),
            "000660": np.random.normal(0.0006, 0.030, 250),
            "035420": np.random.normal(0.0012, 0.035, 250),
            "207940": np.random.normal(0.0010, 0.028, 250)
        }
        attribution_result = await risk_engine.analyze_risk_attribution(
            portfolio_data,
            asset_returns
        )
        # Result structure.
        assert 'total_risk' in attribution_result
        assert 'risk_contributions' in attribution_result
        assert 'marginal_risk' in attribution_result
        assert 'component_var' in attribution_result
        # Per-asset contributions must add up to the total risk.
        contributions = attribution_result['risk_contributions']
        total_contribution = sum(contributions.values())
        assert abs(total_contribution - attribution_result['total_risk']) < 0.01

    @pytest.mark.asyncio
    async def test_risk_monitoring(self, risk_engine, portfolio_data):
        """monitor_risk_limits checks the portfolio against configured limits."""
        # Risk limit thresholds.
        risk_limits = {
            "portfolio_var_95": 1000000,  # 1 million KRW
            "concentration_limit": 0.5,  # 50%
            "sector_exposure": 0.3,  # 30%
            "daily_loss_limit": 500000  # 500 thousand KRW
        }
        monitoring_result = await risk_engine.monitor_risk_limits(
            portfolio_data,
            risk_limits
        )
        # Result structure.
        assert 'limit_breaches' in monitoring_result
        assert 'current_exposures' in monitoring_result
        assert 'risk_warnings' in monitoring_result
        assert 'recommended_actions' in monitoring_result

    def test_risk_metrics_calculation(self, risk_engine):
        """Private metric helpers: Sharpe ratio, max drawdown, volatility."""
        # Sample daily return series.
        returns = np.array([0.01, -0.02, 0.015, -0.005, 0.008, -0.012, 0.003])
        # Sharpe ratio.
        sharpe = risk_engine._calculate_sharpe_ratio(returns, risk_free_rate=0.02)
        assert isinstance(sharpe, (int, float))
        # Maximum drawdown is zero or negative.
        max_dd = risk_engine._calculate_max_drawdown(returns)
        assert isinstance(max_dd, (int, float))
        assert max_dd <= 0
        # Volatility is non-negative.
        volatility = risk_engine._calculate_volatility(returns)
        assert isinstance(volatility, (int, float))
        assert volatility >= 0
class TestAdvancedPatternRecognition:
"""고급 패턴 인식 테스트"""
    @pytest.fixture
    def pattern_config(self):
        """Pattern-recognition configuration shared by this class's tests."""
        return {
            "pattern_types": ["head_shoulders", "triangle", "flag", "cup_handle", "double_top"],
            "min_pattern_length": 20,
            "max_pattern_length": 100,
            "confidence_threshold": 0.8,
            "timeframes": ["5m", "15m", "1h", "4h", "1d"]
        }
    @pytest.fixture
    def pattern_recognizer(self, pattern_config):
        """AdvancedPatternRecognition instance built from the shared config."""
        return AdvancedPatternRecognition(pattern_config)
    @pytest.fixture
    def sample_price_patterns(self):
        """Synthetic hourly price data containing head-and-shoulders and triangle shapes."""
        # Simulate a head-and-shoulders pattern.
        def create_head_shoulders(base_price=100, length=60):
            x = np.linspace(0, 4*np.pi, length)
            # Left shoulder, head, right shoulder via damped sinusoids.
            pattern = base_price + 5*np.sin(x) + 10*np.sin(x/2) * np.exp(-x/8)
            return pattern
        # Simulate a converging triangle pattern.
        def create_triangle(base_price=100, length=50):
            # Converging upper and lower trendlines.
            upper_line = np.linspace(base_price + 10, base_price + 2, length)
            lower_line = np.linspace(base_price - 5, base_price - 1, length)
            noise = np.random.normal(0, 0.5, length)
            pattern = []
            for i in range(length):
                if i % 4 < 2:  # near the upper trendline
                    pattern.append(upper_line[i] + noise[i])
                else:  # near the lower trendline
                    pattern.append(lower_line[i] + noise[i])
            return np.array(pattern)
        patterns_data = []
        dates = pd.date_range('2023-01-01', periods=300, freq='1H')
        # Alternate pattern types every 80 bars.
        for i in range(0, 300, 80):
            if i + 60 <= 300:
                if i % 160 == 0:  # head-and-shoulders
                    prices = create_head_shoulders(75000 + i*10, 60)
                    pattern_type = "head_shoulders"
                else:  # triangle
                    prices = create_triangle(75000 + i*10, 50)
                    pattern_type = "triangle"
                for j, price in enumerate(prices):
                    if i + j < len(dates):
                        patterns_data.append({
                            'timestamp': dates[i + j],
                            'symbol': '005930',
                            'price': price,
                            'volume': np.random.lognormal(12, 0.3),
                            'high': price * 1.005,
                            'low': price * 0.995,
                            'open': price * 1.001,
                            'close': price,
                            # Ground-truth label only on the final bar of a pattern.
                            'actual_pattern': pattern_type if j == len(prices) - 1 else None
                        })
        return pd.DataFrame(patterns_data)
    def test_recognizer_initialization(self, pattern_recognizer, pattern_config):
        """Constructor copies every config field onto the instance."""
        assert pattern_recognizer.pattern_types == pattern_config["pattern_types"]
        assert pattern_recognizer.min_pattern_length == pattern_config["min_pattern_length"]
        assert pattern_recognizer.max_pattern_length == pattern_config["max_pattern_length"]
        assert pattern_recognizer.confidence_threshold == pattern_config["confidence_threshold"]
    @pytest.mark.asyncio
    async def test_pattern_detection(self, pattern_recognizer, sample_price_patterns):
        """detect_patterns finds at least one well-formed pattern in the fixture."""
        detected_patterns = await pattern_recognizer.detect_patterns(sample_price_patterns)
        # At least one pattern should be found in the synthetic data.
        assert isinstance(detected_patterns, list)
        assert len(detected_patterns) > 0
        for pattern in detected_patterns:
            assert 'pattern_type' in pattern
            assert 'confidence' in pattern
            assert 'start_index' in pattern
            assert 'end_index' in pattern
            assert 'price_target' in pattern
            assert 'support_resistance' in pattern
            # Type must be configured and confidence must be a probability.
            assert pattern['pattern_type'] in pattern_recognizer.pattern_types
            assert 0 <= pattern['confidence'] <= 1
    @pytest.mark.asyncio
    async def test_head_shoulders_detection(self, pattern_recognizer, sample_price_patterns):
        """_detect_head_shoulders returns structurally valid H&S patterns."""
        hs_patterns = await pattern_recognizer._detect_head_shoulders(sample_price_patterns)
        # Each pattern carries the three peaks, neckline, and price target.
        for pattern in hs_patterns:
            assert 'left_shoulder' in pattern
            assert 'head' in pattern
            assert 'right_shoulder' in pattern
            assert 'neckline' in pattern
            assert 'target_price' in pattern
            # H&S invariant: the head is higher than both shoulders.
            assert pattern['head']['price'] > pattern['left_shoulder']['price']
            assert pattern['head']['price'] > pattern['right_shoulder']['price']
    @pytest.mark.asyncio
    async def test_triangle_pattern_detection(self, pattern_recognizer, sample_price_patterns):
        """_detect_triangles returns triangles with trendlines and breakout info."""
        triangle_patterns = await pattern_recognizer._detect_triangles(sample_price_patterns)
        # Structure check for each detected triangle.
        for pattern in triangle_patterns:
            assert 'triangle_type' in pattern  # ascending, descending, symmetrical
            assert 'upper_trendline' in pattern
            assert 'lower_trendline' in pattern
            assert 'breakout_direction' in pattern
            assert 'volume_confirmation' in pattern
@pytest.mark.asyncio
async def test_multi_timeframe_analysis(self, pattern_recognizer, sample_price_patterns):
    """Analysis must return one result list per requested timeframe, tagged correctly."""
    timeframes = ['15min', '1H', '4H']
    analysis = await pattern_recognizer.analyze_multiple_timeframes(
        sample_price_patterns,
        timeframes,
    )
    assert len(analysis) == len(timeframes)
    for tf, tf_patterns in analysis.items():
        assert tf in timeframes
        assert isinstance(tf_patterns, list)
        # Every pattern must be tagged with the timeframe it came from.
        for tf_pattern in tf_patterns:
            assert 'timeframe' in tf_pattern
            assert tf_pattern['timeframe'] == tf
@pytest.mark.asyncio
async def test_pattern_validation(self, pattern_recognizer, sample_price_patterns):
    """Every detected pattern can be validated and scored within [0, 1]."""
    detected = await pattern_recognizer.detect_patterns(sample_price_patterns)
    for candidate in detected:
        verdict = await pattern_recognizer.validate_pattern(
            sample_price_patterns,
            candidate,
        )
        for key in ('is_valid', 'validation_score', 'validation_reasons'):
            assert key in verdict
        assert 0 <= verdict['validation_score'] <= 1
@pytest.mark.asyncio
async def test_pattern_completion_prediction(self, pattern_recognizer, sample_price_patterns):
    """Completion forecasts on a truncated series report valid probabilities."""
    # Feed only the first 80% of the series so patterns are still forming.
    cutoff = int(len(sample_price_patterns) * 0.8)
    partial_data = sample_price_patterns.iloc[:cutoff]
    forecasts = await pattern_recognizer.predict_pattern_completion(partial_data)
    for forecast in forecasts:
        for key in ('pattern_type', 'completion_probability',
                    'estimated_completion_time', 'price_targets'):
            assert key in forecast
        assert 0 <= forecast['completion_probability'] <= 1
def test_technical_indicators_calculation(self, pattern_recognizer):
    """SMA, Bollinger bands and RSI preserve input length and keep sane values."""
    prices = np.array([100, 102, 105, 103, 107, 110, 108, 112, 115, 113])

    # Simple moving average: same length, and the tail is defined once
    # enough observations have accumulated.
    sma = pattern_recognizer._calculate_sma(prices, period=5)
    assert len(sma) == len(prices)
    assert not np.isnan(sma[-1])

    # Bollinger bands must come back correctly ordered.
    upper, middle, lower = pattern_recognizer._calculate_bollinger_bands(prices, period=5)
    assert len(upper) == len(prices)
    assert upper[-1] > middle[-1] > lower[-1]

    # RSI is bounded to [0, 100] by construction (when defined).
    rsi = pattern_recognizer._calculate_rsi(prices, period=5)
    assert len(rsi) == len(prices)
    if not np.isnan(rsi[-1]):
        assert 0 <= rsi[-1] <= 100
@pytest.mark.asyncio
async def test_pattern_similarity_matching(self, pattern_recognizer, sample_price_patterns):
    """Similarity search returns scored segments with scores within [0, 1]."""
    # Use the first 50 prices as the query pattern.
    reference_pattern = sample_price_patterns['price'].iloc[:50].values
    matches = await pattern_recognizer.find_similar_patterns(
        sample_price_patterns,
        reference_pattern,
    )
    for match in matches:
        for key in ('similarity_score', 'start_index', 'end_index', 'pattern_segment'):
            assert key in match
        assert 0 <= match['similarity_score'] <= 1
class TestMultiTimeframeAnalyzer:
    """Tests for the multi-timeframe analyzer."""

    @pytest.fixture
    def analyzer_config(self):
        """Analyzer configuration."""
        return {
            "timeframes": ["1m", "5m", "15m", "1h", "4h", "1d"],
            "alignment_method": "weighted_consensus",
            "trend_confirmation_required": 3,
            "signal_strength_threshold": 0.7
        }

    @pytest.fixture
    def mtf_analyzer(self, analyzer_config):
        """Multi-timeframe analyzer instance under test."""
        return MultiTimeframeAnalyzer(analyzer_config)

    @pytest.fixture
    def multi_timeframe_data(self):
        """OHLCV frames for several timeframes, downsampled from one 1-minute base series."""
        # 1-minute base series covering one full day.
        dates_1m = pd.date_range('2024-01-01', '2024-01-02', freq='1min')
        np.random.seed(42)
        # Simulated rising trend plus Gaussian noise.
        trend = np.linspace(75000, 77000, len(dates_1m))
        noise = np.random.normal(0, 100, len(dates_1m))
        prices_1m = trend + noise
        data_1m = pd.DataFrame({
            'timestamp': dates_1m,
            'symbol': '005930',
            'open': prices_1m * 0.999,
            'high': prices_1m * 1.002,
            'low': prices_1m * 0.998,
            'close': prices_1m,
            'volume': np.random.lognormal(12, 0.3, len(dates_1m))
        })
        # Canonical OHLCV aggregation rules, shared by every downsampling step.
        ohlcv_agg = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }

        def _downsample(freq):
            # Lowercase '1h' is used instead of '1H': uppercase hourly offset
            # aliases are deprecated since pandas 2.2.
            return data_1m.resample(freq, on='timestamp').agg(ohlcv_agg).dropna()

        return {
            '1m': data_1m,
            '5m': _downsample('5min'),
            '15m': _downsample('15min'),
            '1h': _downsample('1h')
        }

    def test_analyzer_initialization(self, mtf_analyzer, analyzer_config):
        """The analyzer must mirror every configured knob as an attribute."""
        assert mtf_analyzer.timeframes == analyzer_config["timeframes"]
        assert mtf_analyzer.alignment_method == analyzer_config["alignment_method"]
        assert mtf_analyzer.trend_confirmation_required == analyzer_config["trend_confirmation_required"]
        assert mtf_analyzer.signal_strength_threshold == analyzer_config["signal_strength_threshold"]

    @pytest.mark.asyncio
    async def test_trend_analysis_across_timeframes(self, mtf_analyzer, multi_timeframe_data):
        """Trend analysis must produce a well-formed result for every timeframe."""
        trend_analysis = await mtf_analyzer.analyze_trends(multi_timeframe_data)
        for timeframe in multi_timeframe_data.keys():
            assert timeframe in trend_analysis
            tf_analysis = trend_analysis[timeframe]
            assert 'trend_direction' in tf_analysis
            assert 'trend_strength' in tf_analysis
            assert 'trend_duration' in tf_analysis
            assert 'support_resistance' in tf_analysis
            # Direction is a closed set and strength behaves like a probability.
            assert tf_analysis['trend_direction'] in ['up', 'down', 'sideways']
            assert 0 <= tf_analysis['trend_strength'] <= 1

    @pytest.mark.asyncio
    async def test_consensus_signal_generation(self, mtf_analyzer, multi_timeframe_data):
        """Consensus signals combine timeframes into one valid overall signal."""
        consensus_signals = await mtf_analyzer.generate_consensus_signals(multi_timeframe_data)
        assert 'overall_signal' in consensus_signals
        assert 'signal_strength' in consensus_signals
        assert 'timeframe_alignment' in consensus_signals
        assert 'conflicting_signals' in consensus_signals
        assert consensus_signals['overall_signal'] in ['buy', 'sell', 'hold']
        assert 0 <= consensus_signals['signal_strength'] <= 1
        # Aligned timeframes can never exceed the total considered.
        alignment = consensus_signals['timeframe_alignment']
        assert 'aligned_timeframes' in alignment
        assert 'total_timeframes' in alignment
        assert alignment['aligned_timeframes'] <= alignment['total_timeframes']

    @pytest.mark.asyncio
    async def test_support_resistance_identification(self, mtf_analyzer, multi_timeframe_data):
        """Support/resistance levels carry price, strength and a valid level type."""
        sr_levels = await mtf_analyzer.identify_support_resistance(multi_timeframe_data)
        assert 'support_levels' in sr_levels
        assert 'resistance_levels' in sr_levels
        assert 'key_levels' in sr_levels
        for level in sr_levels['key_levels']:
            assert 'price' in level
            assert 'strength' in level
            assert 'timeframe_confirmations' in level
            assert 'level_type' in level  # support or resistance
            assert 0 <= level['strength'] <= 1
            assert level['level_type'] in ['support', 'resistance']

    @pytest.mark.asyncio
    async def test_entry_exit_timing(self, mtf_analyzer, multi_timeframe_data):
        """Entry/exit timing analysis returns valid, fully specified entry signals."""
        timing_analysis = await mtf_analyzer.analyze_entry_exit_timing(multi_timeframe_data)
        assert 'entry_signals' in timing_analysis
        assert 'exit_signals' in timing_analysis
        assert 'optimal_timeframe' in timing_analysis
        assert 'risk_reward_ratio' in timing_analysis
        for entry in timing_analysis['entry_signals']:
            assert 'timeframe' in entry
            assert 'signal_type' in entry  # buy/sell
            assert 'confidence' in entry
            assert 'price_target' in entry
            assert 'stop_loss' in entry
            assert entry['signal_type'] in ['buy', 'sell']
            assert 0 <= entry['confidence'] <= 1

    @pytest.mark.asyncio
    async def test_momentum_analysis(self, mtf_analyzer, multi_timeframe_data):
        """Momentum analysis reports normalized per-timeframe strengths."""
        momentum_analysis = await mtf_analyzer.analyze_momentum(multi_timeframe_data)
        assert 'momentum_direction' in momentum_analysis
        assert 'momentum_strength' in momentum_analysis
        assert 'momentum_divergence' in momentum_analysis
        assert 'acceleration' in momentum_analysis
        # Per-timeframe strength is normalized to [-1, 1] when present.
        for timeframe in multi_timeframe_data.keys():
            if timeframe in momentum_analysis['momentum_strength']:
                strength = momentum_analysis['momentum_strength'][timeframe]
                assert -1 <= strength <= 1

    @pytest.mark.asyncio
    async def test_volatility_analysis(self, mtf_analyzer, multi_timeframe_data):
        """Volatility analysis labels the regime and reports non-negative volatility."""
        volatility_analysis = await mtf_analyzer.analyze_volatility(multi_timeframe_data)
        assert 'current_volatility' in volatility_analysis
        assert 'volatility_regime' in volatility_analysis
        assert 'volatility_forecast' in volatility_analysis
        assert 'breakout_potential' in volatility_analysis
        assert volatility_analysis['volatility_regime'] in ['low', 'medium', 'high']
        for timeframe, vol_data in volatility_analysis['current_volatility'].items():
            assert 'realized_volatility' in vol_data
            assert 'implied_volatility_estimate' in vol_data
            assert vol_data['realized_volatility'] >= 0

    @pytest.mark.asyncio
    async def test_correlation_analysis(self, mtf_analyzer, multi_timeframe_data):
        """Cross-asset correlation analysis over every shared timeframe."""
        # Derive a second asset from the first at a different price level.
        # FIX: accumulate all timeframes under one symbol key; the previous
        # per-iteration dict rebinding kept only the last timeframe.
        other_asset_data = {'000660': {}}
        for tf, data in multi_timeframe_data.items():
            other_data = data.copy()
            other_data['close'] = other_data['close'] * 0.6  # different price level
            other_asset_data['000660'][tf] = other_data
        correlation_analysis = await mtf_analyzer.analyze_correlations(
            {'005930': multi_timeframe_data},
            other_asset_data
        )
        assert 'asset_correlations' in correlation_analysis
        assert 'timeframe_correlations' in correlation_analysis
        assert 'correlation_stability' in correlation_analysis

    def test_technical_indicator_calculation(self, mtf_analyzer):
        """MACD and stochastic oscillator keep the input length."""
        sample_data = pd.DataFrame({
            'close': [100, 102, 105, 103, 107, 110, 108, 112, 115, 113],
            'high': [101, 103, 106, 104, 108, 111, 109, 113, 116, 114],
            'low': [99, 101, 104, 102, 106, 109, 107, 111, 114, 112],
            'volume': [1000, 1100, 1200, 900, 1300, 1400, 1000, 1500, 1600, 1200]
        })
        # MACD: all three output series align with the input index.
        macd_line, signal_line, histogram = mtf_analyzer._calculate_macd(sample_data['close'])
        assert len(macd_line) == len(sample_data)
        assert len(signal_line) == len(sample_data)
        assert len(histogram) == len(sample_data)
        # Stochastic oscillator: %K and %D align with the input index.
        k_percent, d_percent = mtf_analyzer._calculate_stochastic(
            sample_data['high'],
            sample_data['low'],
            sample_data['close']
        )
        assert len(k_percent) == len(sample_data)
        assert len(d_percent) == len(sample_data)
class TestCorrelationEngine:
    """Correlation engine tests."""

    @pytest.fixture
    def engine_config(self):
        """Engine configuration."""
        return {
            "correlation_methods": ["pearson", "spearman", "kendall"],
            "rolling_windows": [20, 60, 252],  # ~1 month / quarter / year of trading days
            "significance_level": 0.05,
            "min_observations": 30,
            "sector_groupings": {
                "technology": ["005930", "000660"],
                "finance": ["055550", "316140"],
                "healthcare": ["207940", "068270"]
            }
        }

    @pytest.fixture
    def correlation_engine(self, engine_config):
        """Correlation engine instance under test."""
        return CorrelationEngine(engine_config)

    @pytest.fixture
    def multi_asset_data(self):
        """Multi-asset OHLCV data with known pairwise correlations to a base series."""
        np.random.seed(42)
        dates = pd.date_range('2023-01-01', '2024-01-01', freq='D')
        # Base return series all assets are correlated against.
        base_returns = np.random.normal(0.001, 0.02, len(dates))
        assets_data = {}
        # Target correlation of each symbol with the base series.
        correlations = {
            '005930': 1.0,   # reference asset
            '000660': 0.7,   # strong positive correlation
            '055550': -0.3,  # negative correlation
            '207940': 0.1    # weak correlation
        }
        for asset, corr in correlations.items():
            # Mix base and idiosyncratic returns so that the theoretical
            # correlation with the base series equals `corr`.
            asset_specific = np.random.normal(0, 0.015, len(dates))
            asset_returns = corr * base_returns + np.sqrt(1 - corr**2) * asset_specific
            # Compound returns into a price path.
            prices = 100000 * np.exp(np.cumsum(asset_returns))
            assets_data[asset] = pd.DataFrame({
                'timestamp': dates,
                'symbol': asset,
                'open': prices * 0.999,
                'high': prices * 1.002,
                'low': prices * 0.998,
                'close': prices,
                'volume': np.random.lognormal(12, 0.3, len(dates)),
                'returns': asset_returns
            })
        return assets_data

    def test_engine_initialization(self, correlation_engine, engine_config):
        """The engine must mirror every configured knob as an attribute."""
        assert correlation_engine.correlation_methods == engine_config["correlation_methods"]
        assert correlation_engine.rolling_windows == engine_config["rolling_windows"]
        assert correlation_engine.significance_level == engine_config["significance_level"]
        assert correlation_engine.min_observations == engine_config["min_observations"]

    @pytest.mark.asyncio
    async def test_pairwise_correlation_calculation(self, correlation_engine, multi_asset_data):
        """Pairwise correlation reports all methods with valid coefficients/p-values."""
        asset1_data = multi_asset_data['005930']
        asset2_data = multi_asset_data['000660']
        correlation_result = await correlation_engine.calculate_pairwise_correlation(
            asset1_data['returns'],
            asset2_data['returns']
        )
        # One entry per configured correlation method.
        assert 'pearson' in correlation_result
        assert 'spearman' in correlation_result
        assert 'kendall' in correlation_result
        for method in correlation_engine.correlation_methods:
            corr_data = correlation_result[method]
            assert 'correlation' in corr_data
            assert 'p_value' in corr_data
            assert 'confidence_interval' in corr_data
            # Coefficient and p-value are within their mathematical ranges.
            assert -1 <= corr_data['correlation'] <= 1
            assert 0 <= corr_data['p_value'] <= 1

    @pytest.mark.asyncio
    async def test_correlation_matrix_calculation(self, correlation_engine, multi_asset_data):
        """The correlation matrix is square, symmetric, with a unit diagonal."""
        # Extract the per-asset return series.
        returns_data = {}
        for asset, data in multi_asset_data.items():
            returns_data[asset] = data['returns']
        correlation_matrix = await correlation_engine.calculate_correlation_matrix(returns_data)
        assert 'correlation_matrix' in correlation_matrix
        assert 'p_value_matrix' in correlation_matrix
        assert 'method' in correlation_matrix
        corr_matrix = correlation_matrix['correlation_matrix']
        # Square matrix, one row/column per asset.
        assert corr_matrix.shape[0] == corr_matrix.shape[1]
        assert len(corr_matrix) == len(multi_asset_data)
        # Diagonal entries are (numerically) 1.
        for i in range(len(corr_matrix)):
            assert abs(corr_matrix.iloc[i, i] - 1.0) < 0.01
        # Matrix is (numerically) symmetric.
        for i in range(len(corr_matrix)):
            for j in range(len(corr_matrix)):
                assert abs(corr_matrix.iloc[i, j] - corr_matrix.iloc[j, i]) < 0.01

    @pytest.mark.asyncio
    async def test_rolling_correlation_analysis(self, correlation_engine, multi_asset_data):
        """Rolling correlations come with stability and trend diagnostics."""
        asset1_data = multi_asset_data['005930']
        asset2_data = multi_asset_data['000660']
        rolling_correlations = await correlation_engine.calculate_rolling_correlations(
            asset1_data['returns'],
            asset2_data['returns'],
            window=60
        )
        assert 'rolling_correlations' in rolling_correlations
        assert 'correlation_stability' in rolling_correlations
        assert 'trend_analysis' in rolling_correlations
        rolling_corrs = rolling_correlations['rolling_correlations']
        # The rolling window shortens the output relative to the input.
        assert len(rolling_corrs) <= len(asset1_data)
        # Stability diagnostics are present.
        stability = rolling_correlations['correlation_stability']
        assert 'mean_correlation' in stability
        assert 'correlation_volatility' in stability
        assert 'stability_score' in stability

    @pytest.mark.asyncio
    async def test_sector_correlation_analysis(self, correlation_engine, multi_asset_data):
        """Sector analysis reports intra-sector correlations and cohesion scores."""
        # Group available assets into a single 'technology' sector.
        sector_data = {
            'technology': {
                asset: multi_asset_data[asset]
                for asset in ['005930', '000660']
                if asset in multi_asset_data
            }
        }
        sector_correlations = await correlation_engine.analyze_sector_correlations(sector_data)
        assert 'intra_sector_correlations' in sector_correlations
        assert 'sector_cohesion_scores' in sector_correlations
        # Intra-sector correlations are keyed per sector.
        tech_correlations = sector_correlations['intra_sector_correlations']['technology']
        assert isinstance(tech_correlations, dict)

    @pytest.mark.asyncio
    async def test_dynamic_correlation_analysis(self, correlation_engine, multi_asset_data):
        """Dynamic analysis detects correlation regimes and their transitions."""
        returns_data = {
            asset: data['returns']
            for asset, data in multi_asset_data.items()
        }
        dynamic_analysis = await correlation_engine.analyze_dynamic_correlations(
            returns_data,
            regime_detection=True
        )
        assert 'correlation_regimes' in dynamic_analysis
        assert 'regime_transitions' in dynamic_analysis
        assert 'current_regime' in dynamic_analysis
        assert 'regime_probabilities' in dynamic_analysis
        # Regime transitions carry dates and probabilities.
        transitions = dynamic_analysis['regime_transitions']
        assert 'transition_dates' in transitions
        assert 'transition_probabilities' in transitions

    @pytest.mark.asyncio
    async def test_correlation_forecasting(self, correlation_engine, multi_asset_data):
        """Forecasts cover the whole horizon with coefficients in [-1, 1]."""
        asset1_returns = multi_asset_data['005930']['returns']
        asset2_returns = multi_asset_data['000660']['returns']
        correlation_forecast = await correlation_engine.forecast_correlations(
            asset1_returns,
            asset2_returns,
            forecast_horizon=30
        )
        assert 'forecasted_correlations' in correlation_forecast
        assert 'forecast_confidence' in correlation_forecast
        assert 'forecast_intervals' in correlation_forecast
        forecasts = correlation_forecast['forecasted_correlations']
        assert len(forecasts) == 30  # one value per forecast step
        # Forecast values stay within the valid correlation range.
        for forecast in forecasts:
            assert -1 <= forecast <= 1

    @pytest.mark.asyncio
    async def test_correlation_breakdown_analysis(self, correlation_engine, multi_asset_data):
        """Breakdown analysis separates systematic and idiosyncratic components."""
        returns_data = {
            asset: data['returns']
            for asset, data in multi_asset_data.items()
        }
        breakdown_analysis = await correlation_engine.analyze_correlation_breakdown(returns_data)
        assert 'systematic_correlations' in breakdown_analysis
        assert 'idiosyncratic_correlations' in breakdown_analysis
        assert 'correlation_drivers' in breakdown_analysis
        assert 'factor_loadings' in breakdown_analysis

    def test_correlation_significance_testing(self, correlation_engine):
        """The significance flag must agree with the configured alpha level."""
        # Two series with a known, weak dependency.
        np.random.seed(42)
        data1 = np.random.normal(0, 1, 100)
        data2 = 0.5 * data1 + np.random.normal(0, 1, 100)
        significance_result = correlation_engine._test_correlation_significance(
            data1, data2, method='pearson'
        )
        assert 'correlation' in significance_result
        assert 'p_value' in significance_result
        assert 'is_significant' in significance_result
        assert 'confidence_interval' in significance_result
        # The flag must be exactly "p below the engine's significance level".
        is_significant = significance_result['p_value'] < correlation_engine.significance_level
        assert significance_result['is_significant'] == is_significant

    @pytest.mark.asyncio
    async def test_correlation_clustering(self, correlation_engine, multi_asset_data):
        """Clustering assigns every asset and yields a valid silhouette score."""
        returns_data = {
            asset: data['returns']
            for asset, data in multi_asset_data.items()
        }
        clustering_result = await correlation_engine.cluster_by_correlations(returns_data)
        assert 'clusters' in clustering_result
        assert 'cluster_centers' in clustering_result
        assert 'silhouette_score' in clustering_result
        # Every asset receives a cluster assignment.
        clusters = clustering_result['clusters']
        assert len(clusters) == len(multi_asset_data)
        # Silhouette score is bounded to [-1, 1] by definition.
        silhouette_score = clustering_result['silhouette_score']
        assert -1 <= silhouette_score <= 1