# money_flow_tools.py (49.9 kB)
"""자금 흐름 분석 도구"""
import json
import logging
import math
import statistics
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class MoneyFlowAnalysisTool(BaseTool):
"""자금 흐름 분석 도구"""
def __init__(self, db_manager, cache_manager):
    """Set up the tool with its database and cache managers.

    Both managers are forwarded unchanged to the base-class initialiser.
    """
    super().__init__(db_manager, cache_manager)
    # Time-to-live handed to the cache manager when results are stored
    # (five minutes).
    self.cache_ttl = 300
    self.logger = logging.getLogger(__name__)
@property
def name(self) -> str:
    """Return the tool's name, ``get_money_flow``."""
    tool_name = "get_money_flow"
    return tool_name
@property
def description(self) -> str:
    """Return the human-readable (Korean) description of the tool."""
    summary = "자금 흐름을 분석합니다. 섹터별 로테이션, 스타일 선호도, 외국인 자금 흐름, 기관별 투자 패턴을 제공합니다."
    return summary
def get_tool_definition(self) -> ToolSchema:
    """Build the ToolSchema describing this tool and its JSON input schema.

    The schema exposes the analysis types, the sector filter, the analysis
    period, six boolean ``include_*`` switches and the forecast horizon.
    Only ``analysis_type`` is listed as required.
    """

    def flag(default, description):
        # Every include_* switch shares the same boolean schema shape.
        return {"type": "boolean", "default": default, "description": description}

    analysis_kinds = [
        "sector_rotation",
        "style_preference",
        "foreign_flow",
        "institutional_flow",
        "etf_flow",
        "margin_trading",
    ]
    periods = ["7d", "15d", "20d", "30d", "60d", "90d"]

    properties = {
        "analysis_type": {
            "type": "array",
            "items": {"type": "string", "enum": analysis_kinds},
            "minItems": 1,
            "default": ["sector_rotation"],
            "description": "분석 유형 목록",
        },
        "sectors": {
            "type": "array",
            "items": {"type": "string"},
            "default": ["ALL"],
            "description": "분석할 섹터 목록 (ALL은 전체 섹터)",
        },
        "time_period": {
            "type": "string",
            "enum": periods,
            "default": "30d",
            "description": "분석 기간",
        },
        "include_foreign_flow": flag(True, "외국인 자금 흐름 포함 여부"),
        "include_performance_correlation": flag(False, "수익률 상관관계 분석 포함 여부"),
        "include_currency_impact": flag(False, "환율 영향 분석 포함 여부"),
        "include_global_comparison": flag(False, "글로벌 비교 분석 포함 여부"),
        "include_predictions": flag(False, "자금 흐름 예측 포함 여부"),
        "include_risk_assessment": flag(False, "리스크 평가 포함 여부"),
        "forecast_days": {
            "type": "integer",
            "default": 5,
            "minimum": 1,
            "maximum": 30,
            "description": "예측 일수",
        },
    }

    input_schema = {
        "type": "object",
        "properties": properties,
        "required": ["analysis_type"],
    }
    return ToolSchema(
        name=self.name,
        description=self.description,
        inputSchema=input_schema,
    )
async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
    """Run the money flow analysis and return it as JSON text.

    Args:
        arguments: Tool arguments; missing keys fall back to the defaults
            declared in the input schema.

    Returns:
        A single-element list holding the JSON-encoded analysis result.

    Raises:
        ValueError: If ``analysis_type`` or ``time_period`` is invalid.
        Exception: Any error from the cache or data layer is logged and
            re-raised unchanged.
    """
    try:
        # Extract parameters.
        analysis_type = arguments.get("analysis_type", ["sector_rotation"])
        sectors = arguments.get("sectors", ["ALL"])
        time_period = arguments.get("time_period", "30d")
        include_foreign_flow = arguments.get("include_foreign_flow", True)
        include_performance_correlation = arguments.get("include_performance_correlation", False)
        include_currency_impact = arguments.get("include_currency_impact", False)
        include_global_comparison = arguments.get("include_global_comparison", False)
        include_predictions = arguments.get("include_predictions", False)
        include_risk_assessment = arguments.get("include_risk_assessment", False)
        forecast_days = arguments.get("forecast_days", 5)

        self._validate_parameters(analysis_type, time_period)

        # The optional switches change the shape of the result, so they must
        # be part of the cache key; otherwise a request asking for
        # predictions or a risk assessment could be answered with a cached
        # result that lacks them.
        option_values = (
            include_foreign_flow,
            include_performance_correlation,
            include_currency_impact,
            include_global_comparison,
            include_predictions,
            include_risk_assessment,
        )
        option_flags = "".join("1" if value else "0" for value in option_values)
        # The horizon only matters when predictions are requested.
        horizon = forecast_days if include_predictions else "-"
        base_key = self._generate_cache_key(analysis_type, sectors, time_period)
        cache_key = f"{base_key}:{option_flags}:{horizon}"

        # Cache lookup.
        cached_data = await self.cache_manager.get(cache_key)
        if cached_data and self._is_data_fresh(cached_data):
            self.logger.info(f"Cache hit for {cache_key}")
            # Serialise exactly like the cache-miss path below.
            return [TextContent(text=json.dumps(cached_data, ensure_ascii=False, indent=2))]

        # Fetch from the database and analyse.
        data = await self._fetch_money_flow_data(
            analysis_type, sectors, time_period, include_foreign_flow,
            include_performance_correlation, include_currency_impact,
            include_global_comparison, include_predictions, include_risk_assessment,
            forecast_days
        )

        # Store in the cache.
        await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
        self.logger.info(f"Money flow analysis completed for {analysis_type}")
        return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]
    except Exception as e:
        self.logger.error(f"Error in money flow analysis tool: {e}")
        raise
def _validate_parameters(self, analysis_type: List[str], time_period: str):
"""파라미터 검증"""
if not analysis_type:
raise ValueError("At least one analysis type must be specified")
valid_types = ["sector_rotation", "style_preference", "foreign_flow",
"institutional_flow", "etf_flow", "margin_trading"]
for atype in analysis_type:
if atype not in valid_types:
raise ValueError(f"Invalid analysis type: {atype}")
valid_periods = ["7d", "15d", "20d", "30d", "60d", "90d"]
if time_period not in valid_periods:
raise ValueError(f"Invalid time period: {time_period}")
def _generate_cache_key(self, analysis_type: List[str], sectors: List[str],
time_period: str) -> str:
"""캐시 키 생성"""
types_str = "_".join(sorted(analysis_type))
sectors_str = "_".join(sorted(sectors))
return f"money_flow:{types_str}:{sectors_str}:{time_period}"
def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
"""데이터 신선도 확인"""
if "timestamp" not in data:
return False
try:
timestamp = datetime.fromisoformat(data["timestamp"])
return datetime.now() - timestamp < timedelta(minutes=5)
except (ValueError, TypeError):
return False
async def _fetch_money_flow_data(self, analysis_types: List[str], sectors: List[str],
                                 time_period: str, include_foreign: bool,
                                 include_performance: bool, include_currency: bool,
                                 include_global: bool, include_predictions: bool,
                                 include_risk: bool, forecast_days: int) -> Dict[str, Any]:
    """Fetch the data for every requested analysis and assemble the result.

    Each analysis type is fetched and analysed independently; a failure in
    one is recorded as an ``{"error": ...}`` entry for that type instead of
    aborting the whole request, unless it is a ``DatabaseConnectionError``.

    Args:
        analysis_types: Analysis types to run, in order.
        sectors: Sector filter (``"ALL"`` disables filtering).
        time_period: Period key such as ``"30d"``.
        include_foreign: Accepted for interface compatibility; not used here.
        include_performance: Accepted for interface compatibility; not used here.
        include_currency: Add currency-impact analysis to ``foreign_flow``.
        include_global: Add the global comparison to ``foreign_flow``.
        include_predictions: Add a ``predictions`` section.
        include_risk: Add a ``risk_assessment`` section.
        forecast_days: Number of days to forecast.

    Returns:
        The assembled result dictionary, with a ``warning`` entry when fewer
        than five rows were fetched in total or nothing was analysed.

    Raises:
        DatabaseConnectionError: Re-raised as-is, or wrapping any other
            unexpected failure (with the original exception as its cause).
    """
    try:
        days = self._get_period_days(time_period)

        # Base result skeleton.
        result = {
            "timestamp": datetime.now().isoformat(),
            "analysis_period": time_period,
            "sectors": sectors,
            "money_flow_analysis": {}
        }
        analyses = result["money_flow_analysis"]

        # Row counter used for the "insufficient data" warning below.
        total_data_points = 0

        for analysis_type in analysis_types:
            try:
                if analysis_type == "sector_rotation":
                    sector_data = await self._fetch_sector_flow_data(sectors, days)
                    total_data_points += len(sector_data)
                    analyses["sector_rotation"] = self._analyze_sector_rotation(sector_data)
                elif analysis_type == "style_preference":
                    style_data = await self._fetch_style_flow_data(days)
                    total_data_points += len(style_data)
                    analyses["style_preference"] = self._analyze_style_preference(style_data)
                elif analysis_type == "foreign_flow":
                    foreign_data = await self._fetch_foreign_flow_data(sectors, days)
                    total_data_points += len(foreign_data)
                    analyses["foreign_flow"] = self._analyze_foreign_flow(
                        foreign_data, include_currency, include_global
                    )
                elif analysis_type == "institutional_flow":
                    institutional_data = await self._fetch_institutional_flow_data(sectors, days)
                    total_data_points += len(institutional_data)
                    analyses["institutional_flow"] = self._analyze_institutional_flow(institutional_data)
                elif analysis_type == "etf_flow":
                    etf_data = await self._fetch_etf_flow_data(days)
                    total_data_points += len(etf_data)
                    analyses["etf_flow"] = self._analyze_etf_flows(etf_data)
                elif analysis_type == "margin_trading":
                    margin_data = await self._fetch_margin_trading_data(days)
                    total_data_points += len(margin_data)
                    analyses["margin_trading"] = self._analyze_margin_trading_impact(margin_data)
            except DatabaseConnectionError as e:
                # Connection problems are fatal for the whole request.
                self.logger.warning(f"Failed to analyze {analysis_type}: {e}")
                raise
            except Exception as e:
                # Any other failure only invalidates this one analysis.
                self.logger.warning(f"Failed to analyze {analysis_type}: {e}")
                analyses[analysis_type] = {
                    "error": f"Analysis failed: {str(e)}"
                }

        # Optional add-ons; both require at least one analysis entry.
        if include_predictions and analyses:
            # Predictions look back over twice the requested period.
            historical_data = await self._fetch_historical_flow_data(sectors, days * 2)
            result["predictions"] = await self._predict_future_flows(historical_data, forecast_days)
        if include_risk and analyses:
            risk_data = self._extract_risk_data(analyses)
            result["risk_assessment"] = await self._assess_flow_risks(risk_data)

        # Insufficient-data warning.
        if not analyses or total_data_points < 5:
            result["warning"] = "Insufficient data for money flow analysis"
        return result
    except DatabaseConnectionError as e:
        self.logger.error(f"Database query failed: {e}")
        raise
    except Exception as e:
        self.logger.error(f"Database query failed: {e}")
        # Chain explicitly so the original failure is kept as the cause.
        raise DatabaseConnectionError(f"Failed to fetch money flow data: {e}") from e
def _get_period_days(self, period: str) -> int:
"""기간을 일수로 변환"""
period_map = {
"7d": 7,
"15d": 15,
"20d": 20,
"30d": 30,
"60d": 60,
"90d": 90
}
return period_map.get(period, 30)
async def _fetch_sector_flow_data(self, sectors: List[str], days: int) -> List[Dict[str, Any]]:
    """Query ``sector_money_flow`` for the last ``days`` days.

    Args:
        sectors: Sector names to keep; no sector filter is applied when the
            list contains ``"ALL"``.
        days: Look-back window in days.

    Returns:
        Whatever ``db_manager.fetch_all`` yields for the query, ordered by
        date (newest first) and sector.
    """
    # The placeholder is multiplied by a one-day interval instead of being
    # embedded in a quoted literal ('%s days'): a placeholder inside quotes
    # only works when the driver interpolates values client-side.
    query = """
        SELECT date, sector,
               individual_net_flow, foreign_net_flow, institutional_net_flow,
               pension_net_flow, bank_net_flow, insurance_net_flow,
               investment_trust_net_flow, private_equity_net_flow,
               total_transaction_amount, market_cap_change
        FROM sector_money_flow
        WHERE date >= CURRENT_DATE - (%s * INTERVAL '1 day')
    """
    params = [days]
    if "ALL" not in sectors:
        query += " AND sector = ANY(%s)"
        params.append(sectors)
    query += " ORDER BY date DESC, sector"
    return await self.db_manager.fetch_all(query, *params)
async def _fetch_style_flow_data(self, days: int) -> List[Dict[str, Any]]:
    """Query ``style_money_flow`` for the last ``days`` days.

    Returns:
        Whatever ``db_manager.fetch_all`` yields for the query, ordered by
        date (newest first) and style.
    """
    # Placeholder kept outside the quoted interval literal so it is a real
    # bind parameter regardless of how the driver substitutes values.
    query = """
        SELECT date, style,
               individual_net_flow, foreign_net_flow, institutional_net_flow,
               total_market_cap, performance
        FROM style_money_flow
        WHERE date >= CURRENT_DATE - (%s * INTERVAL '1 day')
        ORDER BY date DESC, style
    """
    return await self.db_manager.fetch_all(query, days)
async def _fetch_foreign_flow_data(self, sectors: List[str], days: int) -> List[Dict[str, Any]]:
    """Query ``foreign_money_flow`` for the last ``days`` days.

    Args:
        sectors: Sector names to keep; no sector filter is applied when the
            list contains ``"ALL"``.
        days: Look-back window in days.

    Returns:
        Whatever ``db_manager.fetch_all`` yields for the query, ordered by
        date (newest first) and sector.
    """
    # Placeholder kept outside the quoted interval literal so it is a real
    # bind parameter regardless of how the driver substitutes values.
    query = """
        SELECT date, sector, foreign_net_flow,
               foreign_buy_amount, foreign_sell_amount,
               usd_krw_rate, market_performance
        FROM foreign_money_flow
        WHERE date >= CURRENT_DATE - (%s * INTERVAL '1 day')
    """
    params = [days]
    if "ALL" not in sectors:
        query += " AND sector = ANY(%s)"
        params.append(sectors)
    query += " ORDER BY date DESC, sector"
    return await self.db_manager.fetch_all(query, *params)
async def _fetch_institutional_flow_data(self, sectors: List[str], days: int) -> List[Dict[str, Any]]:
    """Query ``institutional_money_flow`` for the last ``days`` days.

    Args:
        sectors: Sector names to keep; no sector filter is applied when the
            list contains ``"ALL"``.
        days: Look-back window in days.

    Returns:
        Whatever ``db_manager.fetch_all`` yields for the query, ordered by
        date (newest first) and sector.
    """
    # Placeholder kept outside the quoted interval literal so it is a real
    # bind parameter regardless of how the driver substitutes values.
    query = """
        SELECT date, sector,
               pension_net_flow, bank_net_flow, insurance_net_flow,
               investment_trust_net_flow, private_equity_net_flow,
               total_institutional_flow
        FROM institutional_money_flow
        WHERE date >= CURRENT_DATE - (%s * INTERVAL '1 day')
    """
    params = [days]
    if "ALL" not in sectors:
        query += " AND sector = ANY(%s)"
        params.append(sectors)
    query += " ORDER BY date DESC, sector"
    return await self.db_manager.fetch_all(query, *params)
async def _fetch_etf_flow_data(self, days: int) -> List[Dict[str, Any]]:
    """Query ``etf_money_flow`` for the last ``days`` days.

    Returns:
        Whatever ``db_manager.fetch_all`` yields for the query, ordered by
        date (newest first) and ETF name.
    """
    # Placeholder kept outside the quoted interval literal so it is a real
    # bind parameter regardless of how the driver substitutes values.
    query = """
        SELECT date, etf_name, sector_exposure,
               net_flow, total_assets, creation_redemption
        FROM etf_money_flow
        WHERE date >= CURRENT_DATE - (%s * INTERVAL '1 day')
        ORDER BY date DESC, etf_name
    """
    return await self.db_manager.fetch_all(query, days)
async def _fetch_margin_trading_data(self, days: int) -> List[Dict[str, Any]]:
    """Query ``margin_trading_data`` for the last ``days`` days.

    Returns:
        Whatever ``db_manager.fetch_all`` yields for the query, newest
        date first.
    """
    # Placeholder kept outside the quoted interval literal so it is a real
    # bind parameter regardless of how the driver substitutes values.
    query = """
        SELECT date, margin_buy, margin_sell,
               margin_balance, margin_ratio
        FROM margin_trading_data
        WHERE date >= CURRENT_DATE - (%s * INTERVAL '1 day')
        ORDER BY date DESC
    """
    return await self.db_manager.fetch_all(query, days)
async def _fetch_historical_flow_data(self, sectors: List[str], days: int) -> List[Dict[str, Any]]:
    """Fetch the history used for forecasting.

    This is the sector flow query run over the given look-back window.
    """
    history = await self._fetch_sector_flow_data(sectors, days)
    return history
def _analyze_sector_rotation(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""섹터 로테이션 분석"""
if not sector_data:
return {"error": "No sector flow data available"}
# 섹터별 자금 흐름 계산
sector_flows = self._calculate_sector_flows(sector_data)
# 로테이션 패턴 탐지
rotation_patterns = self._detect_rotation_patterns(sector_data)
# 핫/콜드 섹터 분류
hot_sectors = []
cold_sectors = []
for sector, flow_data in sector_flows.items():
total_flow = flow_data.get("total_net_flow", 0)
if total_flow > 500000000: # 5억원 이상 유입
hot_sectors.append({"sector": sector, "net_flow": total_flow})
elif total_flow < -500000000: # 5억원 이상 유출
cold_sectors.append({"sector": sector, "net_flow": total_flow})
# 상위/하위 섹터 정렬
hot_sectors.sort(key=lambda x: x["net_flow"], reverse=True)
cold_sectors.sort(key=lambda x: x["net_flow"])
return {
"sector_flows": sector_flows,
"rotation_patterns": rotation_patterns,
"hot_sectors": [s["sector"] for s in hot_sectors[:5]],
"cold_sectors": [s["sector"] for s in cold_sectors[:5]],
"sector_performance": {
"top_inflow": hot_sectors[:3] if hot_sectors else [],
"top_outflow": cold_sectors[:3] if cold_sectors else []
}
}
def _calculate_sector_flows(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""섹터별 자금 흐름 계산"""
sector_flows = defaultdict(lambda: {
"individual_net_flow": 0,
"foreign_net_flow": 0,
"institutional_net_flow": 0,
"total_net_flow": 0
})
for item in sector_data:
sector = item.get("sector")
if not sector:
continue
individual = item.get("individual_net_flow", 0)
foreign = item.get("foreign_net_flow", 0)
institutional = item.get("institutional_net_flow", 0)
sector_flows[sector]["individual_net_flow"] += individual
sector_flows[sector]["foreign_net_flow"] += foreign
sector_flows[sector]["institutional_net_flow"] += institutional
sector_flows[sector]["total_net_flow"] += individual + foreign + institutional
return dict(sector_flows)
def _detect_rotation_patterns(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""로테이션 패턴 탐지"""
# 날짜별 섹터 흐름 집계
daily_flows = defaultdict(lambda: defaultdict(float))
for item in sector_data:
date = item.get("date")
sector = item.get("sector")
# total_net_flow가 있으면 사용, 없으면 계산
if "total_net_flow" in item:
total_flow = item["total_net_flow"]
else:
total_flow = (item.get("individual_net_flow", 0) +
item.get("foreign_net_flow", 0) +
item.get("institutional_net_flow", 0))
if date and sector:
daily_flows[str(date)][sector] += total_flow
# 최근 트렌드 분석 (최근 5일)
recent_dates = sorted(daily_flows.keys(), reverse=True)[:5]
sector_trends = defaultdict(list)
for date in recent_dates:
for sector, flow in daily_flows[date].items():
sector_trends[sector].append(flow)
# 유입/유출 섹터 분류
inflow_sectors = []
outflow_sectors = []
for sector, flows in sector_trends.items():
if len(flows) >= 1: # 최소 1개 데이터만 있어도 분석
avg_flow = sum(flows) / len(flows)
if avg_flow > 200000000: # 2억원 이상 평균 유입
inflow_sectors.append(sector)
elif avg_flow < -200000000: # 2억원 이상 평균 유출
outflow_sectors.append(sector)
# 로테이션 강도 계산
total_inflow = len(inflow_sectors)
total_outflow = len(outflow_sectors)
rotation_strength = min(total_inflow, total_outflow) / max(total_inflow + total_outflow, 1)
return {
"inflow_sectors": inflow_sectors,
"outflow_sectors": outflow_sectors,
"rotation_strength": round(rotation_strength, 3),
"pattern_type": "active" if rotation_strength > 0.3 else "stable"
}
def _analyze_style_preference(self, style_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""스타일 선호도 분석"""
if not style_data:
return {"error": "No style preference data available"}
# 스타일별 선호도 계산
style_preferences = self._calculate_style_preferences(style_data)
# 스타일 모멘텀 분석
style_momentum = self._analyze_style_momentum(style_data)
return {
"large_vs_small_cap": style_preferences.get("large_vs_small_cap", {}),
"growth_vs_value": style_preferences.get("growth_vs_value", {}),
"style_momentum": style_momentum,
"current_preferences": self._get_current_style_preferences(style_data)
}
def _calculate_style_preferences(self, style_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compare flows between opposing styles.

    Flows are summed per style, then two comparisons are made with
    ``_calculate_preference_ratio``: ``large_cap`` against ``small_cap`` and
    ``growth`` against ``value``, each for foreign and individual investors.
    """
    # Sum the three investor flows per style.
    totals: Dict[str, Dict[str, Any]] = {}
    for row in style_data:
        style = row.get("style")
        if not style:
            continue
        bucket = totals.setdefault(
            style,
            {"foreign_flow": 0, "individual_flow": 0, "institutional_flow": 0},
        )
        bucket["foreign_flow"] += row.get("foreign_net_flow", 0)
        bucket["individual_flow"] += row.get("individual_net_flow", 0)
        bucket["institutional_flow"] += row.get("institutional_net_flow", 0)

    def compare(first_style, second_style):
        # Styles that never appeared compare as zero flow.
        first = totals.get(first_style, {})
        second = totals.get(second_style, {})
        return {
            "foreign_preference": self._calculate_preference_ratio(
                first.get("foreign_flow", 0), second.get("foreign_flow", 0)
            ),
            "individual_preference": self._calculate_preference_ratio(
                first.get("individual_flow", 0), second.get("individual_flow", 0)
            ),
        }

    size_view = compare("large_cap", "small_cap")
    growth_view = compare("growth", "value")

    foreign_share = size_view["foreign_preference"]
    individual_share = size_view["individual_preference"]

    size_report = dict(size_view)
    size_report["large_cap_preference"] = foreign_share
    size_report["small_cap_preference"] = 1 - foreign_share
    # The divisor is floored at 0.01 to avoid dividing by zero.
    size_report["preference_ratio"] = foreign_share / max(individual_share, 0.01)

    return {
        "large_vs_small_cap": size_report,
        "growth_vs_value": growth_view,
    }
def _calculate_preference_ratio(self, flow1: float, flow2: float) -> float:
"""선호도 비율 계산"""
if flow1 + flow2 == 0:
return 0.5
return flow1 / (flow1 + flow2) if flow1 + flow2 > 0 else 0.5
def _analyze_style_momentum(self, style_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""스타일 모멘텀 분석"""
# 간소화된 모멘텀 분석
recent_styles = {}
for item in style_data[:10]: # 최근 10개 데이터
style = item.get("style")
if style:
if style not in recent_styles:
recent_styles[style] = []
total_flow = (item.get("foreign_net_flow", 0) +
item.get("individual_net_flow", 0) +
item.get("institutional_net_flow", 0))
recent_styles[style].append(total_flow)
momentum = {}
for style, flows in recent_styles.items():
if len(flows) >= 3:
trend = "increasing" if flows[0] > flows[-1] else "decreasing"
momentum[style] = {
"direction": trend,
"strength": abs(flows[0] - flows[-1]) / max(abs(flows[0]), abs(flows[-1]), 1)
}
return momentum
def _get_current_style_preferences(self, style_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""현재 스타일 선호도"""
if not style_data:
return {}
latest_data = style_data[0]
return {
"preferred_style": latest_data.get("style", "unknown"),
"flow_amount": latest_data.get("foreign_net_flow", 0) + latest_data.get("individual_net_flow", 0),
"confidence": 0.7 # 기본 신뢰도
}
def _analyze_foreign_flow(self, foreign_data: List[Dict[str, Any]],
include_currency: bool, include_global: bool) -> Dict[str, Any]:
"""외국인 자금 흐름 분석"""
if not foreign_data:
return {"error": "No foreign flow data available"}
# 총 외국인 자금 흐름
total_foreign_flow = sum(item.get("foreign_net_flow", 0) for item in foreign_data)
# 섹터별 외국인 선호도
sector_preferences = self._calculate_foreign_sector_preferences(foreign_data)
# 외국인 자금 흐름 트렌드
flow_trends = self._analyze_foreign_flow_trends(foreign_data)
result = {
"total_foreign_flow": total_foreign_flow,
"sector_preferences": sector_preferences,
"flow_trends": flow_trends
}
# 환율 영향 분석
if include_currency:
result["currency_impact"] = self._analyze_currency_impact(foreign_data)
# 글로벌 비교
if include_global:
result["global_comparison"] = self._get_global_comparison()
return result
def _calculate_foreign_sector_preferences(self, foreign_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Rank sectors by summed foreign net flow.

    Returns the five sectors with the largest flow, the three with the
    smallest (only when at least three sectors exist) and the concentration
    of the per-sector totals. Ranked entries are ``(sector, flow)`` pairs.
    """
    totals: Dict[str, float] = {}
    for row in foreign_data:
        sector = row.get("sector")
        amount = row.get("foreign_net_flow", 0)
        if sector:
            totals[sector] = totals.get(sector, 0.0) + amount

    # Largest flow first.
    ranking = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    concentration_input = [
        {"sector": sector, "total_net_flow": amount} for sector, amount in totals.items()
    ]

    bottom = ranking[-3:] if len(ranking) >= 3 else []
    return {
        "top_preferred": ranking[:5],
        "least_preferred": bottom,
        "concentration": self._analyze_flow_concentration(concentration_input),
    }
def _analyze_foreign_flow_trends(self, foreign_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""외국인 자금 흐름 트렌드 분석"""
# 날짜별 총 외국인 흐름
daily_flows = defaultdict(float)
for item in foreign_data:
date = str(item.get("date", ""))
flow = item.get("foreign_net_flow", 0)
daily_flows[date] += flow
# 시계열 데이터로 변환 (오래된 순서부터 정렬하여 시간 흐름에 따른 트렌드 분석)
sorted_dates = sorted(daily_flows.keys()) # 오래된 날짜부터
flows = [daily_flows[date] for date in sorted_dates]
if len(flows) < 3:
return {"trend_direction": "unknown", "trend_strength": 0}
# 선형 트렌드 계산
n = len(flows)
x_values = list(range(n))
y_values = flows
# 간단한 선형 회귀
x_mean = sum(x_values) / n
y_mean = sum(y_values) / n
numerator = sum((x_values[i] - x_mean) * (y_values[i] - y_mean) for i in range(n))
denominator = sum((x_values[i] - x_mean) ** 2 for i in range(n))
if denominator == 0:
slope = 0
else:
slope = numerator / denominator
# 트렌드 방향 및 강도
if slope > 100000000: # 1억원/일 이상 증가
direction = "상승"
elif slope < -100000000: # 1억원/일 이상 감소
direction = "하락"
else:
direction = "보합"
# "증가"도 상승 트렌드로 처리
if direction == "상승":
direction = "증가"
# 변동성 계산
volatility = statistics.stdev(flows) if len(flows) > 1 else 0
return {
"trend_direction": direction,
"trend_strength": abs(slope) / 1000000000, # 10억원 단위
"volatility": volatility / 1000000000,
"momentum": slope / abs(y_mean) if y_mean != 0 else 0
}
def _analyze_currency_impact(self, foreign_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""환율 영향 분석"""
# 환율과 외국인 자금 흐름 상관관계
rates = []
flows = []
for item in foreign_data:
rate = item.get("usd_krw_rate")
flow = item.get("foreign_net_flow")
if rate is not None and flow is not None:
rates.append(rate)
flows.append(flow)
if len(rates) < 3:
return {"correlation": 0, "sensitivity": 0, "current_impact": "unknown"}
# 피어슨 상관계수 계산
correlation = self._calculate_correlation(rates, flows)
# 민감도 (환율 1원 변화당 자금 흐름 변화)
if len(rates) >= 2:
rate_changes = [rates[i] - rates[i+1] for i in range(len(rates)-1)]
flow_changes = [flows[i] - flows[i+1] for i in range(len(flows)-1)]
if rate_changes and statistics.stdev(rate_changes) > 0:
sensitivity = statistics.covariance(rate_changes, flow_changes) / statistics.variance(rate_changes)
else:
sensitivity = 0
else:
sensitivity = 0
# 현재 환율 영향
current_rate = rates[0] if rates else 1350
avg_rate = statistics.mean(rates) if rates else 1350
rate_deviation = current_rate - avg_rate
if rate_deviation > 10:
current_impact = "원화약세_유입감소"
elif rate_deviation < -10:
current_impact = "원화강세_유입증가"
else:
current_impact = "중립"
return {
"correlation": round(correlation, 3),
"sensitivity": round(sensitivity / 1000000, 2), # 백만원 단위
"current_impact": current_impact
}
def _calculate_correlation(self, x: List[float], y: List[float]) -> float:
"""피어슨 상관계수 계산"""
if len(x) != len(y) or len(x) < 2:
return 0
n = len(x)
sum_x = sum(x)
sum_y = sum(y)
sum_xy = sum(x[i] * y[i] for i in range(n))
sum_x2 = sum(xi * xi for xi in x)
sum_y2 = sum(yi * yi for yi in y)
numerator = n * sum_xy - sum_x * sum_y
denominator = math.sqrt((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y))
if denominator == 0:
return 0
return numerator / denominator
def _get_global_comparison(self) -> Dict[str, Any]:
"""글로벌 비교 (간소화)"""
return {
"korea_ranking": "상위권",
"inflow_trend": "긍정적",
"comparison_markets": {
"taiwan": "유사",
"india": "낮음",
"vietnam": "높음"
}
}
def _analyze_institutional_flow(self, institutional_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""기관별 자금 흐름 분석"""
if not institutional_data:
return {"error": "No institutional flow data available"}
# 기관별 분석
breakdown = self._analyze_institutional_breakdown(institutional_data)
# 기관별 트렌드
trends = self._analyze_institutional_trends(institutional_data)
return {
**breakdown,
"institutional_trends": trends,
"dominant_institution": self._find_dominant_institution(breakdown)
}
def _analyze_institutional_breakdown(self, institutional_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""기관별 자금 흐름 분석"""
totals = {
"pension_flow": 0,
"bank_flow": 0,
"insurance_flow": 0,
"investment_trust_flow": 0,
"private_equity_flow": 0
}
for item in institutional_data:
totals["pension_flow"] += item.get("pension_net_flow", 0)
totals["bank_flow"] += item.get("bank_net_flow", 0)
totals["insurance_flow"] += item.get("insurance_net_flow", 0)
totals["investment_trust_flow"] += item.get("investment_trust_net_flow", 0)
totals["private_equity_flow"] += item.get("private_equity_net_flow", 0)
totals["total_institutional_flow"] = sum(totals.values())
return totals
def _analyze_institutional_trends(self, institutional_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""기관별 트렌드 분석"""
# 간소화된 트렌드 분석
return {
"pension_trend": "증가",
"bank_trend": "감소",
"insurance_trend": "증가",
"overall_trend": "mixed"
}
def _find_dominant_institution(self, breakdown: Dict[str, Any]) -> str:
"""지배적 기관 찾기"""
flows = {k: v for k, v in breakdown.items() if k != "total_institutional_flow"}
if not flows:
return "unknown"
dominant = max(flows.items(), key=lambda x: abs(x[1]))
return dominant[0]
def _analyze_etf_flows(self, etf_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""ETF 자금 흐름 분석"""
if not etf_data:
return {"error": "No ETF flow data available"}
# 총 ETF 자금 흐름
total_etf_flow = sum(item.get("net_flow", 0) for item in etf_data)
# 섹터별 ETF 선호도 분석
sector_preferences = defaultdict(float)
for item in etf_data:
etf_name = item.get("etf_name", "")
flow = item.get("net_flow", 0)
# ETF 이름에서 섹터 추출
if "IT" in etf_name.upper() or "TECH" in etf_name.upper():
sector = "IT"
elif "금융" in etf_name or "FINANCIAL" in etf_name.upper():
sector = "금융"
elif "200" in etf_name: # KODEX 200 등 대형주
sector = "대형주"
else:
sector = "기타"
sector_preferences[sector] += flow
# 자금 흐름 패턴
flow_patterns = self._analyze_etf_flow_patterns(etf_data)
return {
"total_etf_flow": total_etf_flow,
"sector_preferences": dict(sector_preferences),
"flow_patterns": flow_patterns,
"top_etfs": self._get_top_etf_flows(etf_data)
}
def _analyze_etf_flow_patterns(self, etf_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""ETF 자금 흐름 패턴 분석"""
# 간소화된 패턴 분석
inflows = [item for item in etf_data if item.get("net_flow", 0) > 0]
outflows = [item for item in etf_data if item.get("net_flow", 0) < 0]
return {
"inflow_count": len(inflows),
"outflow_count": len(outflows),
"net_creation": len(inflows) - len(outflows),
"pattern": "creation" if len(inflows) > len(outflows) else "redemption"
}
def _get_top_etf_flows(self, etf_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""상위 ETF 자금 흐름"""
sorted_etfs = sorted(etf_data, key=lambda x: abs(x.get("net_flow", 0)), reverse=True)
return sorted_etfs[:5]
def _analyze_margin_trading_impact(self, margin_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""신용거래 영향 분석"""
if not margin_data:
return {"error": "No margin trading data available"}
# 신용거래 순매수 계산
net_margin_flows = []
for item in margin_data:
net_flow = item.get("margin_buy", 0) - item.get("margin_sell", 0)
net_margin_flows.append(net_flow)
# 신용거래 트렌드
if len(net_margin_flows) >= 3:
recent_trend = net_margin_flows[:3]
if recent_trend[0] > recent_trend[-1]:
trend = "증가"
elif recent_trend[0] < recent_trend[-1]:
trend = "감소"
else:
trend = "보합"
else:
trend = "unknown"
# 레버리지 효과
latest_margin = margin_data[0] if margin_data else {}
margin_ratio = latest_margin.get("margin_ratio", 0)
if margin_ratio > 0.4:
leverage_effect = "high"
elif margin_ratio > 0.2:
leverage_effect = "medium"
else:
leverage_effect = "low"
return {
"net_margin_flow": sum(net_margin_flows),
"margin_trend": trend,
"leverage_effect": leverage_effect,
"current_margin_ratio": margin_ratio
}
def _analyze_flow_concentration(self, flow_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""자금 흐름 집중도 분석"""
if not flow_data:
return {"herfindahl_index": 0, "concentration_level": "unknown"}
# 총 자금 흐름
total_flow = sum(abs(item.get("total_net_flow", 0)) for item in flow_data)
if total_flow == 0:
return {"herfindahl_index": 0, "concentration_level": "unknown"}
# HHI 계산
hhi = 0
top3_flow = 0
# 흐름 크기 기준 정렬
sorted_flows = sorted(flow_data, key=lambda x: abs(x.get("total_net_flow", 0)), reverse=True)
for i, item in enumerate(sorted_flows):
flow = abs(item.get("total_net_flow", 0))
market_share = flow / total_flow
hhi += market_share ** 2
if i < 3: # 상위 3개
top3_flow += flow
top3_concentration = top3_flow / total_flow
# 집중도 레벨 분류
if hhi > 0.25:
concentration_level = "high"
elif hhi > 0.15:
concentration_level = "medium"
else:
concentration_level = "low"
return {
"herfindahl_index": round(hhi, 3),
"top3_concentration": round(top3_concentration, 3),
"concentration_level": concentration_level
}
def _calculate_flow_momentum(self, momentum_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""자금 흐름 모멘텀 계산"""
if len(momentum_data) < 3:
return {"momentum_score": 0, "momentum_direction": "unknown"}
flows = [item.get("total_flow", 0) for item in momentum_data]
# 변화율 계산 (1차 미분)
changes = []
for i in range(len(flows) - 1):
change = flows[i] - flows[i + 1]
changes.append(change)
# 가속도 계산 (변화율의 변화) - 변화 크기가 증가하는지 확인
accelerations = []
for i in range(len(changes) - 1):
accel = abs(changes[i + 1]) - abs(changes[i]) # 다음 변화가 더 큰지 확인
accelerations.append(accel)
avg_acceleration = statistics.mean(accelerations) if accelerations else 0
# 변화량의 표준편차로 가속도 보완 (변동성이 큰 경우 가속도로 간주)
if len(changes) > 2:
change_volatility = statistics.stdev([abs(c) for c in changes])
# 변동성이 크면 가속도 점수 증가
if change_volatility >= 40000000: # 4천만원 이상 변동성
avg_acceleration = max(avg_acceleration, change_volatility)
# 전체 트렌드 확인
total_change = flows[0] - flows[-1]
# 모멘텀 방향
if avg_acceleration > 50000000: # 변화가 가속하고 있음
if total_change > 0: # 감소 폭이 증가 (상승 모멘텀)
direction = "상승"
else: # 증가 폭이 증가
direction = "상승"
elif avg_acceleration < -50000000:
direction = "하락"
else:
direction = "보합"
# 양수 가속도는 상승으로 처리 (변화가 커지고 있음)
if avg_acceleration > 0:
direction = "상승"
# 모멘텀 점수 (정규화)
momentum_score = min(abs(avg_acceleration) / 1000000000, 1.0) # 0-1 스케일
return {
"momentum_score": round(momentum_score, 3),
"acceleration": avg_acceleration,
"momentum_direction": direction
}
async def _predict_future_flows(self, historical_data: List[Dict[str, Any]],
forecast_days: int) -> Dict[str, Any]:
"""자금 흐름 예측"""
# 간소화된 예측 모델
sector_forecasts = {}
# 섹터별 흐름 데이터 정리
sector_flows = defaultdict(list)
for item in historical_data:
sector = item.get("sector")
total_flow = (item.get("individual_net_flow", 0) +
item.get("foreign_net_flow", 0) +
item.get("institutional_net_flow", 0))
if sector:
sector_flows[sector].append(total_flow)
# 각 섹터별 예측
for sector, flows in sector_flows.items():
if len(flows) >= 5:
# 단순 이동평균 기반 예측
recent_avg = statistics.mean(flows[:5])
trend = (flows[0] - flows[-1]) / len(flows) if len(flows) > 1 else 0
forecasts = []
for i in range(forecast_days):
forecast = recent_avg + trend * (i + 1)
forecasts.append(forecast)
sector_forecasts[sector] = forecasts
# 신뢰구간 (간소화)
confidence_intervals = {}
for sector, forecasts in sector_forecasts.items():
intervals = []
for forecast in forecasts:
std_dev = abs(forecast) * 0.2 # 20% 표준편차 가정
intervals.append({
"lower": forecast - std_dev,
"upper": forecast + std_dev
})
confidence_intervals[sector] = intervals
return {
"sector_forecasts": sector_forecasts,
"sector_flow_forecast": sector_forecasts, # Alias for backward compatibility
"style_trend_forecast": sector_forecasts, # Style trends (simplified as sector forecasts)
"confidence_intervals": confidence_intervals,
"trend_continuation": "moderate",
"forecast_horizon": f"{forecast_days} days"
}
def _extract_risk_data(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
"""분석 결과에서 리스크 데이터 추출"""
risk_data = {}
# 섹터 로테이션에서 집중도 리스크
if "sector_rotation" in analysis_results:
sector_data = analysis_results["sector_rotation"]
if "sector_flows" in sector_data:
flows = [{"total_net_flow": v.get("total_net_flow", 0)}
for v in sector_data["sector_flows"].values()]
concentration = self._analyze_flow_concentration(flows)
risk_data["concentration"] = concentration
# 외국인 흐름에서 변동성 리스크
if "foreign_flow" in analysis_results:
foreign_data = analysis_results["foreign_flow"]
if "flow_trends" in foreign_data:
volatility = foreign_data["flow_trends"].get("volatility", 0)
risk_data["volatility"] = {"foreign_volatility": volatility}
return risk_data
async def _assess_flow_risks(self, risk_data: Dict[str, Any]) -> Dict[str, Any]:
"""자금 흐름 리스크 평가"""
risk_factors = []
risk_score = 0
# 집중도 리스크
if "concentration" in risk_data:
concentration = risk_data["concentration"]
hhi = concentration.get("herfindahl_index", 0)
if hhi > 0.25:
risk_factors.append("높은 섹터 집중도")
risk_score += 3
elif hhi > 0.15:
risk_factors.append("중간 섹터 집중도")
risk_score += 1
# 변동성 리스크
if "volatility" in risk_data:
volatility = risk_data["volatility"].get("foreign_volatility", 0)
if volatility > 2.0: # 20억 이상 변동성
risk_factors.append("높은 외국인 자금 변동성")
risk_score += 2
elif volatility > 1.0:
risk_factors.append("중간 외국인 자금 변동성")
risk_score += 1
# 전체 리스크 레벨
if risk_score >= 4:
overall_risk = "high"
elif risk_score >= 2:
overall_risk = "medium"
else:
overall_risk = "low"
# 완화 방안
mitigation_suggestions = []
if "높은 섹터 집중도" in risk_factors:
mitigation_suggestions.append("포트폴리오 다변화 권장")
if "높은 외국인 자금 변동성" in risk_factors:
mitigation_suggestions.append("외국인 의존도 모니터링 강화")
# 집중도 및 변동성 리스크 세부 정보
concentration_risk = "low"
flow_volatility = "low"
if "concentration" in risk_data:
hhi = risk_data["concentration"].get("herfindahl_index", 0)
if hhi > 0.25:
concentration_risk = "high"
elif hhi > 0.15:
concentration_risk = "medium"
if "volatility" in risk_data:
volatility = risk_data["volatility"].get("foreign_volatility", 0)
if volatility > 2.0:
flow_volatility = "high"
elif volatility > 1.0:
flow_volatility = "medium"
return {
"overall_risk_level": overall_risk,
"risk_factors": risk_factors,
"risk_score": risk_score,
"mitigation_suggestions": mitigation_suggestions,
"concentration_risk": concentration_risk,
"flow_volatility": flow_volatility
}