# test_investor_tools.py
"""Tests for the investor flows tool."""
import json
from datetime import datetime, timedelta
from unittest.mock import AsyncMock

import pytest

from src.exceptions import DataValidationError, DatabaseConnectionError
from src.tools.investor_tools import InvestorFlowsTool
class TestInvestorFlowsTool:
    """Tests for ``InvestorFlowsTool`` (the ``get_investor_flows`` tool).

    The database and cache managers are replaced with ``AsyncMock`` objects,
    so each test drives the tool purely through mocked return values and then
    inspects the JSON text the tool returns.
    """

    @pytest.fixture
    def mock_db_manager(self):
        """Return an ``AsyncMock`` standing in for the database manager."""
        return AsyncMock()

    @pytest.fixture
    def mock_cache_manager(self):
        """Return an ``AsyncMock`` standing in for the cache manager."""
        return AsyncMock()

    @pytest.fixture
    def investor_tool(self, mock_db_manager, mock_cache_manager):
        """Return an ``InvestorFlowsTool`` wired to the mocked managers."""
        return InvestorFlowsTool(mock_db_manager, mock_cache_manager)

    @pytest.fixture
    def sample_investor_data(self):
        """Return one KOSPI row per investor type, all dated today.

        Individuals are net buyers, foreigners are net sellers by the same
        amount, and institutions are flat.
        """
        # Read the clock once so every row carries the same date even if the
        # fixture happens to run across midnight.
        today = datetime.now().date()
        return [
            {
                "date": today,
                "investor_type": "개인",
                "buy_amount": 1500000000000,  # 1.5 trillion KRW
                "sell_amount": 1200000000000,  # 1.2 trillion KRW
                "net_amount": 300000000000,  # 300 billion KRW net buy
                "market": "KOSPI",
            },
            {
                "date": today,
                "investor_type": "외국인",
                "buy_amount": 800000000000,  # 800 billion KRW
                "sell_amount": 1100000000000,  # 1.1 trillion KRW
                "net_amount": -300000000000,  # 300 billion KRW net sell
                "market": "KOSPI",
            },
            {
                "date": today,
                "investor_type": "기관",
                "buy_amount": 700000000000,  # 700 billion KRW
                "sell_amount": 700000000000,  # 700 billion KRW
                "net_amount": 0,  # balanced
                "market": "KOSPI",
            },
        ]

    @staticmethod
    def _payload(result):
        """Decode the JSON text of the first content item in *result*."""
        return json.loads(result[0].text)

    def test_tool_initialization(self, investor_tool, mock_db_manager, mock_cache_manager):
        """The tool exposes its name, a description, and the injected managers."""
        assert investor_tool.name == "get_investor_flows"
        assert investor_tool.description is not None
        assert "투자자" in investor_tool.description
        assert investor_tool.db_manager == mock_db_manager
        assert investor_tool.cache_manager == mock_cache_manager

    def test_tool_definition(self, investor_tool):
        """The tool definition carries the expected name and input schema."""
        definition = investor_tool.get_tool_definition()
        assert definition.name == "get_investor_flows"
        assert definition.description is not None
        assert definition.inputSchema is not None

        # Validate the input schema.
        schema = definition.inputSchema
        assert schema["type"] == "object"
        assert "properties" in schema
        properties = schema["properties"]
        assert "market" in properties
        assert "period" in properties
        assert "investor_type" in properties

        # Validate the ``market`` parameter.
        market_prop = properties["market"]
        assert market_prop["type"] == "string"
        assert "KOSPI" in market_prop["enum"]
        assert "KOSDAQ" in market_prop["enum"]

        # Validate the ``period`` parameter.
        period_prop = properties["period"]
        assert period_prop["type"] == "string"
        assert "1d" in period_prop["enum"]
        assert "1w" in period_prop["enum"]
        assert "1m" in period_prop["enum"]

    @pytest.mark.asyncio
    async def test_execute_daily_flows(self, investor_tool, sample_investor_data):
        """A one-day query returns per-investor flows plus a summary."""
        # Cache miss, so the tool has to go to the database.
        investor_tool.cache_manager.get.return_value = None
        # Database response.
        investor_tool.db_manager.fetch_all.return_value = sample_investor_data

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1d",
            "investor_type": "ALL",
        })

        # Exactly one text content item is returned.
        assert len(result) == 1
        content = result[0]
        assert content.type == "text"

        # Parse the JSON text and check its top-level keys.
        data = json.loads(content.text)
        assert "timestamp" in data
        assert "period" in data
        assert "market" in data
        assert "flows" in data
        assert "summary" in data

        # Flow rows.
        flows = data["flows"]
        assert len(flows) == 3

        # Individual investors.
        individual = next((f for f in flows if f["investor_type"] == "개인"), None)
        assert individual is not None
        assert individual["net_amount"] == 300000000000
        assert individual["net_amount_formatted"] == "3,000억"

        # Foreign investors.
        foreign = next((f for f in flows if f["investor_type"] == "외국인"), None)
        assert foreign is not None
        assert foreign["net_amount"] == -300000000000
        assert foreign["net_amount_formatted"] == "-3,000억"

        # Summary block.
        summary = data["summary"]
        assert "net_individual" in summary
        assert "net_foreign" in summary
        assert "net_institution" in summary
        assert summary["net_individual"] == 300000000000
        assert summary["net_foreign"] == -300000000000
        assert summary["net_institution"] == 0

    @pytest.mark.asyncio
    async def test_execute_specific_investor_type(self, investor_tool, sample_investor_data):
        """Requesting a single investor type returns only that type's flows."""
        # Keep only the individual-investor row.
        individual_data = [
            row for row in sample_investor_data if row["investor_type"] == "개인"
        ]
        investor_tool.cache_manager.get.return_value = None
        investor_tool.db_manager.fetch_all.return_value = individual_data

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1d",
            "investor_type": "개인",
        })

        data = self._payload(result)
        flows = data["flows"]
        assert len(flows) == 1
        assert flows[0]["investor_type"] == "개인"

        # The investor type must be among the positional arguments that
        # follow the query text in the fetch_all call.
        positional_args = investor_tool.db_manager.fetch_all.call_args[0]
        params = positional_args[1:]
        assert "개인" in params

    @pytest.mark.asyncio
    async def test_execute_weekly_period(self, investor_tool):
        """A one-week query echoes the period and filters the query by date."""
        # Seven days of data, newest first.
        today = datetime.now().date()
        weekly_data = [
            {
                "date": today - timedelta(days=offset),
                "investor_type": "개인",
                "buy_amount": 1000000000000,
                "sell_amount": 900000000000,
                "net_amount": 100000000000,
                "market": "KOSPI",
            }
            for offset in range(7)
        ]
        investor_tool.cache_manager.get.return_value = None
        investor_tool.db_manager.fetch_all.return_value = weekly_data

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1w",
            "investor_type": "개인",
        })

        data = self._payload(result)
        assert data["period"] == "1w"

        # The query must carry a lower date bound covering the week.
        call_args = investor_tool.db_manager.fetch_all.call_args[0]
        query = call_args[0].upper()
        assert ">=" in query
        assert "7" in str(call_args) or "INTERVAL" in query

    @pytest.mark.asyncio
    async def test_cumulative_flows_calculation(self, investor_tool):
        """Cumulative net flow is the running sum of the daily net amounts."""
        eok = 100000000  # the test amounts are expressed in units of 100 million KRW
        today = datetime.now().date()

        # Five consecutive days, oldest first.
        net_amounts = [100, -50, 200, -75, 150]
        daily_flows = [
            {
                "date": today - timedelta(days=4 - i),
                "investor_type": "개인",
                "buy_amount": 1000000000000,
                "sell_amount": 1000000000000 - (net * eok),
                "net_amount": net * eok,
                "market": "KOSPI",
            }
            for i, net in enumerate(net_amounts)
        ]
        investor_tool.cache_manager.get.return_value = None
        investor_tool.db_manager.fetch_all.return_value = daily_flows

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1w",
            "investor_type": "개인",
        })

        data = self._payload(result)
        assert "cumulative_flows" in data
        cumulative = data["cumulative_flows"]
        assert len(cumulative) == 5

        # Running totals of net_amounts: 100, 50, 250, 175, 325.
        expected_cumulative = [100, 50, 250, 175, 325]
        for i, expected in enumerate(expected_cumulative):
            assert cumulative[i]["cumulative_net"] == expected * eok

    @pytest.mark.asyncio
    async def test_cache_functionality(self, investor_tool):
        """On a cache hit the cached payload is returned and the DB is not queried."""
        cached_data = {
            "timestamp": datetime.now().isoformat(),
            "period": "1d",
            "market": "KOSPI",
            "flows": [],
        }
        investor_tool.cache_manager.get.return_value = cached_data

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1d",
        })

        # The cached payload comes back unchanged.
        data = self._payload(result)
        assert data == cached_data

        # The database must not have been touched.
        investor_tool.db_manager.fetch_all.assert_not_called()

    @pytest.mark.asyncio
    async def test_error_handling(self, investor_tool):
        """A database connection failure propagates to the caller."""
        investor_tool.cache_manager.get.return_value = None
        investor_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB 연결 실패")

        with pytest.raises(DatabaseConnectionError):
            await investor_tool.execute({
                "market": "KOSPI",
                "period": "1d",
            })

    @pytest.mark.asyncio
    async def test_invalid_parameters(self, investor_tool):
        """Unknown market, period, or investor type raises ``ValueError``."""
        # Invalid market.
        with pytest.raises(ValueError, match="Invalid market"):
            await investor_tool.execute({
                "market": "INVALID",
                "period": "1d",
            })

        # Invalid period.
        with pytest.raises(ValueError, match="Invalid period"):
            await investor_tool.execute({
                "market": "KOSPI",
                "period": "invalid",
            })

        # Invalid investor type.
        with pytest.raises(ValueError, match="Invalid investor_type"):
            await investor_tool.execute({
                "market": "KOSPI",
                "period": "1d",
                "investor_type": "invalid",
            })

    @pytest.mark.asyncio
    async def test_empty_data_handling(self, investor_tool):
        """An empty result set yields empty flows and a 'no data' message."""
        investor_tool.cache_manager.get.return_value = None
        investor_tool.db_manager.fetch_all.return_value = []

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1d",
        })

        data = self._payload(result)
        assert data["flows"] == []
        assert "message" in data
        assert "데이터가 없습니다" in data["message"] or "no data" in data["message"].lower()

    def test_amount_formatting(self, investor_tool):
        """``_format_amount`` renders KRW amounts with Korean magnitude units."""
        # Trillions (조).
        formatted = investor_tool._format_amount(1500000000000)
        assert formatted == "1.5조"

        # Hundreds of billions, shown in units of 억 (100 million).
        formatted = investor_tool._format_amount(300000000000)
        assert formatted == "3,000억"

        # Tens of billions, shown in units of 억.
        formatted = investor_tool._format_amount(50000000000)
        assert formatted == "500억"

        # Negative amount.
        formatted = investor_tool._format_amount(-300000000000)
        assert formatted == "-3,000억"

        # Zero.
        formatted = investor_tool._format_amount(0)
        assert formatted == "0원"

    @pytest.mark.asyncio
    async def test_market_impact_analysis(self, investor_tool, sample_investor_data):
        """With ``include_market_impact`` the result carries a correlation analysis."""
        investor_tool.cache_manager.get.return_value = None
        investor_tool.db_manager.fetch_all.return_value = sample_investor_data

        # Extra mock for the index-change lookup.
        investor_tool.db_manager.fetch_one.return_value = {
            "index_change": 15.30,
            "index_change_rate": 0.58,
        }

        result = await investor_tool.execute({
            "market": "KOSPI",
            "period": "1d",
            "include_market_impact": True,
        })

        data = self._payload(result)
        assert "market_impact" in data
        impact = data["market_impact"]
        assert "index_change" in impact
        assert "correlation_analysis" in impact

        # Correlation analysis.
        correlation = impact["correlation_analysis"]
        assert "foreign_vs_index" in correlation
        # Foreign net selling (-300 billion KRW) against a rising index
        # (+0.58%) should come out as a negative correlation.
        assert correlation["foreign_vs_index"] < 0