# investor_tools.py • 15.5 kB
"""ν¬μμ λν₯ λΆμ λꡬ"""
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class InvestorFlowsTool(BaseTool):
    """Tool that analyses buy/sell flows by investor type.

    Reads rows from the ``investor_flows`` table for a market and period,
    summarises net buying of individual (개인), institutional (기관) and
    foreign (외국인) investors, optionally compares the flows with the market
    index move, caches the result and returns it as JSON text.

    NOTE(review): the Korean investor-type values below are used both as SQL
    filter values and as summary keys, so they must match the values stored in
    ``investor_flows.investor_type`` exactly — confirm against the data.
    """

    # Allowed argument values, shared by the tool schema and validation so the
    # two cannot drift apart.
    _VALID_MARKETS = ("KOSPI", "KOSDAQ", "ALL")
    _VALID_PERIODS = ("1d", "1w", "1m", "3m")
    _VALID_INVESTOR_TYPES = ("개인", "기관", "외국인", "ALL")
    # Look-back window (in days) for each period code.
    _PERIOD_DAYS = {"1d": 1, "1w": 7, "1m": 30, "3m": 90}
    # Periods for which a day-by-day cumulative series is added to the result.
    _CUMULATIVE_PERIODS = ("1w", "1m", "3m")
    # investor_type value -> summary key accumulating that type's net amount.
    _SUMMARY_KEYS = {"개인": "net_individual", "외국인": "net_foreign", "기관": "net_institution"}
    # Unit thresholds used by _format_amount: 1조 = 10^12, 1억 = 10^8.
    _TRILLION = 10 ** 12
    _HUNDRED_MILLION = 10 ** 8

    def __init__(self, db_manager, cache_manager):
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # cache lifetime in seconds (5 minutes)

    @property
    def name(self) -> str:
        return "get_investor_flows"

    @property
    def description(self) -> str:
        return "투자자별(개인, 기관, 외국인) 매매 동향과 자금 흐름을 분석합니다."

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool definition, including the JSON schema of its arguments."""
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": list(self._VALID_MARKETS),
                        "default": "KOSPI",
                        "description": "조회할 시장"
                    },
                    "period": {
                        "type": "string",
                        "enum": list(self._VALID_PERIODS),
                        "default": "1d",
                        "description": "조회 기간 (1d: 1일, 1w: 1주, 1m: 1개월, 3m: 3개월)"
                    },
                    "investor_type": {
                        "type": "string",
                        "enum": list(self._VALID_INVESTOR_TYPES),
                        "default": "ALL",
                        "description": "조회할 투자자 유형"
                    },
                    "include_market_impact": {
                        "type": "boolean",
                        "default": False,
                        "description": "시장 영향도 분석 포함 여부"
                    }
                },
                "required": []
            }
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Fetch (or serve from cache) and analyse investor flow data.

        Args:
            arguments: Tool arguments ``market``, ``period``, ``investor_type``
                and ``include_market_impact``; missing keys (or ``None``
                arguments) fall back to the schema defaults.

        Returns:
            A one-element list holding the result serialised as indented JSON.

        Raises:
            ValueError: If an argument is outside its allowed values.
            DatabaseConnectionError: If fetching the flow data fails.
        """
        arguments = arguments or {}
        try:
            market = arguments.get("market", "KOSPI")
            period = arguments.get("period", "1d")
            investor_type = arguments.get("investor_type", "ALL")
            # Normalise to a real bool so equivalent flags share one cache key.
            include_market_impact = bool(arguments.get("include_market_impact", False))
            self._validate_parameters(market, period, investor_type)

            cache_key = self._generate_cache_key(market, period, investor_type, include_market_impact)
            cached_data = await self.cache_manager.get(cache_key)
            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                # Same formatting as a fresh result so callers see identical output.
                return [TextContent(text=json.dumps(cached_data, ensure_ascii=False, indent=2))]

            data = await self._fetch_investor_data(market, period, investor_type, include_market_impact)
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
            self.logger.info(f"Investor flows data fetched for {market}/{period}")
            return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]
        except Exception as e:
            self.logger.error(f"Error in investor flows tool: {e}")
            raise

    def _validate_parameters(self, market: str, period: str, investor_type: str):
        """Raise ``ValueError`` if any argument is not one of its allowed values."""
        if market not in self._VALID_MARKETS:
            raise ValueError(f"Invalid market: {market}")
        if period not in self._VALID_PERIODS:
            raise ValueError(f"Invalid period: {period}")
        if investor_type not in self._VALID_INVESTOR_TYPES:
            raise ValueError(f"Invalid investor_type: {investor_type}")

    def _generate_cache_key(self, market: str, period: str, investor_type: str, include_impact: bool) -> str:
        """Build the cache key that identifies one combination of arguments."""
        return f"investor_flows:{market}:{period}:{investor_type}:{include_impact}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True if a cached result's ``timestamp`` is younger than ``cache_ttl``.

        Payloads that are not dicts, lack a timestamp, or carry an unparsable
        (or timezone-aware, hence incomparable) timestamp are treated as stale.
        """
        if not isinstance(data, dict) or "timestamp" not in data:
            return False
        try:
            timestamp = datetime.fromisoformat(data["timestamp"])
            return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)
        except (ValueError, TypeError):
            return False

    async def _fetch_investor_data(self, market: str, period: str, investor_type: str, include_impact: bool) -> Dict[str, Any]:
        """Query ``investor_flows`` and assemble the result payload.

        Returns:
            A dict with ``timestamp``, the echoed arguments, formatted ``flows``
            and a ``summary``; plus ``message`` when no rows exist,
            ``cumulative_flows`` for multi-day periods and ``market_impact``
            when requested.

        Raises:
            DatabaseConnectionError: On any failure (other errors are wrapped).
        """
        try:
            days = self._get_period_days(period)
            # The day count is bound as a parameter and multiplied by a fixed
            # interval. A placeholder inside a quoted literal ('%s days') is only
            # substituted by drivers that interpolate on the client side.
            query = """
                SELECT date, investor_type, buy_amount, sell_amount, net_amount, market
                FROM investor_flows
                WHERE date >= CURRENT_DATE - %s * INTERVAL '1 day'
            """
            params: List[Any] = [days]
            if market != "ALL":
                query += " AND market = %s"
                params.append(market)
            if investor_type != "ALL":
                query += " AND investor_type = %s"
                params.append(investor_type)
            query += " ORDER BY date DESC, investor_type"

            flows_data = await self.db_manager.fetch_all(query, *params) or []

            result: Dict[str, Any] = {
                "timestamp": datetime.now().isoformat(),
                "period": period,
                "market": market,
                "investor_type": investor_type,
                "flows": [self._format_flow_data(flow) for flow in flows_data],
                "summary": self._calculate_summary(flows_data)
            }
            if not flows_data:
                result["message"] = "요청한 기간의 투자자 동향 데이터가 없습니다"
                return result

            if period in self._CUMULATIVE_PERIODS:
                result["cumulative_flows"] = self._calculate_cumulative_flows(flows_data)
            if include_impact:
                result["market_impact"] = await self._analyze_market_impact(market, period, flows_data)
            return result
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            if isinstance(e, DatabaseConnectionError):
                raise
            raise DatabaseConnectionError(f"Failed to fetch investor data: {e}") from e

    def _get_period_days(self, period: str) -> int:
        """Convert a period code to a day count (unknown codes map to 1 day)."""
        return self._PERIOD_DAYS.get(period, 1)

    @staticmethod
    def _to_int(value: Any) -> int:
        """Convert a numeric DB value (int, float, Decimal) to ``int``; ``None`` becomes 0.

        Keeps every amount in the payload JSON-serialisable.
        """
        if value is None:
            return 0
        return int(value)

    @staticmethod
    def _format_date(value: Any) -> Any:
        """Return an ISO string for a date-like value, ``str(value)`` otherwise, ``None`` if falsy."""
        if not value:
            return None
        return value.isoformat() if hasattr(value, "isoformat") else str(value)

    def _format_flow_data(self, flow: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one ``investor_flows`` row into a JSON-friendly dict."""
        net_amount = self._to_int(flow.get("net_amount"))
        return {
            "date": self._format_date(flow.get("date")),
            "investor_type": flow.get("investor_type"),
            "buy_amount": self._to_int(flow.get("buy_amount")),
            "sell_amount": self._to_int(flow.get("sell_amount")),
            "net_amount": net_amount,
            "net_amount_formatted": self._format_amount(net_amount),
            "market": flow.get("market")
        }

    def _format_amount(self, amount: int) -> str:
        """Format an amount with 조 (10^12) / 억 (10^8) units and a 원 suffix.

        Examples: 1_500_000_000_000 -> "1.5조", 2 * 10**12 -> "2조",
        -250_000_000_000 -> "-2,500억", 12_345 -> "12,345원".
        """
        amount = self._to_int(amount)
        if amount == 0:
            return "0원"
        sign = "-" if amount < 0 else ""
        abs_amount = abs(amount)

        if abs_amount >= self._TRILLION:
            # Round first so e.g. 1.96조 prints as "2조", not "2.0조".
            trillion = round(abs_amount / self._TRILLION, 1)
            if trillion == int(trillion):
                return f"{sign}{int(trillion)}조"
            return f"{sign}{trillion:.1f}조"
        if abs_amount >= self._HUNDRED_MILLION:
            return f"{sign}{abs_amount // self._HUNDRED_MILLION:,}억"
        return f"{sign}{abs_amount:,}원"

    def _calculate_summary(self, flows_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Total net amounts per investor type and overall trading value (buy + sell)."""
        summary: Dict[str, Any] = {
            "net_individual": 0,
            "net_foreign": 0,
            "net_institution": 0,
            "total_trading_value": 0
        }
        for flow in flows_data:
            summary_key = self._SUMMARY_KEYS.get(flow.get("investor_type", ""))
            if summary_key:
                summary[summary_key] += self._to_int(flow.get("net_amount"))
            summary["total_trading_value"] += (
                self._to_int(flow.get("buy_amount")) + self._to_int(flow.get("sell_amount"))
            )

        # Human-readable versions of the net totals.
        summary["net_individual_formatted"] = self._format_amount(summary["net_individual"])
        summary["net_foreign_formatted"] = self._format_amount(summary["net_foreign"])
        summary["net_institution_formatted"] = self._format_amount(summary["net_institution"])
        return summary

    def _calculate_cumulative_flows(self, flows_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Sum net amounts per date and build a running total in ascending date order.

        Rows without a date are skipped. All investor types present in
        ``flows_data`` are added together for each date.
        """
        daily_flows: Dict[str, int] = {}
        for flow in flows_data:
            date_str = self._format_date(flow.get("date"))
            if date_str is None:
                continue
            daily_flows[date_str] = daily_flows.get(date_str, 0) + self._to_int(flow.get("net_amount"))

        cumulative = 0
        cumulative_flows = []
        for date_str in sorted(daily_flows):
            daily_net = daily_flows[date_str]
            cumulative += daily_net
            cumulative_flows.append({
                "date": date_str,
                "daily_net": daily_net,
                "cumulative_net": cumulative,
                "daily_net_formatted": self._format_amount(daily_net),
                "cumulative_net_formatted": self._format_amount(cumulative)
            })
        return cumulative_flows

    async def _analyze_market_impact(self, market: str, period: str, flows_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compare net foreign/individual flows with the index change over the same period.

        Best-effort: any failure is logged and reported as an ``error`` entry
        instead of being raised.
        """
        try:
            days = self._get_period_days(period)
            # "ALL" has no single index; KOSPI is used as the reference.
            index_code = market if market != "ALL" else "KOSPI"
            # Latest value minus the last value at or before `days` ago.
            # NULLIF turns a zero base into NULL instead of a division error.
            index_data = await self.db_manager.fetch_one(
                """
                SELECT
                    (SELECT current_value FROM market_indices WHERE index_code = %s ORDER BY timestamp DESC LIMIT 1) -
                    (SELECT current_value FROM market_indices WHERE index_code = %s AND timestamp <= CURRENT_TIMESTAMP - %s * INTERVAL '1 day' ORDER BY timestamp DESC LIMIT 1) as index_change,
                    ((SELECT current_value FROM market_indices WHERE index_code = %s ORDER BY timestamp DESC LIMIT 1) /
                     NULLIF((SELECT current_value FROM market_indices WHERE index_code = %s AND timestamp <= CURRENT_TIMESTAMP - %s * INTERVAL '1 day' ORDER BY timestamp DESC LIMIT 1), 0) - 1) * 100 as index_change_rate
                """,
                index_code,
                index_code,
                days,
                index_code,
                index_code,
                days
            )
            if not index_data:
                return {"error": "시장 지수 데이터를 찾을 수 없습니다"}

            index_change = index_data.get("index_change")
            index_change_rate = index_data.get("index_change_rate")
            # The scalar subqueries yield NULL when no (or no usable) historical value exists.
            if index_change is None or index_change_rate is None:
                return {"error": "시장 지수 데이터를 찾을 수 없습니다"}
            index_change_rate = float(index_change_rate)

            # Simple direction comparison; a real correlation needs a proper
            # statistical analysis over the daily series.
            summary = self._calculate_summary(flows_data)
            foreign_correlation = self._direction_agreement(summary["net_foreign"], index_change_rate)
            individual_correlation = self._direction_agreement(summary["net_individual"], index_change_rate)

            return {
                "index_change": float(index_change),
                "index_change_rate": index_change_rate,
                "correlation_analysis": {
                    "foreign_vs_index": foreign_correlation,
                    "individual_vs_index": individual_correlation,
                    "interpretation": self._interpret_correlation(foreign_correlation, individual_correlation)
                }
            }
        except Exception as e:
            self.logger.warning(f"Market impact analysis failed: {e}")
            return {"error": f"시장 영향도 분석 중 오류 발생: {e}"}

    @staticmethod
    def _direction_agreement(net_amount: Any, index_change_rate: float) -> int:
        """Return 1 if flow and index moved the same way, -1 if opposite, 0 if either is flat."""
        if net_amount == 0 or index_change_rate == 0:
            return 0
        return 1 if (net_amount > 0) == (index_change_rate > 0) else -1

    def _interpret_correlation(self, foreign_corr: float, individual_corr: float) -> str:
        """Describe the foreign and individual direction signs as a '; '-joined Korean sentence."""
        def describe(subject: str, corr: float) -> str:
            if corr > 0:
                return f"{subject} 지수와 같은 방향으로 움직임"
            if corr < 0:
                return f"{subject} 지수와 반대 방향으로 움직임"
            return f"{subject} 뚜렷한 방향성을 보이지 않음"

        return "; ".join([
            describe("외국인 자금이", foreign_corr),
            describe("개인 투자자가", individual_corr),
        ])