test_money_flow_tools.py•27.3 kB
"""자금 흐름 분석 도구 테스트"""
import pytest
import random
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock
from src.tools.money_flow_tools import MoneyFlowAnalysisTool
from src.exceptions import DataValidationError, DatabaseConnectionError
class TestMoneyFlowAnalysisTool:
"""자금 흐름 분석 도구 테스트"""
@pytest.fixture
def mock_db_manager(self):
"""Mock 데이터베이스 매니저"""
return AsyncMock()
@pytest.fixture
def mock_cache_manager(self):
"""Mock 캐시 매니저"""
return AsyncMock()
@pytest.fixture
def money_flow_tool(self, mock_db_manager, mock_cache_manager):
"""자금 흐름 분석 도구 인스턴스"""
return MoneyFlowAnalysisTool(mock_db_manager, mock_cache_manager)
@pytest.fixture
def sample_money_flow_data(self):
"""샘플 자금 흐름 데이터"""
base_date = datetime.now().date()
data = []
sectors = ["IT", "금융", "화학", "바이오", "자동차", "건설", "유통", "통신"]
for i in range(30): # 30일 데이터
date = base_date - timedelta(days=i)
for sector in sectors:
# 섹터별 자금 흐름 시뮬레이션
if sector == "IT":
# IT 섹터는 외국인 유입, 개인 유출 패턴
individual_flow = random.gauss(-500000000, 200000000) # 개인 유출
foreign_flow = random.gauss(800000000, 300000000) # 외국인 유입
institutional_flow = random.gauss(100000000, 400000000) # 기관 약간 유입
elif sector == "금융":
# 금융 섹터는 기관 유입 중심
individual_flow = random.gauss(-200000000, 150000000)
foreign_flow = random.gauss(200000000, 200000000)
institutional_flow = random.gauss(600000000, 300000000)
else:
# 기타 섹터는 혼재
individual_flow = random.gauss(0, 300000000)
foreign_flow = random.gauss(0, 200000000)
institutional_flow = random.gauss(0, 250000000)
data.append({
"date": date,
"sector": sector,
"individual_net_flow": individual_flow,
"foreign_net_flow": foreign_flow,
"institutional_net_flow": institutional_flow,
"pension_net_flow": random.gauss(50000000, 100000000),
"bank_net_flow": random.gauss(-20000000, 80000000),
"insurance_net_flow": random.gauss(30000000, 120000000),
"investment_trust_net_flow": random.gauss(0, 150000000),
"private_equity_net_flow": random.gauss(10000000, 50000000),
"total_transaction_amount": random.randint(5000000000, 50000000000),
"market_cap_change": random.gauss(0.005, 0.02)
})
return data
@pytest.fixture
def sample_stock_style_data(self):
"""샘플 스타일별 자금 흐름 데이터"""
base_date = datetime.now().date()
data = []
styles = ["large_cap", "mid_cap", "small_cap", "growth", "value", "dividend"]
for i in range(20):
date = base_date - timedelta(days=i)
for style in styles:
# 스타일별 특성 반영
if style == "large_cap":
foreign_preference = 0.7 # 외국인은 대형주 선호
individual_preference = 0.3 # 개인은 대형주 덜 선호
elif style == "small_cap":
foreign_preference = 0.2 # 외국인은 소형주 비선호
individual_preference = 0.8 # 개인은 소형주 선호
elif style == "growth":
foreign_preference = 0.8 # 외국인은 성장주 선호
individual_preference = 0.6
else:
foreign_preference = 0.5
individual_preference = 0.5
base_flow = 1000000000
foreign_flow = base_flow * foreign_preference * random.gauss(1, 0.3)
individual_flow = base_flow * individual_preference * random.gauss(1, 0.3)
institutional_flow = base_flow * 0.6 * random.gauss(1, 0.2)
data.append({
"date": date,
"style": style,
"individual_net_flow": individual_flow,
"foreign_net_flow": foreign_flow,
"institutional_net_flow": institutional_flow,
"total_market_cap": random.randint(100000000000000, 500000000000000),
"performance": random.gauss(0.002, 0.015)
})
return data
def test_tool_initialization(self, money_flow_tool, mock_db_manager, mock_cache_manager):
"""도구 초기화 테스트"""
assert money_flow_tool.name == "get_money_flow"
assert money_flow_tool.description is not None
assert "자금" in money_flow_tool.description or "money" in money_flow_tool.description.lower()
assert money_flow_tool.db_manager == mock_db_manager
assert money_flow_tool.cache_manager == mock_cache_manager
def test_tool_definition(self, money_flow_tool):
"""도구 정의 테스트"""
definition = money_flow_tool.get_tool_definition()
assert definition.name == "get_money_flow"
assert definition.description is not None
assert definition.inputSchema is not None
# 입력 스키마 검증
schema = definition.inputSchema
assert schema["type"] == "object"
assert "properties" in schema
properties = schema["properties"]
assert "analysis_type" in properties
assert "sectors" in properties
assert "time_period" in properties
assert "include_foreign_flow" in properties
# analysis_type 파라미터 검증
analysis_prop = properties["analysis_type"]
assert analysis_prop["type"] == "array"
assert "sector_rotation" in str(analysis_prop)
assert "style_preference" in str(analysis_prop)
assert "foreign_flow" in str(analysis_prop)
@pytest.mark.asyncio
async def test_execute_sector_rotation_analysis(self, money_flow_tool, sample_money_flow_data):
"""섹터 로테이션 분석 테스트"""
# 캐시 미스
money_flow_tool.cache_manager.get.return_value = None
# 데이터베이스 응답 설정
money_flow_tool.db_manager.fetch_all.return_value = sample_money_flow_data
# 실행
result = await money_flow_tool.execute({
"analysis_type": ["sector_rotation"],
"sectors": ["IT", "금융", "화학"],
"time_period": "30d"
})
# 결과 검증
assert len(result) == 1
content = result[0]
assert content.type == "text"
# JSON 파싱하여 내용 확인
import json
data = json.loads(content.text)
assert "money_flow_analysis" in data
assert "sector_rotation" in data["money_flow_analysis"]
# 섹터 로테이션 결과 검증
sector_rotation = data["money_flow_analysis"]["sector_rotation"]
assert "sector_flows" in sector_rotation
assert "rotation_patterns" in sector_rotation
assert "hot_sectors" in sector_rotation
assert "cold_sectors" in sector_rotation
# 개별 섹터 데이터 확인
sector_flows = sector_rotation["sector_flows"]
assert "IT" in sector_flows
assert "foreign_net_flow" in sector_flows["IT"]
assert "individual_net_flow" in sector_flows["IT"]
assert "institutional_net_flow" in sector_flows["IT"]
@pytest.mark.asyncio
async def test_execute_style_preference_analysis(self, money_flow_tool, sample_stock_style_data):
"""스타일 선호도 분석 테스트"""
money_flow_tool.cache_manager.get.return_value = None
money_flow_tool.db_manager.fetch_all.return_value = sample_stock_style_data
# 실행
result = await money_flow_tool.execute({
"analysis_type": ["style_preference"],
"time_period": "20d",
"include_performance_correlation": True
})
# 결과 검증
content = result[0]
import json
data = json.loads(content.text)
assert "style_preference" in data["money_flow_analysis"]
style_analysis = data["money_flow_analysis"]["style_preference"]
assert "large_vs_small_cap" in style_analysis
assert "growth_vs_value" in style_analysis
assert "style_momentum" in style_analysis
# 대형주 vs 소형주 분석
large_vs_small = style_analysis["large_vs_small_cap"]
assert "large_cap_preference" in large_vs_small
assert "small_cap_preference" in large_vs_small
assert "preference_ratio" in large_vs_small
@pytest.mark.asyncio
async def test_execute_foreign_flow_analysis(self, money_flow_tool, sample_money_flow_data):
"""외국인 자금 흐름 분석 테스트"""
money_flow_tool.cache_manager.get.return_value = None
money_flow_tool.db_manager.fetch_all.return_value = sample_money_flow_data
# 실행
result = await money_flow_tool.execute({
"analysis_type": ["foreign_flow"],
"time_period": "30d",
"include_currency_impact": True,
"include_global_comparison": True
})
# 결과 검증
content = result[0]
import json
data = json.loads(content.text)
assert "foreign_flow" in data["money_flow_analysis"]
foreign_analysis = data["money_flow_analysis"]["foreign_flow"]
assert "total_foreign_flow" in foreign_analysis
assert "sector_preferences" in foreign_analysis
assert "flow_trends" in foreign_analysis
assert "currency_impact" in foreign_analysis
assert "global_comparison" in foreign_analysis
@pytest.mark.asyncio
async def test_comprehensive_money_flow_analysis(self, money_flow_tool, sample_money_flow_data):
"""종합 자금 흐름 분석 테스트"""
money_flow_tool.cache_manager.get.return_value = None
money_flow_tool.db_manager.fetch_all.return_value = sample_money_flow_data
# 실행
result = await money_flow_tool.execute({
"analysis_type": ["sector_rotation", "style_preference", "foreign_flow", "institutional_flow"],
"sectors": ["ALL"],
"time_period": "60d",
"include_predictions": True,
"include_risk_assessment": True
})
# 결과 검증
content = result[0]
import json
data = json.loads(content.text)
analysis = data["money_flow_analysis"]
assert "sector_rotation" in analysis
assert "style_preference" in analysis
assert "foreign_flow" in analysis
assert "institutional_flow" in analysis
# 예측 및 리스크 평가
assert "predictions" in data
assert "risk_assessment" in data
predictions = data["predictions"]
assert "sector_flow_forecast" in predictions
assert "style_trend_forecast" in predictions
risk_assessment = data["risk_assessment"]
assert "concentration_risk" in risk_assessment
assert "flow_volatility" in risk_assessment
def test_sector_flow_calculation(self, money_flow_tool):
"""섹터별 자금 흐름 계산 테스트"""
# 테스트 데이터
sector_data = [
{"sector": "IT", "individual_net_flow": -500000000, "foreign_net_flow": 800000000, "institutional_net_flow": 200000000},
{"sector": "IT", "individual_net_flow": -600000000, "foreign_net_flow": 900000000, "institutional_net_flow": 150000000},
{"sector": "금융", "individual_net_flow": -200000000, "foreign_net_flow": 300000000, "institutional_net_flow": 600000000},
]
flows = money_flow_tool._calculate_sector_flows(sector_data)
assert "IT" in flows
assert "금융" in flows
it_flow = flows["IT"]
assert "total_net_flow" in it_flow
assert "individual_net_flow" in it_flow
assert "foreign_net_flow" in it_flow
assert "institutional_net_flow" in it_flow
# IT 섹터 계산 확인
assert it_flow["individual_net_flow"] == -1100000000 # -500M + -600M
assert it_flow["foreign_net_flow"] == 1700000000 # 800M + 900M
assert it_flow["institutional_net_flow"] == 350000000 # 200M + 150M
def test_rotation_pattern_detection(self, money_flow_tool):
"""로테이션 패턴 탐지 테스트"""
# 시계열 섹터 자금 흐름 데이터
time_series_data = [
{"date": "2025-01-20", "sector": "IT", "total_net_flow": 1000000000},
{"date": "2025-01-19", "sector": "IT", "total_net_flow": 800000000},
{"date": "2025-01-18", "sector": "IT", "total_net_flow": 600000000},
{"date": "2025-01-20", "sector": "금융", "total_net_flow": -500000000},
{"date": "2025-01-19", "sector": "금융", "total_net_flow": -300000000},
{"date": "2025-01-18", "sector": "금융", "total_net_flow": -100000000},
]
patterns = money_flow_tool._detect_rotation_patterns(time_series_data)
assert "inflow_sectors" in patterns
assert "outflow_sectors" in patterns
assert "rotation_strength" in patterns
# IT는 유입, 금융은 유출 패턴이어야 함
assert "IT" in patterns["inflow_sectors"]
assert "금융" in patterns["outflow_sectors"]
assert patterns["rotation_strength"] > 0
def test_style_preference_calculation(self, money_flow_tool):
"""스타일 선호도 계산 테스트"""
style_data = [
{"style": "large_cap", "foreign_net_flow": 2000000000, "individual_net_flow": 500000000},
{"style": "small_cap", "foreign_net_flow": 300000000, "individual_net_flow": 1500000000},
{"style": "growth", "foreign_net_flow": 1800000000, "individual_net_flow": 800000000},
{"style": "value", "foreign_net_flow": 700000000, "individual_net_flow": 1200000000},
]
preferences = money_flow_tool._calculate_style_preferences(style_data)
assert "large_vs_small_cap" in preferences
assert "growth_vs_value" in preferences
# 대형주 vs 소형주
large_vs_small = preferences["large_vs_small_cap"]
assert "foreign_preference" in large_vs_small
assert "individual_preference" in large_vs_small
# 외국인은 대형주 선호, 개인은 소형주 선호 패턴
assert large_vs_small["foreign_preference"] > large_vs_small["individual_preference"]
def test_foreign_flow_trend_analysis(self, money_flow_tool):
"""외국인 자금 흐름 트렌드 분석 테스트"""
# 시계열 외국인 자금 흐름 데이터
foreign_data = [
{"date": "2025-01-20", "foreign_net_flow": 1000000000},
{"date": "2025-01-19", "foreign_net_flow": 1200000000},
{"date": "2025-01-18", "foreign_net_flow": 800000000},
{"date": "2025-01-17", "foreign_net_flow": 600000000},
{"date": "2025-01-16", "foreign_net_flow": 400000000},
]
trends = money_flow_tool._analyze_foreign_flow_trends(foreign_data)
assert "trend_direction" in trends
assert "trend_strength" in trends
assert "volatility" in trends
assert "momentum" in trends
# 상승 추세여야 함
assert trends["trend_direction"] in ["상승", "증가", "up", "increasing"]
assert trends["trend_strength"] > 0
def test_institutional_flow_breakdown(self, money_flow_tool):
"""기관별 자금 흐름 분석 테스트"""
institutional_data = [
{
"pension_net_flow": 500000000,
"bank_net_flow": -200000000,
"insurance_net_flow": 300000000,
"investment_trust_net_flow": 150000000,
"private_equity_net_flow": 50000000
}
]
breakdown = money_flow_tool._analyze_institutional_breakdown(institutional_data)
assert "pension_flow" in breakdown
assert "bank_flow" in breakdown
assert "insurance_flow" in breakdown
assert "investment_trust_flow" in breakdown
assert "private_equity_flow" in breakdown
assert "total_institutional_flow" in breakdown
# 총 기관 자금 흐름 계산 확인
expected_total = 500000000 - 200000000 + 300000000 + 150000000 + 50000000
assert breakdown["total_institutional_flow"] == expected_total
def test_flow_concentration_analysis(self, money_flow_tool):
"""자금 흐름 집중도 분석 테스트"""
flow_data = [
{"sector": "IT", "total_net_flow": 5000000000},
{"sector": "금융", "total_net_flow": 1000000000},
{"sector": "화학", "total_net_flow": 500000000},
{"sector": "바이오", "total_net_flow": 200000000},
{"sector": "기타", "total_net_flow": 100000000},
]
concentration = money_flow_tool._analyze_flow_concentration(flow_data)
assert "herfindahl_index" in concentration
assert "top3_concentration" in concentration
assert "concentration_level" in concentration
# HHI 계산 확인 (집중도가 높을수록 큰 값)
assert concentration["herfindahl_index"] > 0
assert concentration["top3_concentration"] > 0.5 # 상위 3개 섹터가 50% 이상
assert concentration["concentration_level"] in ["low", "medium", "high"]
def test_flow_momentum_calculation(self, money_flow_tool):
"""자금 흐름 모멘텀 계산 테스트"""
# 가속하는 자금 흐름 패턴
momentum_data = [
{"date": "2025-01-20", "total_flow": 1000000000},
{"date": "2025-01-19", "total_flow": 800000000},
{"date": "2025-01-18", "total_flow": 600000000},
{"date": "2025-01-17", "total_flow": 300000000},
{"date": "2025-01-16", "total_flow": 100000000},
]
momentum = money_flow_tool._calculate_flow_momentum(momentum_data)
assert "momentum_score" in momentum
assert "acceleration" in momentum
assert "momentum_direction" in momentum
# 가속하는 상승 모멘텀이어야 함
assert momentum["momentum_score"] > 0
assert momentum["acceleration"] > 0
assert momentum["momentum_direction"] in ["상승", "증가", "up"]
def test_currency_impact_analysis(self, money_flow_tool):
"""환율 영향 분석 테스트"""
# 환율과 외국인 자금 흐름 데이터
currency_data = [
{"date": "2025-01-20", "usd_krw": 1350, "foreign_flow": 1000000000},
{"date": "2025-01-19", "usd_krw": 1360, "foreign_flow": 800000000},
{"date": "2025-01-18", "usd_krw": 1370, "foreign_flow": 600000000},
{"date": "2025-01-17", "usd_krw": 1380, "foreign_flow": 400000000},
]
impact = money_flow_tool._analyze_currency_impact(currency_data)
assert "correlation" in impact
assert "sensitivity" in impact
assert "current_impact" in impact
# 원화 강세(환율 하락)와 외국인 유입의 상관관계
assert -1 <= impact["correlation"] <= 1
assert impact["sensitivity"] >= 0
@pytest.mark.asyncio
async def test_flow_prediction(self, money_flow_tool):
"""자금 흐름 예측 테스트"""
# 시계열 자금 흐름 데이터
historical_data = []
for i in range(30):
historical_data.append({
"date": (datetime.now() - timedelta(days=i)).date(),
"sector": "IT",
"total_net_flow": 1000000000 + i * 50000000 + random.gauss(0, 100000000)
})
predictions = await money_flow_tool._predict_future_flows(historical_data, forecast_days=5)
assert "sector_forecasts" in predictions
assert "confidence_intervals" in predictions
assert "trend_continuation" in predictions
sector_forecast = predictions["sector_forecasts"]
assert "IT" in sector_forecast
assert len(sector_forecast["IT"]) == 5 # 5일 예측
@pytest.mark.asyncio
async def test_risk_assessment(self, money_flow_tool):
"""리스크 평가 테스트"""
# 리스크 평가용 자금 흐름 데이터
risk_data = {
"concentration": {"herfindahl_index": 0.3, "top3_concentration": 0.7},
"volatility": {"daily_std": 500000000, "monthly_std": 2000000000},
"foreign_dependency": {"foreign_ratio": 0.4, "stability": 0.6}
}
risk_assessment = await money_flow_tool._assess_flow_risks(risk_data)
assert "overall_risk_level" in risk_assessment
assert "risk_factors" in risk_assessment
assert "mitigation_suggestions" in risk_assessment
assert risk_assessment["overall_risk_level"] in ["low", "medium", "high"]
assert len(risk_assessment["risk_factors"]) > 0
@pytest.mark.asyncio
async def test_cache_functionality(self, money_flow_tool):
"""캐시 기능 테스트"""
# 캐시 히트 시나리오
cached_data = {
"timestamp": datetime.now().isoformat(),
"money_flow_analysis": {
"sector_rotation": {"hot_sectors": ["IT", "바이오"]}
}
}
money_flow_tool.cache_manager.get.return_value = cached_data
# 실행
result = await money_flow_tool.execute({
"analysis_type": ["sector_rotation"],
"time_period": "30d"
})
# 캐시에서 데이터 반환 확인
content = result[0]
import json
data = json.loads(content.text)
assert data["money_flow_analysis"]["sector_rotation"]["hot_sectors"] == ["IT", "바이오"]
# 데이터베이스 호출 없음 확인
money_flow_tool.db_manager.fetch_all.assert_not_called()
@pytest.mark.asyncio
async def test_error_handling(self, money_flow_tool):
"""에러 처리 테스트"""
money_flow_tool.cache_manager.get.return_value = None
money_flow_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB 연결 실패")
with pytest.raises(DatabaseConnectionError):
await money_flow_tool.execute({
"analysis_type": ["sector_rotation"]
})
@pytest.mark.asyncio
async def test_invalid_parameters(self, money_flow_tool):
"""잘못된 파라미터 테스트"""
# 빈 분석 타입
with pytest.raises(ValueError, match="At least one analysis type"):
await money_flow_tool.execute({
"analysis_type": []
})
# 잘못된 분석 타입
with pytest.raises(ValueError, match="Invalid analysis type"):
await money_flow_tool.execute({
"analysis_type": ["invalid_type"]
})
# 잘못된 시간 기간
with pytest.raises(ValueError, match="Invalid time period"):
await money_flow_tool.execute({
"analysis_type": ["sector_rotation"],
"time_period": "invalid_period"
})
@pytest.mark.asyncio
async def test_insufficient_data_handling(self, money_flow_tool):
"""데이터 부족 처리 테스트"""
# 데이터가 부족한 경우
insufficient_data = [
{
"date": datetime.now().date(),
"sector": "IT",
"individual_net_flow": 100000000,
"foreign_net_flow": 200000000,
"institutional_net_flow": 150000000
}
]
money_flow_tool.cache_manager.get.return_value = None
money_flow_tool.db_manager.fetch_all.return_value = insufficient_data
result = await money_flow_tool.execute({
"analysis_type": ["sector_rotation"],
"time_period": "30d"
})
content = result[0]
import json
data = json.loads(content.text)
assert "warning" in data or "insufficient data" in str(data).lower()
def test_etf_flow_analysis(self, money_flow_tool):
"""ETF 자금 흐름 분석 테스트"""
etf_data = [
{"etf_name": "KODEX 200", "net_flow": 500000000, "assets": 10000000000000},
{"etf_name": "TIGER IT", "net_flow": 800000000, "assets": 3000000000000},
{"etf_name": "ARIRANG 금융", "net_flow": -200000000, "assets": 1500000000000},
]
etf_analysis = money_flow_tool._analyze_etf_flows(etf_data)
assert "total_etf_flow" in etf_analysis
assert "sector_preferences" in etf_analysis
assert "flow_patterns" in etf_analysis
# ETF를 통한 섹터 선호도 확인
sector_prefs = etf_analysis["sector_preferences"]
assert "IT" in str(sector_prefs) or "tech" in str(sector_prefs).lower()
def test_margin_trading_impact(self, money_flow_tool):
"""신용거래 영향 분석 테스트"""
margin_data = [
{"date": "2025-01-20", "margin_buy": 2000000000, "margin_sell": 1500000000},
{"date": "2025-01-19", "margin_buy": 1800000000, "margin_sell": 1600000000},
{"date": "2025-01-18", "margin_buy": 1600000000, "margin_sell": 1700000000},
]
margin_impact = money_flow_tool._analyze_margin_trading_impact(margin_data)
assert "net_margin_flow" in margin_impact
assert "margin_trend" in margin_impact
assert "leverage_effect" in margin_impact
# 최근 신용매수 증가 패턴 확인
assert margin_impact["margin_trend"] in ["증가", "감소", "보합", "increasing", "decreasing", "stable"]