"""AI 시스템 기본 테스트 (의존성 최소화)"""
import pytest
import asyncio
import json
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any
from src.ai.price_predictor import PricePredictor
from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError
class MockDataFrame:
"""Pandas DataFrame 모킹"""
def __init__(self, data: Dict[str, List]):
self.data = data.copy() if data else {}
self._columns = list(self.data.keys())
self._length = len(next(iter(self.data.values()))) if self.data else 0
def __len__(self):
return self._length
def __getitem__(self, key):
if isinstance(key, str):
if key in self.data:
return MockSeries(self.data[key])
else:
return MockSeries([0] * self._length) # 기본값
return self
def __setitem__(self, key, value):
"""컬럼 할당 지원"""
if isinstance(value, MockSeries):
self.data[key] = value.data
elif isinstance(value, list):
self.data[key] = value
else:
self.data[key] = [value] * self._length
if key not in self._columns:
self._columns.append(key)
@property
def columns(self):
return self._columns
def copy(self):
return MockDataFrame(self.data.copy())
def dropna(self):
return self
def pct_change(self):
return MockSeries([0.01] * self._length)
def rolling(self, window):
return MockRolling(self.data)
def ewm(self, span):
return MockEWM(self.data)
@property
def iloc(self):
return MockILoc(self.data)
@property
def values(self):
return [[self.data[col][i] for col in self._columns] for i in range(self._length)]
class MockSeries:
"""Pandas Series 모킹"""
def __init__(self, data: List):
self.data = data
self._length = len(data)
def __len__(self):
return self._length
def __getitem__(self, index):
if isinstance(index, int):
return self.data[index] if 0 <= index < self._length else None
return MockSeries(self.data[index])
def __add__(self, other):
if isinstance(other, MockSeries):
return MockSeries([a + b for a, b in zip(self.data, other.data)])
else:
return MockSeries([a + other for a in self.data])
def __sub__(self, other):
if isinstance(other, MockSeries):
return MockSeries([a - b for a, b in zip(self.data, other.data)])
else:
return MockSeries([a - other for a in self.data])
def __mul__(self, other):
if isinstance(other, MockSeries):
return MockSeries([a * b for a, b in zip(self.data, other.data)])
else:
return MockSeries([a * other for a in self.data])
def __truediv__(self, other):
if isinstance(other, MockSeries):
return MockSeries([a / b if b != 0 else 0 for a, b in zip(self.data, other.data)])
else:
return MockSeries([a / other if other != 0 else 0 for a in self.data])
@property
def iloc(self):
return MockILoc({"data": self.data})["data"]
@property
def values(self):
return self.data
def pct_change(self):
return MockSeries([0.01] * self._length)
def rolling(self, window):
return MockRolling({"data": self.data})
def ewm(self, span):
return MockEWM({"data": self.data})
def diff(self):
return MockSeries([0.1] * self._length)
def where(self, condition, other):
return MockSeries([other if not condition else x for x in self.data])
class MockRolling:
"""Pandas Rolling 모킹"""
def __init__(self, data: Dict):
self.data = data
def mean(self):
if "data" in self.data:
return MockSeries([75000] * len(self.data["data"]))
key = next(iter(self.data.keys()))
return MockSeries([75000] * len(self.data[key]))
def std(self):
if "data" in self.data:
return MockSeries([1000] * len(self.data["data"]))
key = next(iter(self.data.keys()))
return MockSeries([1000] * len(self.data[key]))
class MockEWM:
"""Pandas EWM 모킹"""
def __init__(self, data: Dict):
self.data = data
def mean(self):
if "data" in self.data:
return MockSeries([75000] * len(self.data["data"]))
key = next(iter(self.data.keys()))
return MockSeries([75000] * len(self.data[key]))
class MockILoc:
"""Pandas iloc 모킹"""
def __init__(self, data: Dict):
self.data = data
def __getitem__(self, key):
if isinstance(key, int):
# 단일 인덱스
if self.data:
first_key = next(iter(self.data.keys()))
if key >= 0 and key < len(self.data[first_key]):
return self.data[first_key][key]
elif key < 0 and abs(key) <= len(self.data[first_key]):
return self.data[first_key][key] # 음수 인덱스 지원
return 0 # 기본값
elif isinstance(key, slice):
# 슬라이스
result_data = {}
for col, values in self.data.items():
result_data[col] = values[key]
return MockDataFrame(result_data)
return self
class TestPricePredictor:
"""가격 예측 모델 테스트 (기본)"""
@pytest.fixture
def model_config(self):
"""모델 설정"""
return {
"model_type": "lstm",
"sequence_length": 60,
"prediction_horizon": 5,
"features": ["price", "volume", "rsi", "macd", "bollinger_bands"],
"epochs": 100,
"batch_size": 32,
"validation_split": 0.2,
"early_stopping": True,
"learning_rate": 0.001
}
@pytest.fixture
def price_predictor(self, model_config):
"""가격 예측기 인스턴스"""
return PricePredictor(model_config)
@pytest.fixture
def sample_market_data(self):
"""샘플 시장 데이터 (Mock DataFrame)"""
data_length = 100
return MockDataFrame({
'timestamp': [f"2024-01-{i+1:02d}" for i in range(data_length)],
'symbol': ['005930'] * data_length,
'price': [75000 + i * 10 for i in range(data_length)],
'volume': [1000000 + i * 1000 for i in range(data_length)],
'high': [75500 + i * 10 for i in range(data_length)],
'low': [74500 + i * 10 for i in range(data_length)],
'open': [75000 + i * 10 for i in range(data_length)],
'close': [75000 + i * 10 for i in range(data_length)]
})
def test_predictor_initialization(self, price_predictor, model_config):
"""예측기 초기화 테스트"""
assert price_predictor.model_type == model_config["model_type"]
assert price_predictor.sequence_length == model_config["sequence_length"]
assert price_predictor.prediction_horizon == model_config["prediction_horizon"]
assert price_predictor.features == model_config["features"]
assert price_predictor.is_trained == False
assert price_predictor.model is None
@pytest.mark.asyncio
async def test_feature_engineering(self, price_predictor, sample_market_data):
"""피처 엔지니어링 테스트"""
features = await price_predictor.engineer_features(sample_market_data)
# 기본 피처 확인
assert 'price' in features.columns
assert 'volume' in features.columns
# 기술적 지표 확인
assert 'rsi' in features.columns
assert 'macd' in features.columns
assert 'bollinger_upper' in features.columns
assert 'bollinger_lower' in features.columns
assert 'sma_20' in features.columns
assert 'ema_12' in features.columns
# 데이터 길이 확인
assert len(features) > 0
@pytest.mark.asyncio
async def test_data_preprocessing(self, price_predictor, sample_market_data):
"""데이터 전처리 테스트"""
features = await price_predictor.engineer_features(sample_market_data)
X, y = await price_predictor.preprocess_data(features)
# 시퀀스 데이터 형태 확인
assert len(X) > 0
assert len(y) > 0
assert len(X) == len(y)
# 시퀀스 길이 확인
assert len(X[0]) == price_predictor.sequence_length
assert len(y[0]) == price_predictor.prediction_horizon
# 피처 수 확인 (available features 수와 매칭)
assert len(X[0][0]) > 0 # 최소 1개 이상의 피처
assert len(X[0][0]) >= len([f for f in price_predictor.features if f != "bollinger_bands"]) + 2 # bollinger_bands는 2개로 분리
@pytest.mark.asyncio
async def test_model_training(self, price_predictor, sample_market_data):
"""모델 훈련 테스트"""
features = await price_predictor.engineer_features(sample_market_data)
X, y = await price_predictor.preprocess_data(features)
# 훈련 전 상태 확인
assert price_predictor.is_trained == False
# 모델 훈련
training_history = await price_predictor.train(X, y)
# 훈련 후 상태 확인
assert price_predictor.is_trained == True
assert price_predictor.model is not None
assert 'history' in training_history
assert 'loss' in training_history['history']
assert 'val_loss' in training_history['history']
# 훈련 품질 확인 (손실 감소)
loss_history = training_history['history']['loss']
assert loss_history[0] > loss_history[-1] # 손실 감소
@pytest.mark.asyncio
async def test_price_prediction(self, price_predictor, sample_market_data):
"""가격 예측 테스트"""
# 모델 훈련
features = await price_predictor.engineer_features(sample_market_data)
X, y = await price_predictor.preprocess_data(features)
await price_predictor.train(X, y)
# 최근 데이터로 예측
recent_data = MockDataFrame({
'price': [76000, 76100, 76200, 76300, 76400],
'volume': [1100000, 1110000, 1120000, 1130000, 1140000]
})
predictions = await price_predictor.predict(recent_data)
# 예측 결과 확인
assert 'predictions' in predictions
assert 'confidence_intervals' in predictions
assert 'probability_distribution' in predictions
pred_values = predictions['predictions']
assert len(pred_values) == price_predictor.prediction_horizon
assert all(isinstance(p, (int, float)) for p in pred_values)
# 신뢰구간 확인
ci = predictions['confidence_intervals']
assert 'lower' in ci and 'upper' in ci
assert len(ci['lower']) == price_predictor.prediction_horizon
assert all(ci['lower'][i] <= pred_values[i] <= ci['upper'][i] for i in range(len(pred_values)))
@pytest.mark.asyncio
async def test_batch_prediction(self, price_predictor, sample_market_data):
"""배치 예측 테스트"""
# 모델 훈련
features = await price_predictor.engineer_features(sample_market_data)
X, y = await price_predictor.preprocess_data(features)
await price_predictor.train(X, y)
# 여러 심볼 데이터
symbols = ['005930', '000660', '035420']
batch_data = []
for symbol in symbols:
symbol_data = MockDataFrame({
'symbol': [symbol] * 10,
'price': [75000 + i * 10 for i in range(10)],
'volume': [1000000 + i * 1000 for i in range(10)]
})
batch_data.append(symbol_data)
# 배치 예측
batch_predictions = await price_predictor.predict_batch(batch_data)
# 결과 확인
assert len(batch_predictions) == len(symbols)
for symbol, prediction in batch_predictions.items():
assert symbol in symbols
assert 'predictions' in prediction
assert len(prediction['predictions']) == price_predictor.prediction_horizon
@pytest.mark.asyncio
async def test_model_evaluation(self, price_predictor, sample_market_data):
"""모델 평가 테스트"""
features = await price_predictor.engineer_features(sample_market_data)
X, y = await price_predictor.preprocess_data(features)
# 훈련/테스트 분할
split_idx = int(len(X) * 0.8)
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# 모델 훈련
await price_predictor.train(X_train, y_train)
# 모델 평가
evaluation_metrics = await price_predictor.evaluate(X_test, y_test)
# 평가 지표 확인
assert 'mse' in evaluation_metrics
assert 'mae' in evaluation_metrics
assert 'rmse' in evaluation_metrics
assert 'mape' in evaluation_metrics
assert 'directional_accuracy' in evaluation_metrics
# 지표 범위 확인
assert evaluation_metrics['mse'] >= 0
assert evaluation_metrics['mae'] >= 0
assert evaluation_metrics['rmse'] >= 0
assert 0 <= evaluation_metrics['directional_accuracy'] <= 1
@pytest.mark.asyncio
async def test_model_save_load(self, price_predictor, sample_market_data, tmp_path):
"""모델 저장/로드 테스트"""
# 모델 훈련
features = await price_predictor.engineer_features(sample_market_data)
X, y = await price_predictor.preprocess_data(features)
await price_predictor.train(X, y)
# 모델 저장
model_path = tmp_path / "test_model.json"
await price_predictor.save_model(str(model_path))
assert model_path.exists()
# 새 예측기 인스턴스 생성
new_predictor = PricePredictor(price_predictor.config)
assert new_predictor.is_trained == False
# 모델 로드
await new_predictor.load_model(str(model_path))
assert new_predictor.is_trained == True
# 모델 구조 확인
assert new_predictor.model['type'] == price_predictor.model['type']
@pytest.mark.asyncio
async def test_insufficient_data_error(self, price_predictor):
"""데이터 부족 에러 테스트"""
# 매우 적은 데이터
small_data = MockDataFrame({
'price': [100] * 5,
'volume': [1000] * 5
})
with pytest.raises(InsufficientDataError):
features = await price_predictor.engineer_features(small_data)
X, y = await price_predictor.preprocess_data(features)
@pytest.mark.asyncio
async def test_untrained_model_error(self, price_predictor, sample_market_data):
"""훈련되지 않은 모델 에러 테스트"""
recent_data = MockDataFrame({
'price': [76000, 76100, 76200],
'volume': [1100000, 1110000, 1120000]
})
with pytest.raises(ModelNotTrainedError):
await price_predictor.predict(recent_data)
def test_model_architecture_creation(self, price_predictor):
"""모델 아키텍처 생성 테스트"""
input_shape = (price_predictor.sequence_length, len(price_predictor.features))
output_size = price_predictor.prediction_horizon
model = price_predictor._create_model(input_shape, output_size)
# 모델 구조 확인
assert model is not None
assert 'layers' in model
assert len(model['layers']) > 0
assert model['input_shape'] == input_shape
assert model['output_shape'] == (output_size,)
class TestTechnicalIndicators:
"""기술적 지표 계산 테스트"""
@pytest.fixture
def price_predictor(self):
"""예측기 인스턴스"""
config = {"model_type": "lstm"}
return PricePredictor(config)
def test_rsi_calculation(self, price_predictor):
"""RSI 계산 테스트"""
prices = MockSeries([100, 102, 105, 103, 107, 110, 108, 112, 115, 113])
rsi = price_predictor._calculate_rsi(prices)
# RSI 결과 확인
assert len(rsi) == len(prices)
# RSI는 보통 0-100 범위이지만 mock에서는 50 고정값
assert all(isinstance(val, (int, float)) for val in rsi.data)
def test_macd_calculation(self, price_predictor):
"""MACD 계산 테스트"""
prices = MockSeries([100, 102, 105, 103, 107, 110, 108, 112, 115, 113])
macd_line, signal_line = price_predictor._calculate_macd(prices)
# MACD 결과 확인
assert len(macd_line) == len(prices)
assert len(signal_line) == len(prices)
assert isinstance(macd_line.data[0], (int, float))
assert isinstance(signal_line.data[0], (int, float))