# sector_tools.py • 20.3 kB
"""์นํฐ ๋ถ์ ๋๊ตฌ"""
import json
import logging
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class SectorAnalysisTool(BaseTool):
    """Tool that serves sector-level (industry-level) stock market analysis.

    For the current date it reads rows from the ``sector_performance`` table,
    formats them, adds a market-wide summary and, on request, attaches the
    largest stocks per sector, a relative-strength comparison and a one-week
    rotation (momentum) analysis. Results are cached through ``cache_manager``.
    """

    # NOTE(review): the Korean user-facing strings in this class were restored
    # from a mis-decoded (mojibake) copy of the file in which some bytes were
    # lost; confirm the exact wording against the upstream source.

    # Single source of truth for the input schema and for validation.
    _VALID_MARKETS = ("KOSPI", "KOSDAQ", "ALL")
    _VALID_SORT_KEYS = ("market_cap", "change_rate", "volume", "strength")

    # Request sort key -> column used in ORDER BY. Only these literals are ever
    # interpolated into SQL text.
    _SORT_COLUMNS = {
        "market_cap": "market_cap",
        "change_rate": "change_rate",
        "volume": "total_volume",
        "strength": "change_rate",  # provisional: strength is re-sorted in Python
    }

    # Major stocks are looked up only for this many leading sectors.
    _MAJOR_STOCK_SECTOR_LIMIT = 5

    def __init__(self, db_manager, cache_manager):
        """Store the collaborators via the base class and set cache defaults.

        Args:
            db_manager: Object exposing the async ``fetch_all`` / ``fetch_many``
                calls used by this tool.
            cache_manager: Object exposing the async ``get`` / ``set`` calls
                used by this tool.
        """
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # seconds (5 minutes)

    @property
    def name(self) -> str:
        """Identifier under which the tool is registered."""
        return "get_sector_analysis"

    @property
    def description(self) -> str:
        """Human-readable (Korean) description of the tool."""
        return (
            "업종별/섹터별 주식 시장 분석을 제공합니다. "
            "섹터 성과, 시가총액, 주요 종목 등을 분석합니다."
        )

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool definition, including the JSON schema of its input."""
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": list(self._VALID_MARKETS),
                        "default": "ALL",
                        "description": "조회할 시장",
                    },
                    "sector": {
                        "type": "string",
                        "default": "ALL",
                        "description": "조회할 섹터 (예: IT, 금융, 바이오, ALL)",
                    },
                    "sort_by": {
                        "type": "string",
                        "enum": list(self._VALID_SORT_KEYS),
                        "default": "market_cap",
                        "description": "정렬 기준",
                    },
                    "include_stocks": {
                        "type": "boolean",
                        "default": False,
                        "description": "주요 종목 정보 포함 여부",
                    },
                    "include_relative_strength": {
                        "type": "boolean",
                        "default": False,
                        "description": "상대 강도 분석 포함 여부",
                    },
                    "include_rotation_analysis": {
                        "type": "boolean",
                        "default": False,
                        "description": "섹터 로테이션 분석 포함 여부",
                    },
                },
                "required": [],
            },
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Run the sector analysis and return it as one JSON text item.

        Args:
            arguments: Request parameters as described by the input schema.
                Missing keys fall back to the schema defaults; ``None`` is
                treated as an empty mapping.

        Returns:
            A single-element list whose ``TextContent`` holds the JSON document.

        Raises:
            ValueError: If ``market`` or ``sort_by`` is not an allowed value.
            DatabaseConnectionError: If fetching the sector data fails.
        """
        arguments = arguments or {}
        try:
            # Extract and validate parameters.
            market = arguments.get("market", "ALL")
            sector = arguments.get("sector", "ALL")
            sort_by = arguments.get("sort_by", "market_cap")
            include_stocks = arguments.get("include_stocks", False)
            include_relative_strength = arguments.get("include_relative_strength", False)
            include_rotation_analysis = arguments.get("include_rotation_analysis", False)
            self._validate_parameters(market, sort_by)

            # Serve from the cache while the cached document is still fresh.
            cache_key = self._generate_cache_key(
                market, sector, sort_by,
                include_stocks, include_relative_strength, include_rotation_analysis,
            )
            cached_data = await self.cache_manager.get(cache_key)
            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                return [self._as_text_content(cached_data)]

            # Cache miss: query the database, then store the fresh document.
            data = await self._fetch_sector_data(
                market, sector, sort_by,
                include_stocks, include_relative_strength, include_rotation_analysis,
            )
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
            self.logger.info(f"Sector analysis data fetched for {market}/{sector}")
            return [self._as_text_content(data)]
        except Exception as e:
            # Boundary handler: log for diagnosis and let the caller decide.
            self.logger.error(f"Error in sector analysis tool: {e}")
            raise

    @staticmethod
    def _as_text_content(payload: Dict[str, Any]) -> TextContent:
        """Serialise *payload* identically for cache hits and cache misses."""
        return TextContent(text=json.dumps(payload, ensure_ascii=False, indent=2))

    @staticmethod
    def _as_float(value: Any, default: float = 0.0) -> float:
        """Return *value* as a float, or *default* when it is ``None``."""
        return default if value is None else float(value)

    def _validate_parameters(self, market: str, sort_by: str):
        """Reject values outside the allowed markets and sort keys.

        Raises:
            ValueError: If ``market`` or ``sort_by`` is not allowed.
        """
        if market not in self._VALID_MARKETS:
            raise ValueError(f"Invalid market: {market}")
        if sort_by not in self._VALID_SORT_KEYS:
            raise ValueError(f"Invalid sort_by: {sort_by}")

    def _generate_cache_key(self, market: str, sector: str, sort_by: str, include_stocks: bool, include_rs: bool, include_rotation: bool) -> str:
        """Build the cache key; every request parameter is part of the key."""
        return f"sector_analysis:{market}:{sector}:{sort_by}:{include_stocks}:{include_rs}:{include_rotation}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True when the cached document is younger than ``cache_ttl``.

        A payload without a parseable ISO-8601 ``"timestamp"`` (or one that is
        not a mapping at all) counts as stale.
        """
        try:
            timestamp = datetime.fromisoformat(data["timestamp"])
            # Same window as the TTL handed to the cache on write.
            return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)
        except (KeyError, ValueError, TypeError):
            return False

    async def _fetch_sector_data(self, market: str, sector: str, sort_by: str, include_stocks: bool, include_rs: bool, include_rotation: bool) -> Dict[str, Any]:
        """Query today's sector rows and assemble the response document.

        Args:
            market: "KOSPI", "KOSDAQ" or "ALL" (no market filter).
            sector: Sector name or code, or "ALL" (no sector filter).
            sort_by: One of the allowed sort keys.
            include_stocks: Attach major stocks to the leading sectors.
            include_rs: Add the relative-strength section.
            include_rotation: Add the rotation-analysis section.

        Returns:
            The response dictionary (timestamp, echoed parameters, formatted
            sectors, market summary and any requested optional sections).

        Raises:
            DatabaseConnectionError: On any failure; the original exception is
                chained as the cause.
        """
        try:
            query = """
                SELECT sector_name, sector_code, market_cap, total_volume, change_rate,
                       advancing_stocks, declining_stocks, unchanged_stocks, total_stocks,
                       top_performer, top_performer_change, worst_performer, worst_performer_change,
                       date, timestamp
                FROM sector_performance
                WHERE DATE(date) = CURRENT_DATE
            """
            params = []

            # Market filter.
            if market != "ALL":
                query += " AND market = %s"
                params.append(market)

            # Sector filter: the argument may be either a name or a code.
            if sector != "ALL":
                query += " AND (sector_name = %s OR sector_code = %s)"
                params.extend([sector, sector])

            # The column comes from a fixed allow-list, never from user text.
            sort_column = self._get_sort_column(sort_by)
            query += f" ORDER BY {sort_column} DESC"

            sector_data = await self.db_manager.fetch_all(query, *params)

            # Re-sort in Python so that "strength" uses the composite score.
            sorted_data = self._sort_sectors(sector_data, sort_by)

            result = {
                "timestamp": datetime.now().isoformat(),
                "market": market,
                "sector": sector,
                "sort_by": sort_by,
                "sector_analysis": [self._format_sector_data(row) for row in sorted_data],
                "market_summary": self._calculate_market_summary(sorted_data),
            }

            if not sector_data:
                # Nothing matched: say so explicitly and skip optional sections.
                result["message"] = "요청한 섹터의 데이터가 없습니다"
                result["sector_analysis"] = []
                return result

            if include_stocks:
                await self._add_major_stocks_info(result, sorted_data)

            if include_rs:
                result["relative_strength"] = self._analyze_relative_strength(sorted_data)

            if include_rotation:
                result["rotation_analysis"] = await self._analyze_sector_rotation(sorted_data, market)

            return result
        except DatabaseConnectionError as e:
            self.logger.error(f"Database query failed: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            # Chain the cause so the original traceback is not lost.
            raise DatabaseConnectionError(f"Failed to fetch sector data: {e}") from e

    def _get_sort_column(self, sort_by: str) -> str:
        """Map a sort key to its ORDER BY column (``market_cap`` if unknown)."""
        return self._SORT_COLUMNS.get(sort_by, "market_cap")

    def _sort_sectors(self, data: List[Dict[str, Any]], sort_by: str) -> List[Dict[str, Any]]:
        """Return the sector rows in descending order of the chosen criterion.

        Empty input and unknown sort keys are returned unchanged. ``None``
        column values sort as 0 instead of raising on comparison.
        """
        if not data:
            return data

        column_keys = {
            "market_cap": "market_cap",
            "change_rate": "change_rate",
            "volume": "total_volume",
        }
        if sort_by in column_keys:
            column = column_keys[sort_by]
            return sorted(data, key=lambda row: row.get(column) or 0, reverse=True)
        if sort_by == "strength":
            # Composite of advancing-stock ratio and change rate.
            return sorted(data, key=self._calculate_sector_strength_score, reverse=True)
        return data

    def _calculate_sector_strength_score(self, sector_data: Dict[str, Any]) -> float:
        """Return the numeric strength score used for sorting.

        The score is 60% advancing-stock ratio plus 40% of ``change_rate / 10``.
        Values are converted to float first so ``None`` and ``Decimal`` inputs
        do not break the arithmetic.
        """
        advancing = self._as_float(sector_data.get("advancing_stocks"))
        total = self._as_float(sector_data.get("total_stocks", 1))
        change_rate = self._as_float(sector_data.get("change_rate"))
        advance_ratio = advancing / total if total > 0 else 0.0
        return advance_ratio * 0.6 + (change_rate / 10.0) * 0.4

    def _format_sector_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one database row into its JSON-serialisable representation.

        ``advance_decline_ratio`` is ``None`` when no stock declined: an
        infinite float would be serialised as ``Infinity``, which is not valid
        JSON. ``None`` numeric columns are reported as 0.
        """
        advancing = int(data.get("advancing_stocks") or 0)
        declining = int(data.get("declining_stocks") or 0)
        unchanged = int(data.get("unchanged_stocks") or 0)
        change_rate = self._as_float(data.get("change_rate"))
        market_cap = data.get("market_cap") or 0
        date_value = data.get("date")
        timestamp_value = data.get("timestamp")

        return {
            "sector_name": data.get("sector_name"),
            "sector_code": data.get("sector_code"),
            "market_cap": int(market_cap),
            "market_cap_formatted": self._format_market_cap(market_cap),
            "total_volume": int(data.get("total_volume") or 0),
            "change_rate": change_rate,
            "advancing_stocks": advancing,
            "declining_stocks": declining,
            "unchanged_stocks": unchanged,
            "total_stocks": int(data.get("total_stocks") or 0),
            "advance_decline_ratio": round(advancing / declining, 2) if declining > 0 else None,
            "sector_strength": self._calculate_sector_strength(advancing, declining, unchanged, change_rate),
            "top_performer": data.get("top_performer"),
            "top_performer_change": self._as_float(data.get("top_performer_change")),
            "worst_performer": data.get("worst_performer"),
            "worst_performer_change": self._as_float(data.get("worst_performer_change")),
            "date": date_value.isoformat() if date_value else None,
            "timestamp": timestamp_value.isoformat() if timestamp_value else None,
        }

    def _calculate_sector_strength(self, advancing: int, declining: int, unchanged: int, change_rate: float) -> str:
        """Classify a sector into one of the Korean strength labels.

        The label depends on the share of advancing stocks and on the sector's
        change rate. Returns "알 수 없음" when the three counts sum to zero.
        """
        total = advancing + declining + unchanged
        if total == 0:
            return "알 수 없음"

        advance_ratio = advancing / total
        # (minimum advancing ratio, minimum change rate, label), strongest first.
        tiers = (
            (0.59, 4.0, "매우 강함"),
            (0.5, 1.0, "강함"),
            (0.3, -1.0, "보통"),
            (0.2, -3.0, "약함"),
        )
        for min_ratio, min_change, label in tiers:
            if advance_ratio >= min_ratio and change_rate >= min_change:
                return label
        return "매우 약함"

    def _format_market_cap(self, market_cap: float) -> str:
        """Format a market capitalisation in won using Korean units.

        * 1조 and above: one decimal, e.g. ``"1.5조"``.
        * 1000억 up to 1조: rounded to the nearest 100억, e.g. ``"1500억"``.
        * 1억 up to 1000억: whole 억, e.g. ``"25억"``.
        * below 1억: plain won with thousands separators.

        ``None`` is treated as 0.
        """
        market_cap = market_cap or 0
        if market_cap >= 1_000_000_000_000:
            return f"{market_cap / 1_000_000_000_000:.1f}조"
        if market_cap >= 100_000_000_000:
            # Divide by 100억 (1e10) so the appended "00" gives the right magnitude.
            return f"{market_cap / 10_000_000_000:.0f}00억"
        if market_cap >= 100_000_000:
            return f"{market_cap / 100_000_000:.0f}억"
        return f"{int(market_cap):,}원"

    def _calculate_market_summary(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate the sector rows into a market-wide summary.

        Returns an empty dict for empty input. Sentiment is "상승" when the
        average change rate is above 0.5, "하락" below -0.5, otherwise "보합".
        """
        if not sector_data:
            return {}

        change_rates = [self._as_float(row.get("change_rate")) for row in sector_data]
        total_market_cap = sum(row.get("market_cap") or 0 for row in sector_data)
        positive_sectors = sum(1 for rate in change_rates if rate > 0)
        negative_sectors = sum(1 for rate in change_rates if rate < 0)
        avg_change_rate = statistics.mean(change_rates)

        if avg_change_rate > 0.5:
            sentiment = "상승"
        elif avg_change_rate < -0.5:
            sentiment = "하락"
        else:
            sentiment = "보합"

        return {
            "sector_count": len(sector_data),
            "total_market_cap": int(total_market_cap),
            "total_market_cap_formatted": self._format_market_cap(total_market_cap),
            "positive_sectors": positive_sectors,
            "negative_sectors": negative_sectors,
            "neutral_sectors": len(sector_data) - positive_sectors - negative_sectors,
            "avg_change_rate": round(avg_change_rate, 2),
            "market_sentiment": sentiment,
        }

    async def _add_major_stocks_info(self, result: Dict[str, Any], sector_data: List[Dict[str, Any]]):
        """Attach the three largest stocks to each of the leading sectors.

        ``result["sector_analysis"]`` must be in the same order as
        ``sector_data``. Sectors without a ``sector_code`` are skipped, and the
        query results are matched back by remembered index so that a skipped
        sector cannot shift stocks onto the wrong sector. Failures are logged
        and swallowed: this section is best-effort.
        """
        try:
            query = """
                SELECT stock_code, stock_name, current_price, change_rate, volume, market_cap
                FROM stock_daily_data
                WHERE sector_code = %s AND DATE(date) = CURRENT_DATE
                ORDER BY market_cap DESC
                LIMIT 3
            """
            # (index into result["sector_analysis"], sector_code) per queried sector.
            targets = [
                (index, sector.get("sector_code"))
                for index, sector in enumerate(sector_data[:self._MAJOR_STOCK_SECTOR_LIMIT])
                if sector.get("sector_code")
            ]
            if not targets:
                return

            queries = [query] * len(targets)
            params_list = [[sector_code] for _, sector_code in targets]
            stocks_results = await self.db_manager.fetch_many(queries, params_list)

            formatted_sectors = result["sector_analysis"]
            for (index, _), stocks in zip(targets, stocks_results):
                if index >= len(formatted_sectors):
                    continue
                formatted_sectors[index]["major_stocks"] = [
                    {
                        "stock_code": stock.get("stock_code"),
                        "stock_name": stock.get("stock_name"),
                        "current_price": int(stock.get("current_price") or 0),
                        "change_rate": self._as_float(stock.get("change_rate")),
                        "volume": int(stock.get("volume") or 0),
                        "market_cap": int(stock.get("market_cap") or 0),
                    }
                    for stock in stocks
                ]
        except Exception as e:
            self.logger.warning(f"Failed to fetch major stocks info: {e}")

    def _analyze_relative_strength(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compare the best and worst sector by change rate.

        Returns an ``{"error": ...}`` dict when fewer than two sectors are
        given; otherwise the strongest sector, the weakest sector and the
        spread between their change rates.
        """
        if len(sector_data) < 2:
            return {"error": "상대 강도 분석을 위한 충분한 데이터가 없습니다"}

        def describe(row: Dict[str, Any]) -> Dict[str, Any]:
            # max(..., 1) guards the division when total_stocks is 0 or missing.
            total = max(self._as_float(row.get("total_stocks", 1)), 1)
            return {
                "name": row.get("sector_name"),
                "change_rate": self._as_float(row.get("change_rate")),
                "advance_ratio": self._as_float(row.get("advancing_stocks")) / total,
            }

        ranked = sorted(
            sector_data,
            key=lambda row: self._as_float(row.get("change_rate")),
            reverse=True,
        )
        strongest = describe(ranked[0])
        weakest = describe(ranked[-1])

        return {
            "strongest_sector": strongest,
            "weakest_sector": weakest,
            "performance_spread": round(strongest["change_rate"] - weakest["change_rate"], 2),
        }

    async def _analyze_sector_rotation(self, current_data: List[Dict[str, Any]], market: str = "ALL") -> Dict[str, Any]:
        """Compare today's sector change rates with those of seven days ago.

        Args:
            current_data: Today's sector rows.
            market: Market the current rows were filtered by; the historical
                query uses the same filter so like is compared with like.
                "ALL" (the default) applies no filter.

        Returns:
            Up to three momentum leaders (gain above 1.0) and up to three
            laggards (loss below -1.0), or an ``{"error": ...}`` dict when no
            historical rows exist or the analysis fails.
        """
        try:
            # Historical rows from one week ago.
            historical_query = """
                SELECT sector_name, change_rate
                FROM sector_performance
                WHERE DATE(date) = CURRENT_DATE - INTERVAL '7 days'
            """
            params = []
            if market != "ALL":
                historical_query += " AND market = %s"
                params.append(market)

            historical_data = await self.db_manager.fetch_all(historical_query, *params)
            if not historical_data:
                return {"error": "로테이션 분석을 위한 과거 데이터가 없습니다"}

            historical_rates = {
                row.get("sector_name"): self._as_float(row.get("change_rate"))
                for row in historical_data
            }

            momentum_leaders = []
            momentum_laggards = []
            for current in current_data:
                sector_name = current.get("sector_name")
                current_rate = self._as_float(current.get("change_rate"))
                # NOTE(review): a sector absent from the historical rows is
                # compared against 0 — confirm this is the intended behaviour.
                historical_rate = historical_rates.get(sector_name, 0.0)
                momentum = current_rate - historical_rate

                entry = {
                    "sector": sector_name,
                    "momentum": round(momentum, 2),
                    "current_rate": current_rate,
                    "historical_rate": historical_rate,
                }
                if momentum > 1.0:  # improved by more than 1 point
                    momentum_leaders.append(entry)
                elif momentum < -1.0:  # worsened by more than 1 point
                    momentum_laggards.append(entry)

            return {
                "momentum_leaders": sorted(momentum_leaders, key=lambda x: x["momentum"], reverse=True)[:3],
                "momentum_laggards": sorted(momentum_laggards, key=lambda x: x["momentum"])[:3],
                "analysis_period": "1주일",
            }
        except Exception as e:
            self.logger.warning(f"Sector rotation analysis failed: {e}")
            return {"error": f"로테이션 분석 중 오류 발생: {e}"}