# test_liquidity_tools.py (26.5 kB)
"""유동성 분석 도구 테스트"""
import pytest
import random
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock
from typing import Dict, List, Any
from src.tools.liquidity_tools import LiquidityAnalysisTool
from src.exceptions import DataValidationError, DatabaseConnectionError
class TestLiquidityAnalysisTool:
"""유동성 분석 도구 테스트"""
@pytest.fixture
def mock_db_manager(self):
    """Provide an ``AsyncMock`` that stands in for the database manager."""
    db_manager = AsyncMock()
    return db_manager
@pytest.fixture
def mock_cache_manager(self):
    """Provide an ``AsyncMock`` that stands in for the cache manager."""
    cache_manager = AsyncMock()
    return cache_manager
@pytest.fixture
def liquidity_tool(self, mock_db_manager, mock_cache_manager):
    """Build the tool under test, wired to the mocked DB and cache managers."""
    tool = LiquidityAnalysisTool(mock_db_manager, mock_cache_manager)
    return tool
@pytest.fixture
def sample_liquidity_data(self):
    """Build 30 days of synthetic daily liquidity rows for KOSPI and KOSDAQ.

    The rows are drawn from a privately seeded generator so that every run
    sees the same values, and every quantity that cannot meaningfully be
    negative is clamped at zero (the bid/ask spread at 0.001).

    Returns:
        list[dict]: 60 rows, newest date first, with the KOSPI row before
        the KOSDAQ row for each date.
    """
    # Seeded local generator: reproducible data without reseeding the
    # process-wide ``random`` module that other tests may rely on.
    rng = random.Random(42)
    base_date = datetime.now().date()
    data = []
    markets = ["KOSPI", "KOSDAQ"]
    for i in range(30):  # 30 days of data
        date = base_date - timedelta(days=i)
        for market in markets:
            # Simulate a per-market liquidity profile.
            if market == "KOSPI":
                # KOSPI is modelled as the more liquid market.
                daily_volume = rng.gauss(500000000000, 100000000000)  # centred on 500 billion
                bid_ask_spread = rng.gauss(0.02, 0.005)  # centred on 0.02
                market_depth = rng.gauss(1000000000, 200000000)  # centred on 1 billion
            else:
                # KOSDAQ is modelled with comparatively lower liquidity.
                daily_volume = rng.gauss(200000000000, 50000000000)  # centred on 200 billion
                bid_ask_spread = rng.gauss(0.05, 0.01)  # centred on 0.05
                market_depth = rng.gauss(500000000, 100000000)  # centred on 500 million
            # Clamp once, up front, so the derived transaction value is
            # computed from the same non-negative volume the row reports.
            daily_volume = max(daily_volume, 0)
            market_depth = max(market_depth, 0)
            data.append({
                "date": date,
                "market": market,
                "daily_volume": daily_volume,
                "daily_transaction_value": daily_volume * max(rng.gauss(2500, 500), 0),
                "bid_ask_spread": max(bid_ask_spread, 0.001),
                # Buy and sell depth intentionally share one draw, as before.
                "market_depth_buy": market_depth,
                "market_depth_sell": market_depth,
                "turnover_rate": max(rng.gauss(0.15, 0.05), 0),  # centred on 15%
                "price_impact": max(rng.gauss(0.001, 0.0005), 0),
                "volatility": max(rng.gauss(0.02, 0.01), 0),
                "active_stocks_count": rng.randint(800, 1200),
                "block_trade_volume": max(rng.gauss(50000000000, 10000000000), 0),
            })
    return data
@pytest.fixture
def sample_intraday_data(self):
    """Build one session of synthetic 30-minute KOSPI liquidity snapshots.

    Timestamps run from 09:00 through 15:30 of the current day. The first
    and last hour of the session get a higher volume and spread multiplier
    than the middle of the day.

    Returns:
        list[dict]: 14 rows in chronological order.
    """
    # Seeded local generator keeps the fixture reproducible without
    # disturbing the global ``random`` state.
    rng = random.Random(7)
    # Zero the seconds and microseconds too; otherwise every timestamp
    # inherits whatever instant the fixture happened to run at.
    session_open = datetime.now().replace(hour=9, minute=0, second=0, microsecond=0)
    slot_count = 14  # 09:00, 09:30, ..., 15:30
    edge_slots = 2  # two 30-minute slots = the first / last hour
    data = []
    for i in range(slot_count):
        timestamp = session_open + timedelta(minutes=i * 30)
        # Opening and closing hours trade heavier and with wider spreads.
        if i < edge_slots or i >= slot_count - edge_slots:
            volume_multiplier = 1.5
            spread_multiplier = 1.3
        else:  # middle of the session
            volume_multiplier = 1.0
            spread_multiplier = 1.0
        data.append({
            "timestamp": timestamp,
            "market": "KOSPI",
            "volume": rng.gauss(30000000000, 5000000000) * volume_multiplier,
            "bid_ask_spread": rng.gauss(0.02, 0.005) * spread_multiplier,
            "market_depth": rng.gauss(1000000000, 200000000),
            "trade_count": rng.randint(5000, 15000),
            "large_trade_ratio": rng.gauss(0.25, 0.05),
        })
    return data
def test_tool_initialization(self, liquidity_tool, mock_db_manager, mock_cache_manager):
    """Check the tool's name, description and injected collaborators."""
    assert liquidity_tool.name == "analyze_liquidity"

    # The description may be written in Korean or in English.
    assert liquidity_tool.description is not None
    mentions_korean = "유동성" in liquidity_tool.description
    assert mentions_korean or "liquidity" in liquidity_tool.description.lower()

    # The managers handed to the constructor must be exposed unchanged.
    assert liquidity_tool.db_manager == mock_db_manager
    assert liquidity_tool.cache_manager == mock_cache_manager
def test_tool_definition(self, liquidity_tool):
    """Check the published tool definition and its input schema."""
    definition = liquidity_tool.get_tool_definition()
    assert definition.name == "analyze_liquidity"
    assert definition.description is not None
    assert definition.inputSchema is not None

    # Validate the top level of the input schema.
    schema = definition.inputSchema
    assert schema["type"] == "object"
    assert "properties" in schema

    properties = schema["properties"]
    for parameter in ("analysis_type", "markets", "time_period", "include_intraday"):
        assert parameter in properties

    # ``analysis_type`` is an array whose rendered schema names each option.
    analysis_prop = properties["analysis_type"]
    assert analysis_prop["type"] == "array"
    for option in ("market_liquidity", "spread_analysis", "depth_analysis"):
        assert option in str(analysis_prop)
@pytest.mark.asyncio
async def test_execute_market_liquidity_analysis(self, liquidity_tool, sample_liquidity_data):
    """Run a ``market_liquidity`` analysis and check the shape of its JSON result."""
    import json

    # Cache miss, with the mocked database returning the sample rows.
    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data

    arguments = {
        "analysis_type": ["market_liquidity"],
        "markets": ["KOSPI", "KOSDAQ"],
        "time_period": "30d"
    }
    result = await liquidity_tool.execute(arguments)

    # Exactly one text content item is expected.
    assert len(result) == 1
    content = result[0]
    assert content.type == "text"

    # The text payload is JSON; parse it and inspect the structure.
    data = json.loads(content.text)
    assert "liquidity_analysis" in data
    assert "market_liquidity" in data["liquidity_analysis"]

    market_liquidity = data["liquidity_analysis"]["market_liquidity"]
    for section in (
        "overall_liquidity_score",
        "market_comparison",
        "liquidity_trends",
        "volume_analysis",
    ):
        assert section in market_liquidity

    # Both requested markets must appear in the comparison.
    market_comparison = market_liquidity["market_comparison"]
    for market in ("KOSPI", "KOSDAQ"):
        assert market in market_comparison

    kospi_data = market_comparison["KOSPI"]
    for metric in ("liquidity_score", "average_volume", "average_spread"):
        assert metric in kospi_data
@pytest.mark.asyncio
async def test_execute_spread_analysis(self, liquidity_tool, sample_liquidity_data):
    """Run a ``spread_analysis`` with percentiles and check the result keys."""
    import json

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data

    arguments = {
        "analysis_type": ["spread_analysis"],
        "markets": ["KOSPI"],
        "time_period": "20d",
        "include_percentiles": True
    }
    result = await liquidity_tool.execute(arguments)

    data = json.loads(result[0].text)
    assert "spread_analysis" in data["liquidity_analysis"]

    spread_analysis = data["liquidity_analysis"]["spread_analysis"]
    for section in (
        "average_spread",
        "spread_volatility",
        "spread_percentiles",
        "time_of_day_patterns",
    ):
        assert section in spread_analysis

    # All four percentile buckets must be reported.
    percentiles = spread_analysis["spread_percentiles"]
    for bucket in ("p25", "p50", "p75", "p95"):
        assert bucket in percentiles
@pytest.mark.asyncio
async def test_execute_depth_analysis(self, liquidity_tool, sample_liquidity_data):
    """Run a ``depth_analysis`` and check the result keys."""
    import json

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data

    arguments = {
        "analysis_type": ["depth_analysis"],
        "markets": ["KOSPI", "KOSDAQ"],
        "time_period": "15d"
    }
    result = await liquidity_tool.execute(arguments)

    data = json.loads(result[0].text)
    assert "depth_analysis" in data["liquidity_analysis"]

    depth_analysis = data["liquidity_analysis"]["depth_analysis"]
    for section in (
        "average_market_depth",
        "depth_imbalance",
        "depth_resilience",
        "market_making_activity",
    ):
        assert section in depth_analysis

    # The average depth is broken down into total, buy side and sell side.
    avg_depth = depth_analysis["average_market_depth"]
    for side in ("total_depth", "buy_side_depth", "sell_side_depth"):
        assert side in avg_depth
@pytest.mark.asyncio
async def test_execute_turnover_analysis(self, liquidity_tool, sample_liquidity_data):
    """Run a ``turnover_analysis`` with sector breakdown and check the result keys."""
    import json

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data

    arguments = {
        "analysis_type": ["turnover_analysis"],
        "markets": ["KOSPI"],
        "time_period": "30d",
        "include_sector_breakdown": True
    }
    result = await liquidity_tool.execute(arguments)

    data = json.loads(result[0].text)
    assert "turnover_analysis" in data["liquidity_analysis"]

    turnover_analysis = data["liquidity_analysis"]["turnover_analysis"]
    for section in (
        "average_turnover_rate",
        "turnover_volatility",
        "high_turnover_stocks",
        "sector_turnover",
    ):
        assert section in turnover_analysis

    # The per-sector breakdown is expected to be a mapping.
    sector_turnover = turnover_analysis["sector_turnover"]
    assert isinstance(sector_turnover, dict)
@pytest.mark.asyncio
async def test_execute_price_impact_analysis(self, liquidity_tool, sample_liquidity_data):
    """Run a ``price_impact`` analysis with trade-size breakdown and check the result keys."""
    import json

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data

    arguments = {
        "analysis_type": ["price_impact"],
        "markets": ["KOSPI", "KOSDAQ"],
        "time_period": "30d",
        "trade_size_analysis": True
    }
    result = await liquidity_tool.execute(arguments)

    data = json.loads(result[0].text)
    assert "price_impact" in data["liquidity_analysis"]

    price_impact = data["liquidity_analysis"]["price_impact"]
    for section in (
        "average_price_impact",
        "impact_by_trade_size",
        "market_resilience",
        "temporary_vs_permanent_impact",
    ):
        assert section in price_impact

    # Price impact must be reported for each trade-size bucket.
    impact_by_size = price_impact["impact_by_trade_size"]
    for bucket in ("small_trades", "medium_trades", "large_trades"):
        assert bucket in impact_by_size
@pytest.mark.asyncio
async def test_comprehensive_liquidity_analysis(self, liquidity_tool, sample_liquidity_data):
    """Request four analyses plus risk metrics and forecasts in a single call."""
    import json

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_liquidity_data

    requested = ["market_liquidity", "spread_analysis", "depth_analysis", "turnover_analysis"]
    result = await liquidity_tool.execute({
        "analysis_type": requested,
        "markets": ["KOSPI", "KOSDAQ"],
        "time_period": "30d",
        "include_risk_metrics": True,
        "include_forecasts": True
    })

    data = json.loads(result[0].text)

    # Every requested analysis must be present in the output.
    analysis = data["liquidity_analysis"]
    for analysis_name in requested:
        assert analysis_name in analysis

    # Risk metrics and forecasts are reported at the top level.
    assert "risk_metrics" in data
    assert "forecasts" in data

    risk_metrics = data["risk_metrics"]
    for key in ("liquidity_risk_score", "market_stress_indicators"):
        assert key in risk_metrics

    forecasts = data["forecasts"]
    for key in ("liquidity_trend_forecast", "expected_spread_range"):
        assert key in forecasts
@pytest.mark.asyncio
async def test_intraday_liquidity_analysis(self, liquidity_tool, sample_intraday_data):
    """Run an ``intraday_patterns`` analysis and check the result keys."""
    import json

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = sample_intraday_data

    arguments = {
        "analysis_type": ["intraday_patterns"],
        "markets": ["KOSPI"],
        "time_period": "1d",
        "include_intraday": True
    }
    result = await liquidity_tool.execute(arguments)

    data = json.loads(result[0].text)
    assert "intraday_patterns" in data["liquidity_analysis"]

    intraday = data["liquidity_analysis"]["intraday_patterns"]
    for section in (
        "hourly_liquidity",
        "opening_closing_effects",
        "lunch_break_impact",
        "volume_concentration",
    ):
        assert section in intraday

    # The hourly pattern must be a non-empty list.
    hourly_liquidity = intraday["hourly_liquidity"]
    assert isinstance(hourly_liquidity, list)
    assert len(hourly_liquidity) > 0
def test_liquidity_score_calculation(self, liquidity_tool):
    """Check the keys and the 0-100 bounds of ``_calculate_liquidity_score``."""
    market_data = dict(
        daily_volume=500000000000,     # 500 billion
        bid_ask_spread=0.02,
        market_depth_buy=1000000000,   # 1 billion
        market_depth_sell=1000000000,  # 1 billion
        turnover_rate=0.15,            # 15%
        price_impact=0.001,            # 0.1%
        volatility=0.02,               # 2%
    )

    score = liquidity_tool._calculate_liquidity_score(market_data)

    for component in (
        "overall_score",
        "volume_score",
        "spread_score",
        "depth_score",
        "impact_score",
    ):
        assert component in score

    # These three scores are checked against the 0-100 range.
    for bounded in ("overall_score", "volume_score", "spread_score"):
        assert 0 <= score[bounded] <= 100
def test_spread_percentile_calculation(self, liquidity_tool):
    """Check that spread percentiles are all present and non-decreasing."""
    spread_data = [0.01, 0.015, 0.02, 0.025, 0.03, 0.035, 0.04, 0.045, 0.05, 0.055]

    percentiles = liquidity_tool._calculate_spread_percentiles(spread_data)

    ordered_keys = ["p25", "p50", "p75", "p95"]
    for key in ordered_keys:
        assert key in percentiles

    # Each percentile must not exceed the next higher one.
    for lower, upper in zip(ordered_keys, ordered_keys[1:]):
        assert percentiles[lower] <= percentiles[upper]
def test_market_depth_analysis(self, liquidity_tool):
    """Check the keys and the average total depth from ``_analyze_market_depth``.

    The expected total is the mean buy depth plus the mean sell depth of
    the three sample rows.
    """
    depth_data = [
        {"market_depth_buy": 1000000000, "market_depth_sell": 800000000},
        {"market_depth_buy": 1200000000, "market_depth_sell": 1000000000},
        {"market_depth_buy": 900000000, "market_depth_sell": 1100000000},
    ]

    depth_analysis = liquidity_tool._analyze_market_depth(depth_data)

    for key in ("average_total_depth", "buy_sell_imbalance", "depth_stability"):
        assert key in depth_analysis

    expected_avg_buy = (1000000000 + 1200000000 + 900000000) / 3
    expected_avg_sell = (800000000 + 1000000000 + 1100000000) / 3
    expected_total = expected_avg_buy + expected_avg_sell
    # The averages are thirds, so a different but equally valid summation
    # order can differ in the last bits; compare with a tolerance rather
    # than exact float equality.
    assert depth_analysis["average_total_depth"] == pytest.approx(expected_total)
def test_turnover_volatility_calculation(self, liquidity_tool):
    """Check the keys and non-negativity of ``_calculate_turnover_volatility``."""
    turnover_data = [
        {"turnover_rate": rate}
        for rate in (0.10, 0.15, 0.20, 0.12, 0.18)
    ]

    volatility = liquidity_tool._calculate_turnover_volatility(turnover_data)

    for key in ("volatility", "coefficient_of_variation", "volatility_trend"):
        assert key in volatility

    # Both dispersion measures must be non-negative.
    for measure in ("volatility", "coefficient_of_variation"):
        assert volatility[measure] >= 0
def test_price_impact_by_trade_size(self, liquidity_tool):
    """Check that price impact is bucketed by trade size and grows with size."""
    samples = (
        (1000000, 0.0005),    # small
        (10000000, 0.001),    # medium
        (100000000, 0.005),   # large
        (500000000, 0.015),   # very large
    )
    impact_data = [
        {"trade_size": size, "price_impact": impact}
        for size, impact in samples
    ]

    impact_analysis = liquidity_tool._analyze_price_impact_by_size(impact_data)

    for bucket in ("small_trades", "medium_trades", "large_trades", "block_trades"):
        assert bucket in impact_analysis

    # Larger trades are expected to move the price more than small ones.
    small_impact = impact_analysis["small_trades"]["avg_impact"]
    large_impact = impact_analysis["large_trades"]["avg_impact"]
    assert small_impact < large_impact
def test_intraday_pattern_detection(self, liquidity_tool):
    """Check that ``_detect_intraday_patterns`` flags the opening and closing effects.

    The input is one row per hour from 9 to 15, with volume highest in the
    first and last hour.
    """
    hours = [9, 10, 11, 12, 13, 14, 15]
    volumes = [150, 100, 80, 60, 80, 100, 140]  # high at the open and the close
    hourly_data = [
        {
            "hour": hour,
            "volume": volume * 1000000000,
            "trade_count": volume * 100,
            "avg_spread": 0.02 if hour in (9, 15) else 0.015,
        }
        for hour, volume in zip(hours, volumes)
    ]

    patterns = liquidity_tool._detect_intraday_patterns(hourly_data)

    for key in ("peak_hours", "low_liquidity_hours", "opening_effect", "closing_effect"):
        assert key in patterns

    # Assert truthiness instead of comparing ``== True``; this also accepts
    # non-``bool`` truthy flags.
    assert patterns["opening_effect"]["enhanced_volume"]
    assert patterns["closing_effect"]["enhanced_volume"]
def test_liquidity_risk_assessment(self, liquidity_tool):
    """Check the keys, risk level and score returned by ``_assess_liquidity_risk``."""
    risk_data = dict(
        average_spread=0.035,     # wide spread
        spread_volatility=0.015,  # high volatility
        market_depth=500000000,   # shallow depth
        turnover_rate=0.05,       # low turnover
        price_impact=0.008,       # high price impact
    )

    risk_assessment = liquidity_tool._assess_liquidity_risk(risk_data)

    for key in ("overall_risk_level", "risk_factors", "risk_score", "recommendations"):
        assert key in risk_assessment

    # The level must be one of the three known labels.
    allowed_levels = ["low", "medium", "high"]
    assert risk_assessment["overall_risk_level"] in allowed_levels
    assert risk_assessment["risk_score"] >= 0
@pytest.mark.asyncio
async def test_liquidity_forecast(self, liquidity_tool):
    """Check that ``_forecast_liquidity`` returns one value per forecast day.

    The history is 20 daily rows, newest first, with a linear trend in each
    series plus reproducible gaussian noise on the liquidity score.
    """
    # Seeded local generator: the noise is identical on every run and the
    # global ``random`` state is left untouched.
    rng = random.Random(0)
    # Read the clock once so all rows derive from the same instant.
    now = datetime.now()
    historical_data = [
        {
            "date": (now - timedelta(days=i)).date(),
            "liquidity_score": 75 + i * 0.5 + rng.gauss(0, 2),
            "volume": 400000000000 + i * 10000000000,
            "spread": 0.02 + i * 0.0001,
        }
        for i in range(20)
    ]

    forecast = await liquidity_tool._forecast_liquidity(historical_data, forecast_days=5)

    for key in (
        "liquidity_scores",
        "volume_forecast",
        "spread_forecast",
        "confidence_intervals",
    ):
        assert key in forecast

    # Five forecast days were requested, so each series has five points.
    for series in ("liquidity_scores", "volume_forecast", "spread_forecast"):
        assert len(forecast[series]) == 5
@pytest.mark.asyncio
async def test_cache_functionality(self, liquidity_tool):
    """Check that a cache hit is returned as-is and the database is not queried."""
    import json

    # Cache-hit scenario: the cache manager returns a ready-made payload.
    market_liquidity = {"overall_liquidity_score": 85}
    cached_data = {
        "timestamp": datetime.now().isoformat(),
        "liquidity_analysis": {
            "market_liquidity": market_liquidity
        }
    }
    liquidity_tool.cache_manager.get.return_value = cached_data

    result = await liquidity_tool.execute({
        "analysis_type": ["market_liquidity"],
        "markets": ["KOSPI"]
    })

    # The returned payload must carry the cached score.
    data = json.loads(result[0].text)
    cached_score = data["liquidity_analysis"]["market_liquidity"]["overall_liquidity_score"]
    assert cached_score == 85

    # A cache hit must not touch the database.
    liquidity_tool.db_manager.fetch_all.assert_not_called()
@pytest.mark.asyncio
async def test_error_handling(self, liquidity_tool):
    """Check that a database connection failure propagates out of ``execute``."""
    liquidity_tool.cache_manager.get.return_value = None
    db_failure = DatabaseConnectionError("DB 연결 실패")
    liquidity_tool.db_manager.fetch_all.side_effect = db_failure

    arguments = {
        "analysis_type": ["market_liquidity"]
    }
    with pytest.raises(DatabaseConnectionError):
        await liquidity_tool.execute(arguments)
@pytest.mark.asyncio
async def test_invalid_parameters(self, liquidity_tool):
    """Check that invalid arguments are rejected with a matching ``ValueError``."""
    # Each case pairs the arguments with the expected error message pattern.
    invalid_cases = (
        # empty analysis type list
        ({"analysis_type": []}, "At least one analysis type"),
        # unknown analysis type
        ({"analysis_type": ["invalid_analysis"]}, "Invalid analysis type"),
        # unknown time period
        (
            {"analysis_type": ["market_liquidity"], "time_period": "invalid_period"},
            "Invalid time period",
        ),
    )
    for arguments, message in invalid_cases:
        with pytest.raises(ValueError, match=message):
            await liquidity_tool.execute(arguments)
@pytest.mark.asyncio
async def test_insufficient_data_handling(self, liquidity_tool):
    """Check that a single-row data set yields a warning or an insufficient-data note."""
    import json

    # Only one row, carrying just a subset of the usual columns.
    single_row = {
        "date": datetime.now().date(),
        "market": "KOSPI",
        "daily_volume": 100000000000,
        "bid_ask_spread": 0.02
    }
    insufficient_data = [single_row]

    liquidity_tool.cache_manager.get.return_value = None
    liquidity_tool.db_manager.fetch_all.return_value = insufficient_data

    result = await liquidity_tool.execute({
        "analysis_type": ["market_liquidity"],
        "time_period": "30d"
    })

    data = json.loads(result[0].text)
    # Either a top-level "warning" key or the phrase anywhere in the payload.
    assert "warning" in data or "insufficient data" in str(data).lower()
def test_market_stress_detection(self, liquidity_tool):
    """Check that stressed market inputs produce a medium or high stress level."""
    # Inputs describing a stressed market.
    stress_data = dict(
        bid_ask_spread=0.08,     # very wide spread
        market_depth=200000000,  # very shallow depth
        volume=1000000000000,    # abnormally high volume
        price_impact=0.02,       # high price impact
        volatility=0.05,         # high volatility
    )

    stress_indicators = liquidity_tool._detect_market_stress(stress_data)

    for key in ("stress_level", "stress_factors", "liquidity_crunch_risk"):
        assert key in stress_indicators

    # Stressed inputs must not be classified as low stress, and at least
    # one contributing factor must be reported.
    assert stress_indicators["stress_level"] in ["medium", "high"]
    assert len(stress_indicators["stress_factors"]) > 0