Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
sector_tools.py • 20.3 kB
"""Sector analysis tool."""

import json
import logging
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List

from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError


# NOTE(review): the non-ASCII string literals in this copy of the file look
# mis-encoded (UTF-8 rendered through a different code page). They are runtime
# text and are preserved verbatim here; restore them from the original UTF-8
# source before shipping.
class SectorAnalysisTool(BaseTool):
    """Tool that reports per-sector market statistics.

    Reads today's rows from ``sector_performance`` through ``db_manager``,
    formats them, and optionally adds major stocks, a relative-strength
    summary and a one-week rotation analysis. Results are cached through
    ``cache_manager`` for ``cache_ttl`` seconds.
    """

    def __init__(self, db_manager, cache_manager):
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # seconds (5 minutes)

    @property
    def name(self) -> str:
        """Identifier under which the tool is registered."""
        return "get_sector_analysis"

    @property
    def description(self) -> str:
        """Human-readable description of the tool."""
        return "์—…์ข…๋ณ„/์„นํ„ฐ๋ณ„ ์ฃผ์‹ ์‹œ์žฅ ๋ถ„์„์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์„นํ„ฐ ์„ฑ๊ณผ, ์‹œ๊ฐ€์ด์•ก, ์ฃผ์š” ์ข…๋ชฉ ๋“ฑ์„ ๋ถ„์„ํ•ฉ๋‹ˆ๋‹ค."

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool definition, including the JSON input schema."""
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": ["KOSPI", "KOSDAQ", "ALL"],
                        "default": "ALL",
                        "description": "์กฐํšŒํ•  ์‹œ์žฅ",
                    },
                    "sector": {
                        "type": "string",
                        "default": "ALL",
                        "description": "์กฐํšŒํ•  ์„นํ„ฐ (์˜ˆ: IT, ๊ธˆ์œต, ๋ฐ”์ด์˜ค, ALL)",
                    },
                    "sort_by": {
                        "type": "string",
                        "enum": ["market_cap", "change_rate", "volume", "strength"],
                        "default": "market_cap",
                        "description": "์ •๋ ฌ ๊ธฐ์ค€",
                    },
                    "include_stocks": {
                        "type": "boolean",
                        "default": False,
                        "description": "์ฃผ์š” ์ข…๋ชฉ ์ •๋ณด ํฌํ•จ ์—ฌ๋ถ€",
                    },
                    "include_relative_strength": {
                        "type": "boolean",
                        "default": False,
                        "description": "์ƒ๋Œ€ ๊ฐ•๋„ ๋ถ„์„ ํฌํ•จ ์—ฌ๋ถ€",
                    },
                    "include_rotation_analysis": {
                        "type": "boolean",
                        "default": False,
                        "description": "์„นํ„ฐ ๋กœํ…Œ์ด์…˜ ๋ถ„์„ ํฌํ•จ ์—ฌ๋ถ€",
                    },
                },
                "required": [],
            },
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Run the sector analysis and return it as JSON text.

        Args:
            arguments: Tool arguments; every key is optional and falls back to
                the default declared in the input schema.

        Returns:
            A single-element list holding the JSON-serialised result.

        Raises:
            ValueError: If ``market`` or ``sort_by`` is not an allowed value.
            DatabaseConnectionError: If fetching the sector data fails.
        """
        try:
            # Extract and validate parameters.
            market = arguments.get("market", "ALL")
            sector = arguments.get("sector", "ALL")
            sort_by = arguments.get("sort_by", "market_cap")
            include_stocks = arguments.get("include_stocks", False)
            include_relative_strength = arguments.get("include_relative_strength", False)
            include_rotation_analysis = arguments.get("include_rotation_analysis", False)

            self._validate_parameters(market, sort_by)

            # Serve from cache when a fresh entry exists.
            cache_key = self._generate_cache_key(
                market, sector, sort_by,
                include_stocks, include_relative_strength, include_rotation_analysis,
            )
            cached_data = await self.cache_manager.get(cache_key)

            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                return [TextContent(text=json.dumps(cached_data, ensure_ascii=False))]

            # Cache miss (or stale entry): query the database.
            data = await self._fetch_sector_data(
                market, sector, sort_by,
                include_stocks, include_relative_strength, include_rotation_analysis,
            )

            # Store the fresh result.
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)

            self.logger.info(f"Sector analysis data fetched for {market}/{sector}")
            return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))]

        except Exception as e:
            self.logger.error(f"Error in sector analysis tool: {e}")
            raise

    def _validate_parameters(self, market: str, sort_by: str):
        """Raise ``ValueError`` if ``market`` or ``sort_by`` is not allowed."""
        valid_markets = ["KOSPI", "KOSDAQ", "ALL"]
        valid_sort_by = ["market_cap", "change_rate", "volume", "strength"]

        if market not in valid_markets:
            raise ValueError(f"Invalid market: {market}")

        if sort_by not in valid_sort_by:
            raise ValueError(f"Invalid sort_by: {sort_by}")

    def _generate_cache_key(self, market: str, sector: str, sort_by: str,
                            include_stocks: bool, include_rs: bool,
                            include_rotation: bool) -> str:
        """Build the cache key from every argument that affects the result."""
        return f"sector_analysis:{market}:{sector}:{sort_by}:{include_stocks}:{include_rs}:{include_rotation}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True if ``data["timestamp"]`` is younger than ``cache_ttl``.

        A missing or unparsable timestamp counts as stale.
        """
        if "timestamp" not in data:
            return False

        try:
            timestamp = datetime.fromisoformat(data["timestamp"])
            # Use the same window as the cache TTL instead of a second,
            # independently hard-coded "5 minutes".
            return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)
        except (ValueError, TypeError):
            return False

    async def _fetch_sector_data(self, market: str, sector: str, sort_by: str,
                                 include_stocks: bool, include_rs: bool,
                                 include_rotation: bool) -> Dict[str, Any]:
        """Query today's sector rows and assemble the result dictionary.

        Raises:
            DatabaseConnectionError: On any failure; an exception of another
                type is wrapped and chained as the cause.
        """
        try:
            # Base query.
            query = """
                SELECT sector_name, sector_code, market_cap, total_volume,
                       change_rate, advancing_stocks, declining_stocks, unchanged_stocks,
                       total_stocks, top_performer, top_performer_change,
                       worst_performer, worst_performer_change, date, timestamp
                FROM sector_performance
                WHERE DATE(date) = CURRENT_DATE
            """
            params = []

            # Market filter.
            if market != "ALL":
                query += " AND market = %s"
                params.append(market)

            # Sector filter (matches either name or code).
            if sector != "ALL":
                query += " AND (sector_name = %s OR sector_code = %s)"
                params.extend([sector, sector])

            # Ordering; the column name comes from a fixed whitelist map.
            sort_column = self._get_sort_column(sort_by)
            query += f" ORDER BY {sort_column} DESC"

            sector_data = await self.db_manager.fetch_all(query, *params)

            # Re-sort in Python ("strength" uses a computed score).
            sorted_data = self._sort_sectors(sector_data, sort_by)

            result = {
                "timestamp": datetime.now().isoformat(),
                "market": market,
                "sector": sector,
                "sort_by": sort_by,
                "sector_analysis": [self._format_sector_data(data) for data in sorted_data],
                "market_summary": self._calculate_market_summary(sorted_data),
            }

            # No rows for this request.
            if not sector_data:
                result["message"] = "์š”์ฒญํ•œ ์„นํ„ฐ์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค"
                result["sector_analysis"] = []

            # Optional: major stocks per sector.
            if include_stocks and sector_data:
                await self._add_major_stocks_info(result, sorted_data)

            # Optional: relative-strength summary.
            if include_rs and sector_data:
                result["relative_strength"] = self._analyze_relative_strength(sorted_data)

            # Optional: sector rotation analysis.
            if include_rotation and sector_data:
                rotation_data = await self._analyze_sector_rotation(sorted_data)
                result["rotation_analysis"] = rotation_data

            return result

        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            if isinstance(e, DatabaseConnectionError):
                raise
            # Chain the original exception so the root cause stays visible.
            raise DatabaseConnectionError(f"Failed to fetch sector data: {e}") from e

    def _get_sort_column(self, sort_by: str) -> str:
        """Map a ``sort_by`` value to the SQL column used in ORDER BY."""
        column_map = {
            "market_cap": "market_cap",
            "change_rate": "change_rate",
            "volume": "total_volume",
            "strength": "change_rate",  # temporary: change_rate stands in for strength
        }
        return column_map.get(sort_by, "market_cap")

    def _sort_sectors(self, data: List[Dict[str, Any]], sort_by: str) -> List[Dict[str, Any]]:
        """Return ``data`` sorted in descending order by the requested key."""
        if not data:
            return data

        if sort_by == "market_cap":
            return sorted(data, key=lambda x: x.get("market_cap", 0), reverse=True)
        elif sort_by == "change_rate":
            return sorted(data, key=lambda x: x.get("change_rate", 0), reverse=True)
        elif sort_by == "volume":
            return sorted(data, key=lambda x: x.get("total_volume", 0), reverse=True)
        elif sort_by == "strength":
            # Sort by strength score (advancing ratio combined with change rate).
            return sorted(data, key=lambda x: self._calculate_sector_strength_score(x), reverse=True)
        else:
            return data

    def _calculate_sector_strength_score(self, sector_data: Dict[str, Any]) -> float:
        """Return ``0.6 * advancing/total + 0.4 * change_rate/10`` for a row."""
        advancing = sector_data.get("advancing_stocks", 0)
        total = sector_data.get("total_stocks", 1)
        change_rate = sector_data.get("change_rate", 0)

        advance_ratio = advancing / total if total > 0 else 0
        return advance_ratio * 0.6 + (change_rate / 10.0) * 0.4

    def _format_sector_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one database row into the JSON-ready sector entry.

        ``advance_decline_ratio`` is ``None`` when no stock declined: the
        ratio is undefined, and ``float('inf')`` would be serialised by
        ``json.dumps`` as the non-standard token ``Infinity``.
        """
        advancing = data.get("advancing_stocks", 0)
        declining = data.get("declining_stocks", 0)
        unchanged = data.get("unchanged_stocks", 0)
        change_rate = data.get("change_rate", 0)

        # Categorical strength label.
        strength = self._calculate_sector_strength(advancing, declining, unchanged, change_rate)

        advance_decline_ratio = round(advancing / declining, 2) if declining > 0 else None

        return {
            "sector_name": data.get("sector_name"),
            "sector_code": data.get("sector_code"),
            "market_cap": int(data.get("market_cap", 0)),
            "market_cap_formatted": self._format_market_cap(data.get("market_cap", 0)),
            "total_volume": int(data.get("total_volume", 0)),
            "change_rate": float(data.get("change_rate", 0)),
            "advancing_stocks": int(advancing),
            "declining_stocks": int(declining),
            "unchanged_stocks": int(unchanged),
            "total_stocks": int(data.get("total_stocks", 0)),
            "advance_decline_ratio": advance_decline_ratio,
            "sector_strength": strength,
            "top_performer": data.get("top_performer"),
            "top_performer_change": float(data.get("top_performer_change", 0)),
            "worst_performer": data.get("worst_performer"),
            "worst_performer_change": float(data.get("worst_performer_change", 0)),
            "date": data.get("date").isoformat() if data.get("date") else None,
            "timestamp": data.get("timestamp").isoformat() if data.get("timestamp") else None,
        }

    def _calculate_sector_strength(self, advancing: int, declining: int,
                                   unchanged: int, change_rate: float) -> str:
        """Classify a sector into one of five strength labels.

        Each tier requires both a minimum advancing ratio and a minimum
        change rate; a dedicated label is returned when the counts sum to 0.
        """
        total = advancing + declining + unchanged
        if total == 0:
            return "์•Œ ์ˆ˜ ์—†์Œ"

        advance_ratio = advancing / total

        # Combine the advancing ratio with the sector change rate.
        if advance_ratio >= 0.59 and change_rate >= 4.0:
            return "๋งค์šฐ ๊ฐ•ํ•จ"
        elif advance_ratio >= 0.5 and change_rate >= 1.0:
            return "๊ฐ•ํ•จ"
        elif advance_ratio >= 0.3 and change_rate >= -1.0:
            return "๋ณดํ†ต"
        elif advance_ratio >= 0.2 and change_rate >= -3.0:
            return "์•ฝํ•จ"
        else:
            return "๋งค์šฐ ์•ฝํ•จ"

    def _format_market_cap(self, market_cap: float) -> str:
        """Format a market cap with a unit suffix.

        Values of at least 1e12 are shown in units of 1e12 with one decimal,
        values of at least 1e8 in whole units of 1e8, and anything smaller as
        a comma-grouped integer with the base-unit suffix.

        The previous intermediate branch for values >= 1e11 divided by 1e11
        and appended two zeros, which under-reported the amount by a factor
        of ten (5e11 came out as 500 units of 1e8 instead of 5000).
        """
        if market_cap >= 1000000000000:  # 1e12 and above
            trillion = market_cap / 1000000000000
            return f"{trillion:.1f}์กฐ"
        elif market_cap >= 100000000:  # 1e8 and above
            hundred_million = market_cap / 100000000
            return f"{hundred_million:.0f}์–ต"
        else:
            return f"{int(market_cap):,}์›"

    def _calculate_market_summary(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate sector rows into market-wide totals and a sentiment label.

        Returns an empty dict when there are no rows.
        """
        if not sector_data:
            return {}

        total_market_cap = sum(data.get("market_cap", 0) for data in sector_data)
        positive_sectors = len([s for s in sector_data if s.get("change_rate", 0) > 0])
        negative_sectors = len([s for s in sector_data if s.get("change_rate", 0) < 0])
        avg_change_rate = statistics.mean([s.get("change_rate", 0) for s in sector_data])

        return {
            "sector_count": len(sector_data),
            "total_market_cap": int(total_market_cap),
            "total_market_cap_formatted": self._format_market_cap(total_market_cap),
            "positive_sectors": positive_sectors,
            "negative_sectors": negative_sectors,
            "neutral_sectors": len(sector_data) - positive_sectors - negative_sectors,
            "avg_change_rate": round(avg_change_rate, 2),
            "market_sentiment": "์ƒ์Šน" if avg_change_rate > 0.5 else "ํ•˜๋ฝ" if avg_change_rate < -0.5 else "๋ณดํ•ฉ",
        }

    async def _add_major_stocks_info(self, result: Dict[str, Any], sector_data: List[Dict[str, Any]]):
        """Attach the top three stocks by market cap to the first five sectors.

        Mutates ``result["sector_analysis"]`` in place. This is best-effort:
        any failure is logged as a warning and the result is left without
        ``major_stocks`` entries.

        ``sector_data`` is expected to be in the same order as
        ``result["sector_analysis"]`` (the caller builds both from the same
        sorted list), so each query remembers the index of the sector it was
        built for. Sectors without a ``sector_code`` are skipped without
        shifting later results onto the wrong entry.
        """
        try:
            query = """
                SELECT stock_code, stock_name, current_price, change_rate,
                       volume, market_cap
                FROM stock_daily_data
                WHERE sector_code = %s AND DATE(date) = CURRENT_DATE
                ORDER BY market_cap DESC
                LIMIT 3
            """

            # (index into result["sector_analysis"], sector_code), top 5 only.
            targets = []
            for index, sector in enumerate(sector_data[:5]):
                sector_code = sector.get("sector_code")
                if sector_code:
                    targets.append((index, sector_code))

            if not targets:
                return

            # Run all per-sector queries as one batch.
            queries = [query] * len(targets)
            params_list = [[sector_code] for _, sector_code in targets]
            stocks_results = await self.db_manager.fetch_many(queries, params_list)

            # Map each result set back to the sector it was queried for.
            formatted_sectors = result["sector_analysis"]
            for (index, _), stocks in zip(targets, stocks_results):
                if index >= len(formatted_sectors):
                    continue
                formatted_sectors[index]["major_stocks"] = [
                    {
                        "stock_code": stock.get("stock_code"),
                        "stock_name": stock.get("stock_name"),
                        "current_price": int(stock.get("current_price", 0)),
                        "change_rate": float(stock.get("change_rate", 0)),
                        "volume": int(stock.get("volume", 0)),
                        "market_cap": int(stock.get("market_cap", 0)),
                    }
                    for stock in stocks
                ]

        except Exception as e:
            self.logger.warning(f"Failed to fetch major stocks info: {e}")

    def _analyze_relative_strength(self, sector_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Report the best and worst sector by change rate and their spread.

        Returns a dict with an ``"error"`` key when fewer than two sectors
        are available.
        """
        if len(sector_data) < 2:
            return {"error": "์ƒ๋Œ€ ๊ฐ•๋„ ๋ถ„์„์„ ์œ„ํ•œ ์ถฉ๋ถ„ํ•œ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค"}

        # Rank by change rate.
        sorted_by_performance = sorted(sector_data, key=lambda x: x.get("change_rate", 0), reverse=True)

        strongest = sorted_by_performance[0]
        weakest = sorted_by_performance[-1]

        return {
            "strongest_sector": {
                "name": strongest.get("sector_name"),
                "change_rate": strongest.get("change_rate"),
                "advance_ratio": strongest.get("advancing_stocks", 0) / max(strongest.get("total_stocks", 1), 1),
            },
            "weakest_sector": {
                "name": weakest.get("sector_name"),
                "change_rate": weakest.get("change_rate"),
                "advance_ratio": weakest.get("advancing_stocks", 0) / max(weakest.get("total_stocks", 1), 1),
            },
            "performance_spread": round(strongest.get("change_rate", 0) - weakest.get("change_rate", 0), 2),
        }

    async def _analyze_sector_rotation(self, current_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Compare today's change rates with those from seven days ago.

        Momentum is ``current change_rate - historical change_rate``; sectors
        above +1.0 are leaders and below -1.0 are laggards (top three of each
        are returned). A sector with no historical row is compared against 0.
        Failures are logged and reported via an ``"error"`` key rather than
        raised.
        """
        try:
            # Historical rows from one week ago.
            historical_query = """
                SELECT sector_name, change_rate
                FROM sector_performance
                WHERE DATE(date) = CURRENT_DATE - INTERVAL '7 days'
            """

            historical_data = await self.db_manager.fetch_all(historical_query)

            if not historical_data:
                return {"error": "๋กœํ…Œ์ด์…˜ ๋ถ„์„์„ ์œ„ํ•œ ๊ณผ๊ฑฐ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค"}

            momentum_leaders = []
            momentum_laggards = []

            # Per-sector momentum: current rate minus the rate a week ago.
            historical_dict = {h.get("sector_name"): h.get("change_rate", 0) for h in historical_data}

            for current in current_data:
                sector_name = current.get("sector_name")
                current_rate = current.get("change_rate", 0)
                historical_rate = historical_dict.get(sector_name, 0)

                momentum = current_rate - historical_rate

                if momentum > 1.0:  # improved by more than 1.0
                    momentum_leaders.append({
                        "sector": sector_name,
                        "momentum": round(momentum, 2),
                        "current_rate": current_rate,
                        "historical_rate": historical_rate,
                    })
                elif momentum < -1.0:  # worsened by more than 1.0
                    momentum_laggards.append({
                        "sector": sector_name,
                        "momentum": round(momentum, 2),
                        "current_rate": current_rate,
                        "historical_rate": historical_rate,
                    })

            return {
                "momentum_leaders": sorted(momentum_leaders, key=lambda x: x["momentum"], reverse=True)[:3],
                "momentum_laggards": sorted(momentum_laggards, key=lambda x: x["momentum"])[:3],
                "analysis_period": "1์ฃผ์ผ",
            }

        except Exception as e:
            self.logger.warning(f"Sector rotation analysis failed: {e}")
            return {"error": f"๋กœํ…Œ์ด์…˜ ๋ถ„์„ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {e}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server