"""์์ฅ ์ฌ๋ฆฌ ์งํ ๋๊ตฌ"""
import json
import logging
import math
import random
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class MarketSentimentTool(BaseTool):
"""์์ฅ ์ฌ๋ฆฌ ์งํ ๋ถ์ ๋๊ตฌ"""
def __init__(self, db_manager, cache_manager):
super().__init__(db_manager, cache_manager)
self.logger = logging.getLogger(__name__)
self.cache_ttl = 300 # 5๋ถ
@property
def name(self) -> str:
return "get_market_sentiment"
@property
def description(self) -> str:
return "์์ฅ ์ฌ๋ฆฌ ์งํ๋ฅผ ๋ถ์ํฉ๋๋ค. Fear & Greed Index, Put/Call Ratio, VKOSPI, ๋ด์ค ๊ฐ์ฑ ๋ถ์ ๋ฑ์ ์ง์ํฉ๋๋ค."
def get_tool_definition(self) -> ToolSchema:
"""๋๊ตฌ ์ ์ ๋ฐํ"""
return ToolSchema(
name=self.name,
description=self.description,
inputSchema={
"type": "object",
"properties": {
"market": {
"type": "string",
"enum": ["KOSPI", "KOSDAQ", "ALL"],
"default": "KOSPI",
"description": "๋ถ์ํ ์์ฅ"
},
"indicators": {
"type": "array",
"items": {
"type": "string",
"enum": [
"put_call_ratio",
"vkospi",
"market_breadth",
"news_sentiment",
"social_media",
"volume_sentiment"
]
},
"minItems": 1,
"default": ["put_call_ratio", "vkospi"],
"description": "๋ถ์ํ ์ฌ๋ฆฌ ์งํ ๋ชฉ๋ก"
},
"period": {
"type": "string",
"enum": ["7d", "30d", "90d"],
"default": "30d",
"description": "๋ถ์ ๊ธฐ๊ฐ"
},
"include_fear_greed_index": {
"type": "boolean",
"default": False,
"description": "Fear & Greed Index ๊ณ์ฐ ํฌํจ ์ฌ๋ถ"
},
"include_historical_comparison": {
"type": "boolean",
"default": False,
"description": "๊ณผ๊ฑฐ ๋๋น ์ฌ๋ฆฌ ๋ณํ ๋ถ์ ํฌํจ ์ฌ๋ถ"
},
"include_buzz_analysis": {
"type": "boolean",
"default": False,
"description": "๋ฒ์ฆ ๊ฐ๋ ๋ถ์ ํฌํจ ์ฌ๋ถ"
}
},
"required": ["indicators"]
}
)
async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""์์ฅ ์ฌ๋ฆฌ ๋ถ์ ์คํ"""
try:
# ํ๋ผ๋ฏธํฐ ์ถ์ถ ๋ฐ ๊ฒ์ฆ
market = arguments.get("market", "KOSPI")
indicators = arguments.get("indicators", ["put_call_ratio", "vkospi"])
period = arguments.get("period", "30d")
include_fear_greed = arguments.get("include_fear_greed_index", False)
include_historical = arguments.get("include_historical_comparison", False)
include_buzz = arguments.get("include_buzz_analysis", False)
self._validate_parameters(market, indicators, period)
# ์บ์ ํ์ธ
cache_key = self._generate_cache_key(
market, indicators, period, include_fear_greed, include_historical, include_buzz
)
cached_data = await self.cache_manager.get(cache_key)
if cached_data and self._is_data_fresh(cached_data):
self.logger.info(f"Cache hit for {cache_key}")
return [TextContent(text=json.dumps(cached_data, ensure_ascii=False))]
# ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๋ฐ์ดํฐ ์กฐํ
data = await self._fetch_sentiment_data(
market, indicators, period, include_fear_greed, include_historical, include_buzz
)
# ์บ์ ์ ์ฅ
await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
self.logger.info(f"Sentiment analysis completed for {market}")
return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]
except Exception as e:
self.logger.error(f"Error in market sentiment tool: {e}")
raise
def _validate_parameters(self, market: str, indicators: List[str], period: str):
"""ํ๋ผ๋ฏธํฐ ๊ฒ์ฆ"""
valid_markets = ["KOSPI", "KOSDAQ", "ALL"]
if market not in valid_markets:
raise ValueError(f"Invalid market: {market}")
if not indicators or len(indicators) == 0:
raise ValueError("At least one indicator must be specified")
valid_indicators = [
"put_call_ratio", "vkospi", "market_breadth",
"news_sentiment", "social_media", "volume_sentiment"
]
for indicator in indicators:
if indicator not in valid_indicators:
raise ValueError(f"Invalid indicator: {indicator}")
valid_periods = ["7d", "30d", "90d"]
if period not in valid_periods:
raise ValueError(f"Invalid period: {period}")
def _generate_cache_key(self, market: str, indicators: List[str], period: str,
include_fgi: bool, include_hist: bool, include_buzz: bool) -> str:
"""์บ์ ํค ์์ฑ"""
indicators_str = "_".join(sorted(indicators))
return f"sentiment:{market}:{indicators_str}:{period}:{include_fgi}:{include_hist}:{include_buzz}"
def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
"""๋ฐ์ดํฐ ์ ์ ๋ ํ์ธ"""
if "timestamp" not in data:
return False
try:
timestamp = datetime.fromisoformat(data["timestamp"])
return datetime.now() - timestamp < timedelta(minutes=5)
except (ValueError, TypeError):
return False
    async def _fetch_sentiment_data(self, market: str, indicators: List[str], period: str,
                                    include_fgi: bool, include_hist: bool, include_buzz: bool) -> Dict[str, Any]:
        """Fetch and analyze each requested sentiment indicator.

        Each indicator is fetched/analyzed independently so one failure does
        not abort the rest; the per-indicator score map then drives the
        overall verdict and the optional FGI / historical sections.
        Raises DatabaseConnectionError when DB access itself fails.
        """
        try:
            days = self._get_period_days(period)
            # Response skeleton; overall sentiment defaults to neutral.
            result = {
                "timestamp": datetime.now().isoformat(),
                "market": market,
                "period": period,
                "sentiment_indicators": {},
                "overall_sentiment": "์ค๋ฆฝ"
            }
            # Per-indicator 0-100 scores, keyed by indicator name.
            # NOTE(review): when an analyzer returns an {"error": ...} dict,
            # .get("sentiment_score", 50) below still records a neutral 50,
            # so a failed analysis counts as "successful" — confirm intent.
            sentiment_scores = {}
            for indicator in indicators:
                try:
                    if indicator == "put_call_ratio":
                        pcr_data = await self._fetch_put_call_data(market, days)
                        if pcr_data:
                            pcr_analysis = self._analyze_put_call_ratio(pcr_data)
                            result["sentiment_indicators"]["put_call_ratio"] = pcr_analysis
                            sentiment_scores["put_call_ratio"] = pcr_analysis.get("sentiment_score", 50)
                    elif indicator == "vkospi":
                        vkospi_data = await self._fetch_vkospi_data(days)
                        if vkospi_data:
                            vkospi_analysis = self._analyze_vkospi(vkospi_data)
                            result["sentiment_indicators"]["vkospi"] = vkospi_analysis
                            sentiment_scores["vkospi"] = vkospi_analysis.get("sentiment_score", 50)
                    elif indicator == "market_breadth":
                        breadth_data = await self._fetch_market_breadth_data(market, days)
                        if breadth_data:
                            breadth_analysis = self._analyze_market_breadth_sentiment(breadth_data)
                            result["sentiment_indicators"]["market_breadth"] = breadth_analysis
                            sentiment_scores["market_breadth"] = breadth_analysis.get("sentiment_score", 50)
                    elif indicator == "news_sentiment":
                        news_data = await self._fetch_news_sentiment_data(market, days)
                        if news_data:
                            news_analysis = self._analyze_news_sentiment(news_data, include_buzz)
                            result["sentiment_indicators"]["news_sentiment"] = news_analysis
                            sentiment_scores["news_sentiment"] = news_analysis.get("sentiment_score", 50)
                    elif indicator == "social_media":
                        social_data = await self._fetch_social_media_data(market, days)
                        if social_data:
                            social_analysis = self._analyze_social_media_sentiment(social_data)
                            result["sentiment_indicators"]["social_media"] = social_analysis
                            sentiment_scores["social_media"] = social_analysis.get("sentiment_score", 50)
                    elif indicator == "volume_sentiment":
                        volume_data = await self._fetch_volume_sentiment_data(market, days)
                        if volume_data:
                            volume_analysis = self._analyze_volume_sentiment(volume_data)
                            result["sentiment_indicators"]["volume_sentiment"] = volume_analysis
                            sentiment_scores["volume_sentiment"] = volume_analysis.get("sentiment_score", 50)
                except Exception as e:
                    self.logger.warning(f"Failed to analyze {indicator}: {e}")
                    result["sentiment_indicators"][indicator] = {
                        "error": f"๋ถ์ ์คํจ: {str(e)}"
                    }
                    # Connectivity errors are fatal: re-raise to the outer handler.
                    if isinstance(e, DatabaseConnectionError):
                        raise
            # No indicator produced a score at all -> warn and return early.
            if len(sentiment_scores) == 0:
                result["warning"] = "์ถฉ๋ถํ ์ฌ๋ฆฌ ์งํ ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"
                return result
            # Warn when fewer than half of the requested indicators succeeded.
            total_indicators = len(indicators)
            successful_indicators = len(sentiment_scores)
            if successful_indicators < total_indicators * 0.5:
                result["warning"] = f"์์ฒญํ {total_indicators}๊ฐ ์งํ ์ค {successful_indicators}๊ฐ๋ง ๋ถ์ ๊ฐ๋ฅ"
            # Blend the per-indicator scores into an overall verdict.
            overall_sentiment_data = self._aggregate_sentiment_indicators(sentiment_scores)
            result["overall_sentiment"] = overall_sentiment_data["sentiment"]
            result["sentiment_score"] = overall_sentiment_data["score"]
            result["confidence"] = overall_sentiment_data["confidence"]
            # Optional Fear & Greed Index section.
            if include_fgi:
                fgi_data = self._calculate_fear_greed_index_from_scores(sentiment_scores)
                result["fear_greed_index"] = fgi_data
            # Optional historical-comparison section.
            if include_hist:
                historical_data = await self._perform_historical_comparison(market, indicators, sentiment_scores)
                result["historical_comparison"] = historical_data
            return result
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            # Pass DatabaseConnectionError through unchanged; wrap anything else.
            if isinstance(e, DatabaseConnectionError):
                raise
            raise DatabaseConnectionError(f"Failed to fetch sentiment data: {e}")
def _get_period_days(self, period: str) -> int:
"""๊ธฐ๊ฐ์ ์ผ์๋ก ๋ณํ"""
period_map = {
"7d": 7,
"30d": 30,
"90d": 90
}
return period_map.get(period, 30)
    async def _fetch_put_call_data(self, market: str, days: int) -> List[Dict[str, Any]]:
        """Fetch Put/Call ratio rows for the last *days* days, newest first.

        Filters by market unless market == "ALL".
        NOTE(review): the %s placeholder sits inside the INTERVAL quotes
        ('%s days'); whether the driver substitutes a placeholder there is
        driver-dependent — confirm against db_manager's parameter style.
        """
        query = """
        SELECT date, put_volume, call_volume, put_call_ratio, put_oi, call_oi
        FROM put_call_data
        WHERE date >= CURRENT_DATE - INTERVAL '%s days'
        """
        params = [days]
        if market != "ALL":
            query += " AND market = %s"
            params.append(market)
        query += " ORDER BY date DESC"
        return await self.db_manager.fetch_all(query, *params)
    async def _fetch_vkospi_data(self, days: int) -> List[Dict[str, Any]]:
        """Fetch VKOSPI rows for the last *days* days, newest first.

        NOTE(review): %s inside the INTERVAL quotes is driver-dependent —
        confirm against db_manager's parameter style.
        """
        query = """
        SELECT date, vkospi_value, change, change_rate, high, low
        FROM vkospi_data
        WHERE date >= CURRENT_DATE - INTERVAL '%s days'
        ORDER BY date DESC
        """
        return await self.db_manager.fetch_all(query, days)
    async def _fetch_market_breadth_data(self, market: str, days: int) -> List[Dict[str, Any]]:
        """Fetch market-breadth sentiment rows, newest first.

        Filters by market unless market == "ALL".
        NOTE(review): %s inside the INTERVAL quotes is driver-dependent.
        """
        query = """
        SELECT date, kospi_close, kospi_change_rate, volume_ratio,
               advancing_issues, declining_issues, new_highs, new_lows
        FROM market_sentiment_data
        WHERE date >= CURRENT_DATE - INTERVAL '%s days'
        """
        params = [days]
        if market != "ALL":
            query += " AND market = %s"
            params.append(market)
        query += " ORDER BY date DESC"
        return await self.db_manager.fetch_all(query, *params)
    async def _fetch_news_sentiment_data(self, market: str, days: int) -> List[Dict[str, Any]]:
        """Fetch daily news-sentiment rows, newest first.

        NOTE(review): the *market* parameter is accepted but never used in
        the query — unlike the put/call and breadth fetchers. Confirm
        whether news_sentiment_data should be filtered by market.
        """
        query = """
        SELECT date, positive_count, negative_count, neutral_count,
               sentiment_score, buzz_intensity
        FROM news_sentiment_data
        WHERE date >= CURRENT_DATE - INTERVAL '%s days'
        ORDER BY date DESC
        """
        return await self.db_manager.fetch_all(query, days)
    async def _fetch_social_media_data(self, market: str, days: int) -> List[Dict[str, Any]]:
        """Fetch social-media sentiment rows, newest first.

        NOTE(review): *market* is accepted but never used in the query —
        confirm whether social_media_sentiment should be market-filtered.
        """
        query = """
        SELECT date, platform, positive_mentions, negative_mentions, neutral_mentions,
               sentiment_score, engagement_rate, trending_keywords
        FROM social_media_sentiment
        WHERE date >= CURRENT_DATE - INTERVAL '%s days'
        ORDER BY date DESC
        """
        return await self.db_manager.fetch_all(query, days)
    async def _fetch_volume_sentiment_data(self, market: str, days: int) -> List[Dict[str, Any]]:
        """Fetch volume-sentiment rows, newest first.

        Filters by market unless market == "ALL".
        NOTE(review): %s inside the INTERVAL quotes is driver-dependent.
        """
        query = """
        SELECT date, total_volume, up_volume, down_volume,
               volume_ratio, price_volume_trend
        FROM volume_sentiment_data
        WHERE date >= CURRENT_DATE - INTERVAL '%s days'
        """
        params = [days]
        if market != "ALL":
            query += " AND market = %s"
            params.append(market)
        query += " ORDER BY date DESC"
        return await self.db_manager.fetch_all(query, *params)
def _analyze_put_call_ratio(self, pcr_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Put/Call Ratio ๋ถ์"""
if not pcr_data:
return {"error": "Put/Call Ratio ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"}
# ์ต์ ๋ฐ์ดํฐ ์๊ตฌ์ฌํญ ์ฒดํฌ
if len(pcr_data) < 5:
return {"error": "Insufficient data for Put/Call Ratio trend analysis (minimum 5 days required)"}
latest = pcr_data[0]
current_ratio = latest.get("put_call_ratio", 0.0)
# ํด์
interpretation = self._interpret_put_call_ratio(current_ratio)
# ํธ๋ ๋ ๋ถ์
if len(pcr_data) >= 5:
recent_ratios = [item.get("put_call_ratio", 0) for item in pcr_data[:5]]
trend = self._analyze_sentiment_trend(recent_ratios)
else:
trend = {"direction": "๋ถ๋ช
", "strength": "๋ถ๋ช
"}
# ์ฌ๋ฆฌ ์ ์ ๊ณ์ฐ (0-100, 50์ด ์ค๋ฆฝ)
sentiment_score = self._pcr_to_sentiment_score(current_ratio)
return {
"current_ratio": round(current_ratio, 3),
"interpretation": interpretation,
"trend": trend,
"sentiment_score": sentiment_score,
"analysis_date": latest.get("date").isoformat() if latest.get("date") else None
}
def _analyze_vkospi(self, vkospi_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""VKOSPI ๋ถ์"""
if not vkospi_data:
return {"error": "VKOSPI ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"}
latest = vkospi_data[0]
current_value = latest.get("vkospi_value", 0.0)
# ๋ณ๋์ฑ ์ฒด์ ๋ถ๋ฅ
regime = self._classify_volatility_regime(current_value)
# ํธ๋ ๋ ๋ถ์
if len(vkospi_data) >= 5:
recent_values = [item.get("vkospi_value", 0) for item in vkospi_data[:5]]
trend = self._analyze_sentiment_trend(recent_values, reverse=True) # VKOSPI๋ ๋์์๋ก ๋ถ์ ์
else:
trend = {"direction": "๋ถ๋ช
", "strength": "๋ถ๋ช
"}
# ์ฌ๋ฆฌ ์ ์ ๊ณ์ฐ (VKOSPI๋ ์ญ๋ฐฉํฅ)
sentiment_score = self._vkospi_to_sentiment_score(current_value)
return {
"current_value": round(current_value, 2),
"volatility_regime": regime,
"trend_analysis": trend,
"sentiment_score": sentiment_score,
"analysis_date": latest.get("date").isoformat() if latest.get("date") else None
}
def _analyze_market_breadth_sentiment(self, breadth_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""์์ฅ ํญ ์ฌ๋ฆฌ ๋ถ์"""
if not breadth_data:
return {"error": "์์ฅ ํญ ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"}
latest = breadth_data[0]
advancing = latest.get("advancing_issues", 0)
declining = latest.get("declining_issues", 0)
new_highs = latest.get("new_highs", 0)
new_lows = latest.get("new_lows", 0)
volume_ratio = latest.get("volume_ratio", 1.0)
# ์์นํ๋ฝ ์ฌ๋ฆฌ
ad_ratio = advancing / (declining + 1) # 0์ผ๋ก ๋๋๊ธฐ ๋ฐฉ์ง
ad_sentiment = "๊ธ์ ์ " if ad_ratio > 1.2 else "๋ถ์ ์ " if ad_ratio < 0.8 else "์ค๋ฆฝ"
# ์ ๊ณ ๊ฐ/์ ์ ๊ฐ ๋น์จ
hl_ratio = new_highs / (new_lows + 1)
# ๊ฑฐ๋๋ ์ฌ๋ฆฌ
volume_sentiment = "๊ธ์ ์ " if volume_ratio > 1.1 else "๋ถ์ ์ " if volume_ratio < 0.9 else "์ค๋ฆฝ"
# ์ข
ํฉ ์ฌ๋ฆฌ ์ ์
sentiment_score = self._calculate_breadth_sentiment_score(ad_ratio, hl_ratio, volume_ratio)
return {
"advance_decline_sentiment": ad_sentiment,
"advance_decline_ratio": round(ad_ratio, 2),
"volume_sentiment": volume_sentiment,
"volume_ratio": round(volume_ratio, 2),
"new_highs_lows_ratio": round(hl_ratio, 2),
"sentiment_score": sentiment_score,
"analysis_date": latest.get("date").isoformat() if latest.get("date") else None
}
def _analyze_news_sentiment(self, news_data: List[Dict[str, Any]], include_buzz: bool) -> Dict[str, Any]:
"""๋ด์ค ์ฌ๋ฆฌ ๋ถ์"""
if not news_data:
return {"error": "๋ด์ค ์ฌ๋ฆฌ ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"}
latest = news_data[0]
sentiment_score = latest.get("sentiment_score", 0.0)
positive_count = latest.get("positive_count", 0)
negative_count = latest.get("negative_count", 0)
buzz_intensity = latest.get("buzz_intensity", 0.0)
# ์ฌ๋ฆฌ ํธ๋ ๋ ๋ถ์
if len(news_data) >= 7:
recent_scores = [item.get("sentiment_score", 0) for item in news_data[:7]]
trend = self._analyze_sentiment_trend(recent_scores)
else:
trend = {"direction": "๋ถ๋ช
", "strength": "๋ถ๋ช
"}
# ์ฌ๋ฆฌ ์ ์๋ฅผ 0-100 ์ค์ผ์ผ๋ก ๋ณํ
normalized_score = (sentiment_score + 1) * 50 # -1~1์ 0~100์ผ๋ก
result = {
"sentiment_score": round(normalized_score, 1),
"sentiment_trend": trend,
"positive_ratio": round(positive_count / (positive_count + negative_count + 1), 2),
"analysis_date": latest.get("date").isoformat() if latest.get("date") else None
}
# ๋ฒ์ฆ ๋ถ์ ํฌํจ
if include_buzz:
buzz_level = "๋์" if buzz_intensity > 0.7 else "๋ณดํต" if buzz_intensity > 0.3 else "๋ฎ์"
result["buzz_analysis"] = {
"intensity_level": buzz_level,
"buzz_score": round(buzz_intensity * 100, 1),
"trending_topics": ["์ฆ์", "ํฌ์", "์ฃผ์"] # ์ค์ ๋ก๋ DB์์ ์กฐํ
}
return result
def _analyze_social_media_sentiment(self, social_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""์์
๋ฏธ๋์ด ์ฌ๋ฆฌ ๋ถ์"""
if not social_data:
return {"error": "์์
๋ฏธ๋์ด ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"}
latest = social_data[0]
sentiment_score = latest.get("sentiment_score", 0.0)
positive_mentions = latest.get("positive_mentions", 0)
negative_mentions = latest.get("negative_mentions", 0)
engagement_rate = latest.get("engagement_rate", 0.0)
# ์ฌ๋ฆฌ ํธ๋ ๋
if len(social_data) >= 7:
recent_scores = [item.get("sentiment_score", 0) for item in social_data[:7]]
trend = self._analyze_sentiment_trend(recent_scores)
else:
trend = {"direction": "๋ถ๋ช
", "strength": "๋ถ๋ช
"}
# ์ฐธ์ฌ๋ ํธ๋ ๋
if len(social_data) >= 7:
recent_engagement = [item.get("engagement_rate", 0) for item in social_data[:7]]
avg_engagement = statistics.mean(recent_engagement)
engagement_trend = "์ฆ๊ฐ" if avg_engagement > recent_engagement[-1] else "๊ฐ์"
else:
engagement_trend = "๋ถ๋ช
"
# ์ฌ๋ฆฌ ์ ์ ์ ๊ทํ
normalized_score = (sentiment_score + 1) * 50
return {
"sentiment_score": round(normalized_score, 1),
"engagement_trend": engagement_trend,
"positive_ratio": round(positive_mentions / (positive_mentions + negative_mentions + 1), 2),
"trending_topics": latest.get("trending_keywords", []),
"analysis_date": latest.get("date").isoformat() if latest.get("date") else None
}
def _analyze_volume_sentiment(self, volume_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""๊ฑฐ๋๋ ์ฌ๋ฆฌ ๋ถ์"""
if not volume_data:
return {"error": "๊ฑฐ๋๋ ์ฌ๋ฆฌ ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค"}
latest = volume_data[0]
up_volume = latest.get("up_volume", 0)
down_volume = latest.get("down_volume", 0)
volume_ratio = latest.get("volume_ratio", 1.0)
# ์์น/ํ๋ฝ ๊ฑฐ๋๋ ๋น์จ
updown_ratio = up_volume / (down_volume + 1)
# ๊ฑฐ๋๋ ์ฌ๋ฆฌ
volume_sentiment = "๊ธ์ ์ " if updown_ratio > 1.2 else "๋ถ์ ์ " if updown_ratio < 0.8 else "์ค๋ฆฝ"
# ์ฌ๋ฆฌ ์ ์ ๊ณ์ฐ
sentiment_score = 50 + (updown_ratio - 1) * 25 # 1์ ์ค์ฌ์ผ๋ก ์ค์ผ์ผ๋ง
sentiment_score = max(0, min(100, sentiment_score)) # 0-100 ๋ฒ์ ์ ํ
return {
"volume_sentiment": volume_sentiment,
"updown_volume_ratio": round(updown_ratio, 2),
"total_volume_ratio": round(volume_ratio, 2),
"sentiment_score": round(sentiment_score, 1),
"analysis_date": latest.get("date").isoformat() if latest.get("date") else None
}
def _interpret_put_call_ratio(self, ratio: float) -> str:
"""Put/Call Ratio ํด์"""
if ratio >= 1.1:
return "๊ทน๋ ๊ณตํฌ - ๊ณผ๋ํ ํ์ต์
๋งค์"
elif ratio >= 0.9:
return "๊ณตํฌ - ํ์ต์
์ ํธ"
elif ratio >= 0.7:
return "์ค๋ฆฝ - ๊ท ํ์ ์ต์
๊ฑฐ๋"
elif ratio >= 0.5:
return "ํ์ - ์ฝ์ต์
์ ํธ"
else:
return "๊ทน๋ ํ์ - ๊ณผ๋ํ ์ฝ์ต์
๋งค์"
def _classify_volatility_regime(self, vkospi: float) -> str:
"""VKOSPI ๋ณ๋์ฑ ์ฒด์ ๋ถ๋ฅ"""
if vkospi <= 18:
return "๋ฎ์"
elif vkospi <= 25:
return "๋ณดํต"
elif vkospi <= 35:
return "๋์"
else:
return "๋งค์ฐ ๋์"
def _pcr_to_sentiment_score(self, ratio: float) -> float:
"""Put/Call Ratio๋ฅผ ์ฌ๋ฆฌ ์ ์๋ก ๋ณํ (0-100)"""
# 0.7์ ์ค๋ฆฝ(50)์ผ๋ก ์ค์
if ratio <= 0.3:
return 90 # ๊ทน๋ ํ์
elif ratio <= 0.5:
return 70 # ํ์
elif ratio <= 0.9:
return 50 # ์ค๋ฆฝ
elif ratio <= 1.2:
return 30 # ๊ณตํฌ
else:
return 10 # ๊ทน๋ ๊ณตํฌ
def _vkospi_to_sentiment_score(self, vkospi: float) -> float:
"""VKOSPI๋ฅผ ์ฌ๋ฆฌ ์ ์๋ก ๋ณํ (0-100, ๋์์๋ก ๋ถ์ ์ )"""
if vkospi <= 15:
return 80 # ๋ฎ์ ๋ณ๋์ฑ = ๊ธ์ ์
elif vkospi <= 22:
return 60
elif vkospi <= 30:
return 40
elif vkospi <= 40:
return 20
else:
return 10 # ๋งค์ฐ ๋์ ๋ณ๋์ฑ = ๋ถ์ ์
def _calculate_breadth_sentiment_score(self, ad_ratio: float, hl_ratio: float, volume_ratio: float) -> float:
"""์์ฅ ํญ ๊ธฐ๋ฐ ์ฌ๋ฆฌ ์ ์ ๊ณ์ฐ"""
# ๊ฐ ์งํ๋ฅผ 0-100 ์ ์๋ก ๋ณํ
ad_score = 50 + (ad_ratio - 1) * 25 # 1์ ์ค์ฌ์ผ๋ก
hl_score = 50 + (hl_ratio - 1) * 20
vol_score = 50 + (volume_ratio - 1) * 30
# ๊ฐ์ค ํ๊ท (์์นํ๋ฝ ๋น์จ์ ๋ ๋์ ๊ฐ์ค์น)
final_score = (ad_score * 0.5 + hl_score * 0.3 + vol_score * 0.2)
return max(0, min(100, round(final_score, 1)))
def _analyze_sentiment_trend(self, values: List[float], reverse: bool = False) -> Dict[str, Any]:
"""์ฌ๋ฆฌ ํธ๋ ๋ ๋ถ์"""
if len(values) < 3:
return {"direction": "๋ถ๋ช
", "strength": "๋ถ๋ช
"}
# ์ ํ ํ๊ท ๊ธฐ์ธ๊ธฐ ๊ณ์ฐ (๊ฐ๋จํ ๋ฐฉ๋ฒ)
x = list(range(len(values)))
if reverse:
values = [-v for v in values] # VKOSPI ๊ฐ์ ์ญ์งํ
# ์ต์์ ๊ณฑ๋ฒ์ผ๋ก ๊ธฐ์ธ๊ธฐ ๊ณ์ฐ
n = len(values)
sum_x = sum(x)
sum_y = sum(values)
sum_xy = sum(x[i] * values[i] for i in range(n))
sum_x2 = sum(xi**2 for xi in x)
if n * sum_x2 - sum_x**2 == 0:
slope = 0
else:
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x**2)
# ๋ฐฉํฅ ๊ฒฐ์ (๋ฐ์ดํฐ ๋ฒ์์ ๋ง๋ ์๊ณ๊ฐ)
if abs(slope) < 0.5:
direction = "๋ณดํฉ"
strength = "์ฝํจ"
elif slope > 0:
direction = "์์น"
strength = "๊ฐํจ" if slope > 2.0 else "๋ณดํต"
else:
direction = "ํ๋ฝ"
strength = "๊ฐํจ" if slope < -2.0 else "๋ณดํต"
return {
"direction": direction,
"strength": strength,
"slope": round(slope, 3)
}
def _aggregate_sentiment_indicators(self, sentiment_scores: Dict[str, float]) -> Dict[str, Any]:
"""์ฌ๋ฆฌ ์งํ ์ข
ํฉ"""
if not sentiment_scores:
return {"score": 50, "sentiment": "์ค๋ฆฝ", "confidence": 0}
# ๊ฐ์ค์น ์ค์
weights = {
"put_call_ratio": 0.25,
"vkospi": 0.20,
"market_breadth": 0.25,
"news_sentiment": 0.15,
"social_media": 0.10,
"volume_sentiment": 0.05
}
# ๊ฐ์ค ํ๊ท ๊ณ์ฐ
total_score = 0
total_weight = 0
for indicator, score in sentiment_scores.items():
weight = weights.get(indicator, 0.1)
total_score += score * weight
total_weight += weight
final_score = total_score / total_weight if total_weight > 0 else 50
# ์ฌ๋ฆฌ ์นดํ
๊ณ ๋ฆฌ
if final_score <= 20:
sentiment = "๋งค์ฐ ๋ถ์ ์ "
elif final_score <= 40:
sentiment = "๋ถ์ ์ "
elif final_score <= 60:
sentiment = "์ค๋ฆฝ"
elif final_score <= 80:
sentiment = "๊ธ์ ์ "
else:
sentiment = "๋งค์ฐ ๊ธ์ ์ "
# ์ ๋ขฐ๋ (์งํ ์์ ํธ์ฐจ ๊ธฐ๋ฐ)
confidence = min(len(sentiment_scores) / 4, 1.0) # 4๊ฐ ์งํ๋ฅผ ์ต๋๋ก
if len(sentiment_scores) > 1:
score_variance = statistics.variance(sentiment_scores.values())
confidence *= max(0.5, 1 - score_variance / 1000) # ํธ์ฐจ๊ฐ ํด์๋ก ์ ๋ขฐ๋ ๋ฎ์
return {
"score": round(final_score, 1),
"sentiment": sentiment,
"confidence": round(confidence, 2)
}
def _calculate_fear_greed_index(self, indicators: Dict[str, Dict[str, float]]) -> float:
"""Fear & Greed Index ๊ณ์ฐ"""
total_score = 0
total_weight = 0
for indicator, data in indicators.items():
score = data.get("score", 50)
weight = data.get("weight", 1.0)
total_score += score * weight
total_weight += weight
return round(total_score / total_weight if total_weight > 0 else 50, 1)
def _calculate_fear_greed_index_from_scores(self, sentiment_scores: Dict[str, float]) -> Dict[str, Any]:
"""์ฌ๋ฆฌ ์ ์๋ก๋ถํฐ Fear & Greed Index ๊ณ์ฐ"""
# ๊ฐ ์งํ๋ฅผ Fear & Greed Index ํ์์ผ๋ก ๋ณํ
fgi_indicators = {}
for indicator, score in sentiment_scores.items():
fgi_indicators[indicator] = {
"score": score,
"weight": 0.2 # ๋๋ฑํ ๊ฐ์ค์น
}
fgi_score = self._calculate_fear_greed_index(fgi_indicators)
category = self._get_fear_greed_category(fgi_score)
return {
"score": fgi_score,
"category": category,
"components": fgi_indicators
}
def _get_fear_greed_category(self, score: float) -> str:
"""Fear & Greed ์นดํ
๊ณ ๋ฆฌ ๋ถ๋ฅ"""
if score <= 25:
return "๊ทน๋ ๊ณตํฌ"
elif score <= 45:
return "๊ณตํฌ"
elif score <= 55:
return "์ค๋ฆฝ"
elif score <= 75:
return "ํ์"
else:
return "๊ทน๋ ํ์"
    async def _perform_historical_comparison(self, market: str, indicators: List[str],
                                             current_scores: Dict[str, float]) -> Dict[str, Any]:
        """Compare the current average sentiment score to 1-week/1-month ago.

        NOTE(review): the historical baselines and the percentile rank are
        random PLACEHOLDERS (see random.uniform/randint below) — production
        must query real historical data instead. Also note the division by
        week_ago_avg / month_ago_avg can fail if a baseline is 0.
        """
        try:
            comparison = {}
            # Current average across the available indicator scores.
            current_avg = statistics.mean(current_scores.values()) if current_scores else 50
            # Placeholder historical baselines (should come from the DB).
            week_ago_avg = current_avg + random.uniform(-10, 10)
            month_ago_avg = current_avg + random.uniform(-15, 15)
            comparison["vs_1week_ago"] = {
                "change": round(current_avg - week_ago_avg, 1),
                "percentage_change": round((current_avg - week_ago_avg) / week_ago_avg * 100, 1)
            }
            comparison["vs_1month_ago"] = {
                "change": round(current_avg - month_ago_avg, 1),
                "percentage_change": round((current_avg - month_ago_avg) / month_ago_avg * 100, 1)
            }
            # Percentile rank vs the last year — placeholder value.
            comparison["percentile_rank"] = random.randint(10, 90)
            return comparison
        except Exception as e:
            # Best-effort feature: degrade to an error payload, never raise.
            self.logger.warning(f"Historical comparison failed: {e}")
            return {"error": "๊ณผ๊ฑฐ ๋๋น ๋ถ์ ์ค ์ค๋ฅ ๋ฐ์"}