"""섹터 분석 도구"""
import json
import logging
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class SectorAnalysisTool(BaseTool):
    """Tool that reports per-sector stock market analysis.

    Reads today's rows from the ``sector_performance`` table, formats them,
    and optionally enriches the result with major stocks per sector, a
    relative-strength comparison, and a one-week rotation analysis. Results
    are cached through ``cache_manager`` for ``cache_ttl`` seconds.
    """

    # Accepted parameter values, shared by the input schema and the validator
    # so the two cannot drift apart.
    _VALID_MARKETS = ("KOSPI", "KOSDAQ", "ALL")
    _VALID_SORT_OPTIONS = ("market_cap", "change_rate", "volume", "strength")

    # sort_by option -> column used in the SQL ORDER BY clause.
    _SORT_COLUMNS = {
        "market_cap": "market_cap",
        "change_rate": "change_rate",
        "volume": "total_volume",
        "strength": "change_rate",  # placeholder; real ordering is done in Python
    }

    def __init__(self, db_manager, cache_manager):
        """Store the collaborators and configure logging and the cache TTL."""
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # seconds (5 minutes)

    @property
    def name(self) -> str:
        """Tool identifier."""
        return "get_sector_analysis"

    @property
    def description(self) -> str:
        """Human-readable tool description."""
        return "업종별/섹터별 주식 시장 분석을 제공합니다. 섹터 성과, 시가총액, 주요 종목 등을 분석합니다."

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool definition, including the JSON input schema."""
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": list(self._VALID_MARKETS),
                        "default": "ALL",
                        "description": "조회할 시장"
                    },
                    "sector": {
                        "type": "string",
                        "default": "ALL",
                        "description": "조회할 섹터 (예: IT, 금융, 바이오, ALL)"
                    },
                    "sort_by": {
                        "type": "string",
                        "enum": list(self._VALID_SORT_OPTIONS),
                        "default": "market_cap",
                        "description": "정렬 기준"
                    },
                    "include_stocks": {
                        "type": "boolean",
                        "default": False,
                        "description": "주요 종목 정보 포함 여부"
                    },
                    "include_relative_strength": {
                        "type": "boolean",
                        "default": False,
                        "description": "상대 강도 분석 포함 여부"
                    },
                    "include_rotation_analysis": {
                        "type": "boolean",
                        "default": False,
                        "description": "섹터 로테이션 분석 포함 여부"
                    }
                },
                "required": []
            }
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Run the sector analysis and return it as a JSON text payload.

        Args:
            arguments: Tool arguments; every key is optional and falls back
                to the default declared in the input schema.

        Returns:
            A single-element list holding the JSON-encoded result.

        Raises:
            ValueError: If ``market`` or ``sort_by`` is not an accepted value.
            DatabaseConnectionError: If fetching the sector data fails.
        """
        try:
            # Extract and validate parameters.
            market = arguments.get("market", "ALL")
            sector = arguments.get("sector", "ALL")
            sort_by = arguments.get("sort_by", "market_cap")
            include_stocks = arguments.get("include_stocks", False)
            include_relative_strength = arguments.get("include_relative_strength", False)
            include_rotation_analysis = arguments.get("include_rotation_analysis", False)
            self._validate_parameters(market, sort_by)

            # Serve from cache when a fresh entry exists.
            cache_key = self._generate_cache_key(
                market, sector, sort_by,
                include_stocks, include_relative_strength, include_rotation_analysis,
            )
            cached_data = await self.cache_manager.get(cache_key)
            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                return self._serialize_result(cached_data)

            # Cache miss: query the database, then store the result.
            data = await self._fetch_sector_data(
                market, sector, sort_by,
                include_stocks, include_relative_strength, include_rotation_analysis,
            )
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)
            self.logger.info(f"Sector analysis data fetched for {market}/{sector}")
            return self._serialize_result(data)
        except Exception as e:
            self.logger.error(f"Error in sector analysis tool: {e}")
            raise

    @staticmethod
    def _serialize_result(data: Dict[str, Any]) -> List[TextContent]:
        """Encode a result dict the same way for cached and fresh responses."""
        return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]

    @staticmethod
    def _row_value(row: Dict[str, Any], key: str, default: Any = 0) -> Any:
        """Return ``row[key]``, or ``default`` when the key is absent or None.

        ``dict.get(key, default)`` still returns None for a key that is
        present with a None value (e.g. a SQL NULL), which would break the
        numeric conversions and comparisons done on these rows.
        """
        value = row.get(key)
        return default if value is None else value

    def _validate_parameters(self, market: str, sort_by: str):
        """Validate ``market`` and ``sort_by``.

        Raises:
            ValueError: If either value is not in its accepted set.
        """
        if market not in self._VALID_MARKETS:
            raise ValueError(f"Invalid market: {market}")
        if sort_by not in self._VALID_SORT_OPTIONS:
            raise ValueError(f"Invalid sort_by: {sort_by}")

    def _generate_cache_key(self, market: str, sector: str, sort_by: str, include_stocks: bool, include_rs: bool, include_rotation: bool) -> str:
        """Build the cache key from every parameter that affects the result."""
        return f"sector_analysis:{market}:{sector}:{sort_by}:{include_stocks}:{include_rs}:{include_rotation}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True when the payload's ``timestamp`` is within the cache TTL.

        A missing or unparseable timestamp counts as stale.
        """
        if "timestamp" not in data:
            return False
        try:
            timestamp = datetime.fromisoformat(data["timestamp"])
        except (ValueError, TypeError):
            return False
        # Use the configured TTL so freshness and cache expiry stay in sync.
        return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)

    async def _fetch_sector_data(self, market: str, sector: str, sort_by: str, include_stocks: bool, include_rs: bool, include_rotation: bool) -> Dict[str, Any]:
        """Query today's sector rows and assemble the analysis result.

        Args:
            market: Market filter, or "ALL" for no filter.
            sector: Sector name or code filter, or "ALL" for no filter.
            sort_by: Sort option (see ``_VALID_SORT_OPTIONS``).
            include_stocks: Attach major stocks to the leading sectors.
            include_rs: Attach the relative-strength comparison.
            include_rotation: Attach the one-week rotation analysis.

        Returns:
            The result dict with ``sector_analysis`` and ``market_summary``,
            plus the optional sections that were requested.

        Raises:
            DatabaseConnectionError: On any failure while fetching or
                assembling the data; the original error is chained as the cause.
        """
        try:
            query = """
                SELECT sector_name, sector_code, market_cap, total_volume, change_rate,
                       advancing_stocks, declining_stocks, unchanged_stocks, total_stocks,
                       top_performer, top_performer_change, worst_performer, worst_performer_change,
                       date, timestamp
                FROM sector_performance
                WHERE DATE(date) = CURRENT_DATE
            """
            params = []

            # Market filter.
            if market != "ALL":
                query += " AND market = %s"
                params.append(market)

            # Sector filter (matches either the name or the code).
            if sector != "ALL":
                query += " AND (sector_name = %s OR sector_code = %s)"
                params.extend([sector, sector])

            # The column comes from a fixed whitelist, so interpolating it is safe.
            sort_column = self._get_sort_column(sort_by)
            query += f" ORDER BY {sort_column} DESC"

            sector_data = await self.db_manager.fetch_all(query, *params) or []

            # Re-sort in Python: "strength" has no SQL column equivalent.
            sorted_data = self._sort_sectors(sector_data, sort_by)

            result = {
                "timestamp": datetime.now().isoformat(),
                "market": market,
                "sector": sector,
                "sort_by": sort_by,
                "sector_analysis": [self._format_sector_data(row) for row in sorted_data],
                "market_summary": self._calculate_market_summary(sorted_data)
            }

            if not sector_data:
                # Nothing to enrich; report the empty result as-is.
                result["message"] = "요청한 섹터의 데이터가 없습니다"
                result["sector_analysis"] = []
                return result

            if include_stocks:
                await self._add_major_stocks_info(result, sorted_data)
            if include_rs:
                result["relative_strength"] = self._analyze_relative_strength(sorted_data)
            if include_rotation:
                result["rotation_analysis"] = await self._analyze_sector_rotation(sorted_data, market)
            return result
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            if isinstance(e, DatabaseConnectionError):
                raise
            raise DatabaseConnectionError(f"Failed to fetch sector data: {e}") from e

    def _get_sort_column(self, sort_by: str) -> str:
        """Map a sort option to its SQL column, defaulting to ``market_cap``."""
        return self._SORT_COLUMNS.get(sort_by, "market_cap")

    def _sort_sectors(self, data: List[Dict[str, Any]], sort_by: str) -> List[Dict[str, Any]]:
        """Return the rows sorted in descending order for the given option.

        Unknown options and empty input are returned unchanged.
        """
        if not data:
            return data
        if sort_by == "strength":
            # Rank by the combined advance-ratio / change-rate score.
            return sorted(data, key=self._calculate_sector_strength_score, reverse=True)

        column = {
            "market_cap": "market_cap",
            "change_rate": "change_rate",
            "volume": "total_volume",
        }.get(sort_by)
        if column is None:
            return data

        def sort_key(row: Dict[str, Any]) -> Any:
            return self._row_value(row, column)

        return sorted(data, key=sort_key, reverse=True)

    def _calculate_sector_strength_score(self, sector_data: Dict[str, Any]) -> float:
        """Score a sector as 60% advancing-stock ratio plus 40% change_rate/10."""
        advancing = self._row_value(sector_data, "advancing_stocks")
        total = self._row_value(sector_data, "total_stocks", 1)
        # float() keeps the arithmetic valid if the driver returns Decimal.
        change_rate = float(self._row_value(sector_data, "change_rate"))
        advance_ratio = float(advancing) / float(total) if total > 0 else 0.0
        return advance_ratio * 0.6 + (change_rate / 10.0) * 0.4

    def _format_sector_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one database row into the JSON-serializable output shape.

        ``advance_decline_ratio`` is None when there are no declining stocks:
        the ratio is undefined, and an infinite float would be serialized as
        ``Infinity``, which is not valid JSON.
        """
        advancing = self._row_value(data, "advancing_stocks")
        declining = self._row_value(data, "declining_stocks")
        unchanged = self._row_value(data, "unchanged_stocks")
        change_rate = float(self._row_value(data, "change_rate"))
        market_cap = self._row_value(data, "market_cap")
        row_date = data.get("date")
        row_timestamp = data.get("timestamp")

        strength = self._calculate_sector_strength(advancing, declining, unchanged, change_rate)
        advance_decline_ratio = round(advancing / declining, 2) if declining > 0 else None

        return {
            "sector_name": data.get("sector_name"),
            "sector_code": data.get("sector_code"),
            "market_cap": int(market_cap),
            "market_cap_formatted": self._format_market_cap(market_cap),
            "total_volume": int(self._row_value(data, "total_volume")),
            "change_rate": change_rate,
            "advancing_stocks": int(advancing),
            "declining_stocks": int(declining),
            "unchanged_stocks": int(unchanged),
            "total_stocks": int(self._row_value(data, "total_stocks")),
            "advance_decline_ratio": advance_decline_ratio,
            "sector_strength": strength,
            "top_performer": data.get("top_performer"),
            "top_performer_change": float(self._row_value(data, "top_performer_change")),
            "worst_performer": data.get("worst_performer"),
            "worst_performer_change": float(self._row_value(data, "worst_performer_change")),
            "date": row_date.isoformat() if row_date else None,
            "timestamp": row_timestamp.isoformat() if row_timestamp else None
        }

    def _calculate_sector_strength(self, advancing: int, declining: int, unchanged: int, change_rate: float) -> str:
        """Classify sector strength from the advancing ratio and change rate.

        Returns one of five Korean labels, or "알 수 없음" when the sector has
        no stocks at all.
        """
        total = advancing + declining + unchanged
        if total == 0:
            return "알 수 없음"
        advance_ratio = advancing / total

        # Both the advancing ratio and the change rate must clear each tier.
        if advance_ratio >= 0.59 and change_rate >= 4.0:
            return "매우 강함"
        if advance_ratio >= 0.5 and change_rate >= 1.0:
            return "강함"
        if advance_ratio >= 0.3 and change_rate >= -1.0:
            return "보통"
        if advance_ratio >= 0.2 and change_rate >= -3.0:
            return "약함"
        return "매우 약함"

    def _format_market_cap(self, market_cap: float) -> str:
        """Format a market cap in won using 조 / 억 units.

        Values of at least 1조 are shown in 조 with one decimal, values of at
        least 1억 are shown in whole 억, and smaller values in won with
        thousands separators. None is treated as 0.
        """
        if market_cap is None:
            market_cap = 0
        if market_cap >= 1_000_000_000_000:  # 1조 or more
            return f"{market_cap / 1_000_000_000_000:.1f}조"
        if market_cap >= 100_000_000:  # 1억 or more (covers the 1000억 range too)
            return f"{market_cap / 100_000_000:.0f}억"
        return f"{int(market_cap):,}원"

    def _calculate_market_summary(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate the sector rows into a market-wide summary.

        Returns an empty dict when there are no rows.
        """
        if not sector_data:
            return {}

        change_rates = [float(self._row_value(row, "change_rate")) for row in sector_data]
        total_market_cap = sum(self._row_value(row, "market_cap") for row in sector_data)
        positive_sectors = sum(1 for rate in change_rates if rate > 0)
        negative_sectors = sum(1 for rate in change_rates if rate < 0)
        avg_change_rate = statistics.mean(change_rates)

        if avg_change_rate > 0.5:
            sentiment = "상승"
        elif avg_change_rate < -0.5:
            sentiment = "하락"
        else:
            sentiment = "보합"

        return {
            "sector_count": len(sector_data),
            "total_market_cap": int(total_market_cap),
            "total_market_cap_formatted": self._format_market_cap(total_market_cap),
            "positive_sectors": positive_sectors,
            "negative_sectors": negative_sectors,
            "neutral_sectors": len(sector_data) - positive_sectors - negative_sectors,
            "avg_change_rate": round(avg_change_rate, 2),
            "market_sentiment": sentiment
        }

    async def _add_major_stocks_info(self, result: Dict[str, Any], sector_data: List[Dict[str, Any]]):
        """Attach the top 3 stocks by market cap to each of the first 5 sectors.

        ``sector_data`` must be in the same order as
        ``result["sector_analysis"]``. Sectors without a ``sector_code`` are
        skipped. Failures are logged and swallowed: this enrichment is
        best-effort and must not fail the whole analysis.
        """
        try:
            query = """
                SELECT stock_code, stock_name, current_price, change_rate, volume, market_cap
                FROM stock_daily_data
                WHERE sector_code = %s AND DATE(date) = CURRENT_DATE
                ORDER BY market_cap DESC
                LIMIT 3
            """
            # Remember each sector's position so results land on the right
            # entry even when an earlier sector has no code and is skipped.
            targets = [
                (index, row.get("sector_code"))
                for index, row in enumerate(sector_data[:5])  # top 5 sectors only
                if row.get("sector_code")
            ]
            if not targets:
                return

            queries = [query] * len(targets)
            params_list = [[sector_code] for _, sector_code in targets]
            stocks_results = await self.db_manager.fetch_many(queries, params_list)

            formatted_sectors = result["sector_analysis"]
            for (index, _), stocks in zip(targets, stocks_results):
                if index >= len(formatted_sectors):
                    continue
                formatted_sectors[index]["major_stocks"] = [
                    {
                        "stock_code": stock.get("stock_code"),
                        "stock_name": stock.get("stock_name"),
                        "current_price": int(self._row_value(stock, "current_price")),
                        "change_rate": float(self._row_value(stock, "change_rate")),
                        "volume": int(self._row_value(stock, "volume")),
                        "market_cap": int(self._row_value(stock, "market_cap"))
                    }
                    for stock in (stocks or [])
                ]
        except Exception as e:
            self.logger.warning(f"Failed to fetch major stocks info: {e}")

    def _analyze_relative_strength(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compare the best and worst sectors by change rate.

        Returns an ``error`` entry when fewer than two sectors are available.
        """
        if len(sector_data) < 2:
            return {"error": "상대 강도 분석을 위한 충분한 데이터가 없습니다"}

        def rate_of(row: Dict[str, Any]) -> float:
            return float(self._row_value(row, "change_rate"))

        def summarize(row: Dict[str, Any]) -> Dict[str, Any]:
            raw_rate = row.get("change_rate")
            return {
                "name": row.get("sector_name"),
                # float() keeps the value JSON-serializable if it is a Decimal.
                "change_rate": None if raw_rate is None else float(raw_rate),
                "advance_ratio": self._row_value(row, "advancing_stocks") / max(self._row_value(row, "total_stocks", 1), 1)
            }

        ranked = sorted(sector_data, key=rate_of, reverse=True)
        strongest, weakest = ranked[0], ranked[-1]
        return {
            "strongest_sector": summarize(strongest),
            "weakest_sector": summarize(weakest),
            "performance_spread": round(rate_of(strongest) - rate_of(weakest), 2)
        }

    async def _analyze_sector_rotation(self, current_data: List[Dict[str, Any]], market: str = "ALL") -> Dict[str, Any]:
        """Compare today's change rates with those from seven days ago.

        Args:
            current_data: Today's sector rows.
            market: Market filter applied to the historical query so it
                matches the current rows; "ALL" applies no filter.

        Returns:
            Up to three momentum leaders (improved by more than 1 point) and
            three laggards (worsened by more than 1 point), or an ``error``
            entry when no history exists or the analysis fails. Sectors with
            no usable historical rate are left out instead of being compared
            against a made-up baseline of 0.
        """
        try:
            historical_query = """
                SELECT sector_name, change_rate
                FROM sector_performance
                WHERE DATE(date) = CURRENT_DATE - INTERVAL '7 days'
            """
            params = []
            if market != "ALL":
                historical_query += " AND market = %s"
                params.append(market)

            historical_data = await self.db_manager.fetch_all(historical_query, *params)
            if not historical_data:
                return {"error": "로테이션 분석을 위한 과거 데이터가 없습니다"}

            historical_rates = {
                row.get("sector_name"): float(row.get("change_rate"))
                for row in historical_data
                if row.get("change_rate") is not None
            }

            momentum_leaders = []
            momentum_laggards = []
            for current in current_data:
                sector_name = current.get("sector_name")
                if sector_name not in historical_rates:
                    continue
                current_rate = float(self._row_value(current, "change_rate"))
                historical_rate = historical_rates[sector_name]
                momentum = current_rate - historical_rate
                entry = {
                    "sector": sector_name,
                    "momentum": round(momentum, 2),
                    "current_rate": current_rate,
                    "historical_rate": historical_rate
                }
                if momentum > 1.0:  # improved by more than 1 point
                    momentum_leaders.append(entry)
                elif momentum < -1.0:  # worsened by more than 1 point
                    momentum_laggards.append(entry)

            return {
                "momentum_leaders": sorted(momentum_leaders, key=lambda x: x["momentum"], reverse=True)[:3],
                "momentum_laggards": sorted(momentum_laggards, key=lambda x: x["momentum"])[:3],
                "analysis_period": "1주일"
            }
        except Exception as e:
            self.logger.warning(f"Sector rotation analysis failed: {e}")
            return {"error": f"로테이션 분석 중 오류 발생: {e}"}