# test_ai_basic.py
"""Basic tests for the AI system (minimal dependencies)."""
import pytest
import asyncio
import json
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any
from src.ai.price_predictor import PricePredictor
from src.exceptions import ModelNotTrainedError, InsufficientDataError, PredictionError
class MockDataFrame:
    """Minimal stand-in for ``pandas.DataFrame`` backed by a dict of lists.

    Only the small surface used by the tests in this file is provided.
    ``pct_change``, ``rolling`` and ``ewm`` return canned values instead of
    computing real statistics, and ``dropna`` is a no-op.
    """

    def __init__(self, data: Dict[str, List]):
        """Store a shallow copy of ``data`` (column name -> list of values)."""
        # Copy the mapping so later column assignments do not leak back
        # into the caller's dict.
        self.data = data.copy() if data else {}
        self._columns = list(self.data.keys())
        # The row count is taken from the first column; an empty frame has 0.
        self._length = len(next(iter(self.data.values()))) if self.data else 0

    def __len__(self):
        """Return the number of rows."""
        return self._length

    def __iter__(self):
        """Iterate over the column names.

        Without this method ``for``/``in`` fall back to the legacy
        ``__getitem__`` protocol; ``__getitem__`` returns ``self`` for every
        integer and never raises ``IndexError``, so the loop would not end.
        """
        return iter(self._columns)

    def __getitem__(self, key):
        """Return the column ``key`` as a ``MockSeries``.

        An unknown column name yields a series of zeros with one entry per
        row. Any non-string key returns the frame itself.
        """
        if not isinstance(key, str):
            return self
        if key in self.data:
            return MockSeries(self.data[key])
        return MockSeries([0] * self._length)

    def __setitem__(self, key, value):
        """Assign a column from a list, a ``MockSeries`` or a scalar.

        A scalar is broadcast to the current row count. When the frame had
        no columns yet, the row count is taken from the assigned column so
        that ``len()`` and ``values`` reflect the new data.
        """
        was_empty = not self.data
        if isinstance(value, list):
            column = value
        elif isinstance(value, MockSeries):
            column = value.data
        else:
            column = [value] * self._length
        self.data[key] = column
        if key not in self._columns:
            self._columns.append(key)
        if was_empty:
            self._length = len(column)

    @property
    def columns(self):
        """Column names in insertion order."""
        return self._columns

    def copy(self):
        """Return a new frame whose column lists are independent copies."""
        return MockDataFrame({name: list(col) for name, col in self.data.items()})

    def dropna(self):
        """Return the frame unchanged."""
        return self

    def pct_change(self):
        """Return a constant series of ``0.01`` with one entry per row."""
        return MockSeries([0.01] * self._length)

    def rolling(self, window):
        """Return a ``MockRolling`` over the columns; ``window`` is ignored."""
        return MockRolling(self.data)

    def ewm(self, span):
        """Return a ``MockEWM`` over the columns; ``span`` is ignored."""
        return MockEWM(self.data)

    @property
    def iloc(self):
        """Positional indexer over the columns."""
        return MockILoc(self.data)

    @property
    def values(self):
        """Rows as a list of lists, with columns in ``columns`` order."""
        return [
            [self.data[col][i] for col in self._columns]
            for i in range(self._length)
        ]
class MockSeries:
    """Minimal stand-in for ``pandas.Series`` backed by a plain list.

    Arithmetic is element-wise. ``pct_change`` and ``diff`` return canned
    constants, and ``rolling``/``ewm`` hand off to the other mocks in this
    file.
    """

    def __init__(self, data: List):
        """Wrap ``data`` without copying it."""
        self.data = data
        self._length = len(data)

    def __len__(self):
        """Return the number of elements."""
        return self._length

    def __iter__(self):
        """Iterate over the underlying values.

        Needed because ``__getitem__`` returns ``None`` instead of raising
        ``IndexError`` for out-of-range integers; without ``__iter__`` the
        legacy sequence protocol would make ``list(series)`` or
        ``x in series`` loop forever.
        """
        return iter(self.data)

    def __getitem__(self, index):
        """Return one element for an int index, else a sliced ``MockSeries``.

        An integer outside ``0 <= index < len(self)`` (negative indices
        included) yields ``None`` rather than raising.
        """
        if isinstance(index, int):
            return self.data[index] if 0 <= index < self._length else None
        return MockSeries(self.data[index])

    def _combine(self, other, op):
        """Apply ``op`` element-wise against another series or a scalar.

        Series are paired with ``zip``, so the result is as long as the
        shorter operand.
        """
        if isinstance(other, MockSeries):
            return MockSeries([op(a, b) for a, b in zip(self.data, other.data)])
        return MockSeries([op(a, other) for a in self.data])

    def __add__(self, other):
        return self._combine(other, lambda a, b: a + b)

    def __sub__(self, other):
        return self._combine(other, lambda a, b: a - b)

    def __mul__(self, other):
        return self._combine(other, lambda a, b: a * b)

    def __truediv__(self, other):
        # Division by zero yields 0 instead of raising.
        return self._combine(other, lambda a, b: a / b if b != 0 else 0)

    @property
    def iloc(self):
        """Positional indexer over this series' values."""
        return MockILoc({"data": self.data})

    @property
    def values(self):
        """The underlying list."""
        return self.data

    def pct_change(self):
        """Return a constant series of ``0.01`` with the same length."""
        return MockSeries([0.01] * self._length)

    def rolling(self, window):
        """Return a ``MockRolling`` over this series; ``window`` is ignored."""
        return MockRolling({"data": self.data})

    def ewm(self, span):
        """Return a ``MockEWM`` over this series; ``span`` is ignored."""
        return MockEWM({"data": self.data})

    def diff(self):
        """Return a constant series of ``0.1`` with the same length."""
        return MockSeries([0.1] * self._length)

    def where(self, condition, other):
        """Keep every value if ``condition`` is truthy, else use ``other``.

        ``condition`` is evaluated once as a whole (plain truthiness), not
        element by element.
        """
        return MockSeries([x if condition else other for x in self.data])
class MockRolling:
    """Stand-in for a pandas rolling window that returns canned statistics."""

    def __init__(self, data: Dict):
        """Keep a reference to ``data`` (column name -> list of values)."""
        self.data = data

    def _row_count(self):
        """Return the length of the ``"data"`` column, else of the first column.

        An empty mapping has zero rows; the original code raised
        ``StopIteration`` in that case.
        """
        if "data" in self.data:
            return len(self.data["data"])
        return len(next(iter(self.data.values()), ()))

    def mean(self):
        """Return a constant series of ``75000`` with one entry per row."""
        return MockSeries([75000] * self._row_count())

    def std(self):
        """Return a constant series of ``1000`` with one entry per row."""
        return MockSeries([1000] * self._row_count())
class MockEWM:
    """Stand-in for a pandas exponentially weighted window with a canned mean."""

    def __init__(self, data: Dict):
        """Keep a reference to ``data`` (column name -> list of values)."""
        self.data = data

    def mean(self):
        """Return a constant series of ``75000`` with one entry per row.

        The row count is the length of the ``"data"`` column when present,
        otherwise of the first column. An empty mapping yields an empty
        series; the original code raised ``StopIteration`` in that case.
        """
        if "data" in self.data:
            rows = len(self.data["data"])
        else:
            rows = len(next(iter(self.data.values()), ()))
        return MockSeries([75000] * rows)
class MockILoc:
    """Stand-in for the pandas ``iloc`` positional indexer."""

    def __init__(self, data: Dict):
        """Keep a reference to ``data`` (column name -> list of values)."""
        self.data = data

    def __getitem__(self, key):
        """Index by position.

        * ``slice`` -> a ``MockDataFrame`` with every column sliced.
        * ``int``   -> the element at that position in the first column
          (negative positions count from the end); ``0`` when the position
          is out of range or there are no columns.
        * anything else -> this indexer itself.
        """
        if isinstance(key, slice):
            return MockDataFrame(
                {name: column[key] for name, column in self.data.items()}
            )
        if not isinstance(key, int):
            return self
        if not self.data:
            return 0  # default value
        column = self.data[next(iter(self.data))]
        size = len(column)
        if -size <= key < size:
            return column[key]
        return 0  # default value
class TestPricePredictor:
    """Basic tests for the price prediction model."""

    @staticmethod
    async def _prepare(predictor, market_data):
        """Run feature engineering then preprocessing; return the result.

        Shared by the tests that need the ``(X, y)`` pair, so the two-step
        pipeline is written once.
        """
        features = await predictor.engineer_features(market_data)
        return await predictor.preprocess_data(features)

    @pytest.fixture
    def model_config(self):
        """Model configuration."""
        return {
            "model_type": "lstm",
            "sequence_length": 60,
            "prediction_horizon": 5,
            "features": ["price", "volume", "rsi", "macd", "bollinger_bands"],
            "epochs": 100,
            "batch_size": 32,
            "validation_split": 0.2,
            "early_stopping": True,
            "learning_rate": 0.001
        }

    @pytest.fixture
    def price_predictor(self, model_config):
        """Price predictor instance built from ``model_config``."""
        return PricePredictor(model_config)

    @pytest.fixture
    def sample_market_data(self):
        """Sample market data (mock DataFrame) with 100 rows."""
        data_length = 100
        return MockDataFrame({
            'timestamp': [f"2024-01-{i+1:02d}" for i in range(data_length)],
            'symbol': ['005930'] * data_length,
            'price': [75000 + i * 10 for i in range(data_length)],
            'volume': [1000000 + i * 1000 for i in range(data_length)],
            'high': [75500 + i * 10 for i in range(data_length)],
            'low': [74500 + i * 10 for i in range(data_length)],
            'open': [75000 + i * 10 for i in range(data_length)],
            'close': [75000 + i * 10 for i in range(data_length)]
        })

    def test_predictor_initialization(self, price_predictor, model_config):
        """Predictor initialization test."""
        assert price_predictor.model_type == model_config["model_type"]
        assert price_predictor.sequence_length == model_config["sequence_length"]
        assert price_predictor.prediction_horizon == model_config["prediction_horizon"]
        assert price_predictor.features == model_config["features"]
        assert not price_predictor.is_trained
        assert price_predictor.model is None

    @pytest.mark.asyncio
    async def test_feature_engineering(self, price_predictor, sample_market_data):
        """Feature engineering test."""
        features = await price_predictor.engineer_features(sample_market_data)
        # Base features
        assert 'price' in features.columns
        assert 'volume' in features.columns
        # Technical indicators
        assert 'rsi' in features.columns
        assert 'macd' in features.columns
        assert 'bollinger_upper' in features.columns
        assert 'bollinger_lower' in features.columns
        assert 'sma_20' in features.columns
        assert 'ema_12' in features.columns
        # Data length
        assert len(features) > 0

    @pytest.mark.asyncio
    async def test_data_preprocessing(self, price_predictor, sample_market_data):
        """Data preprocessing test."""
        X, y = await self._prepare(price_predictor, sample_market_data)
        # Shape of the sequence data
        assert len(X) > 0
        assert len(y) > 0
        assert len(X) == len(y)
        # Sequence lengths
        assert len(X[0]) == price_predictor.sequence_length
        assert len(y[0]) == price_predictor.prediction_horizon
        # Feature count: at least one feature per step
        assert len(X[0][0]) > 0
        # "bollinger_bands" is counted as two columns, hence the + 2
        assert len(X[0][0]) >= len([f for f in price_predictor.features if f != "bollinger_bands"]) + 2

    @pytest.mark.asyncio
    async def test_model_training(self, price_predictor, sample_market_data):
        """Model training test."""
        X, y = await self._prepare(price_predictor, sample_market_data)
        # State before training
        assert not price_predictor.is_trained
        # Train the model
        training_history = await price_predictor.train(X, y)
        # State after training
        assert price_predictor.is_trained
        assert price_predictor.model is not None
        assert 'history' in training_history
        assert 'loss' in training_history['history']
        assert 'val_loss' in training_history['history']
        # Training quality: the loss must have gone down
        loss_history = training_history['history']['loss']
        assert loss_history[0] > loss_history[-1]

    @pytest.mark.asyncio
    async def test_price_prediction(self, price_predictor, sample_market_data):
        """Price prediction test."""
        # Train the model
        X, y = await self._prepare(price_predictor, sample_market_data)
        await price_predictor.train(X, y)
        # Predict from recent data
        recent_data = MockDataFrame({
            'price': [76000, 76100, 76200, 76300, 76400],
            'volume': [1100000, 1110000, 1120000, 1130000, 1140000]
        })
        predictions = await price_predictor.predict(recent_data)
        # Prediction result
        assert 'predictions' in predictions
        assert 'confidence_intervals' in predictions
        assert 'probability_distribution' in predictions
        pred_values = predictions['predictions']
        assert len(pred_values) == price_predictor.prediction_horizon
        assert all(isinstance(p, (int, float)) for p in pred_values)
        # Confidence intervals must bracket each prediction
        ci = predictions['confidence_intervals']
        assert 'lower' in ci and 'upper' in ci
        assert len(ci['lower']) == price_predictor.prediction_horizon
        assert all(ci['lower'][i] <= pred_values[i] <= ci['upper'][i] for i in range(len(pred_values)))

    @pytest.mark.asyncio
    async def test_batch_prediction(self, price_predictor, sample_market_data):
        """Batch prediction test."""
        # Train the model
        X, y = await self._prepare(price_predictor, sample_market_data)
        await price_predictor.train(X, y)
        # Data for several symbols
        symbols = ['005930', '000660', '035420']
        batch_data = []
        for symbol in symbols:
            symbol_data = MockDataFrame({
                'symbol': [symbol] * 10,
                'price': [75000 + i * 10 for i in range(10)],
                'volume': [1000000 + i * 1000 for i in range(10)]
            })
            batch_data.append(symbol_data)
        # Batch prediction
        batch_predictions = await price_predictor.predict_batch(batch_data)
        # Result
        assert len(batch_predictions) == len(symbols)
        for symbol, prediction in batch_predictions.items():
            assert symbol in symbols
            assert 'predictions' in prediction
            assert len(prediction['predictions']) == price_predictor.prediction_horizon

    @pytest.mark.asyncio
    async def test_model_evaluation(self, price_predictor, sample_market_data):
        """Model evaluation test."""
        X, y = await self._prepare(price_predictor, sample_market_data)
        # Train/test split (80/20)
        split_idx = int(len(X) * 0.8)
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]
        # Train the model
        await price_predictor.train(X_train, y_train)
        # Evaluate the model
        evaluation_metrics = await price_predictor.evaluate(X_test, y_test)
        # Metrics present
        assert 'mse' in evaluation_metrics
        assert 'mae' in evaluation_metrics
        assert 'rmse' in evaluation_metrics
        assert 'mape' in evaluation_metrics
        assert 'directional_accuracy' in evaluation_metrics
        # Metric ranges
        assert evaluation_metrics['mse'] >= 0
        assert evaluation_metrics['mae'] >= 0
        assert evaluation_metrics['rmse'] >= 0
        assert 0 <= evaluation_metrics['directional_accuracy'] <= 1

    @pytest.mark.asyncio
    async def test_model_save_load(self, price_predictor, sample_market_data, tmp_path):
        """Model save/load test."""
        # Train the model
        X, y = await self._prepare(price_predictor, sample_market_data)
        await price_predictor.train(X, y)
        # Save the model
        model_path = tmp_path / "test_model.json"
        await price_predictor.save_model(str(model_path))
        assert model_path.exists()
        # Create a fresh predictor instance
        new_predictor = PricePredictor(price_predictor.config)
        assert not new_predictor.is_trained
        # Load the model
        await new_predictor.load_model(str(model_path))
        assert new_predictor.is_trained
        # Model structure
        assert new_predictor.model['type'] == price_predictor.model['type']

    @pytest.mark.asyncio
    async def test_insufficient_data_error(self, price_predictor):
        """Insufficient data error test."""
        # Very small data set
        small_data = MockDataFrame({
            'price': [100] * 5,
            'volume': [1000] * 5
        })
        # Either pipeline step may raise, so both run inside the context.
        with pytest.raises(InsufficientDataError):
            await self._prepare(price_predictor, small_data)

    @pytest.mark.asyncio
    async def test_untrained_model_error(self, price_predictor, sample_market_data):
        """Untrained model error test.

        ``sample_market_data`` is requested but not used by this test.
        """
        recent_data = MockDataFrame({
            'price': [76000, 76100, 76200],
            'volume': [1100000, 1110000, 1120000]
        })
        with pytest.raises(ModelNotTrainedError):
            await price_predictor.predict(recent_data)

    def test_model_architecture_creation(self, price_predictor):
        """Model architecture creation test."""
        input_shape = (price_predictor.sequence_length, len(price_predictor.features))
        output_size = price_predictor.prediction_horizon
        model = price_predictor._create_model(input_shape, output_size)
        # Model structure
        assert model is not None
        assert 'layers' in model
        assert len(model['layers']) > 0
        assert model['input_shape'] == input_shape
        assert model['output_shape'] == (output_size,)
class TestTechnicalIndicators:
    """Tests for the technical indicator calculations."""

    # Price sequence shared by both indicator tests.
    _PRICES = [100, 102, 105, 103, 107, 110, 108, 112, 115, 113]

    @pytest.fixture
    def price_predictor(self):
        """Predictor instance configured with only a model type."""
        return PricePredictor({"model_type": "lstm"})

    def test_rsi_calculation(self, price_predictor):
        """RSI calculation test."""
        series = MockSeries(list(self._PRICES))
        result = price_predictor._calculate_rsi(series)
        # One RSI value per input price.
        assert len(result) == len(series)
        # NOTE(review): the original comment says RSI is normally 0-100 but
        # the mock yields a fixed value (50) - confirm against
        # PricePredictor._calculate_rsi. Only the element type is checked.
        assert all(isinstance(val, (int, float)) for val in result.data)

    def test_macd_calculation(self, price_predictor):
        """MACD calculation test."""
        series = MockSeries(list(self._PRICES))
        macd_line, signal_line = price_predictor._calculate_macd(series)
        # Both lines have one value per input price.
        expected_len = len(series)
        assert len(macd_line) == expected_len
        assert len(signal_line) == expected_len
        assert isinstance(macd_line.data[0], (int, float))
        assert isinstance(signal_line.data[0], (int, float))