Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
market_breadth_tools.py • 12.3 kB
"""Market breadth indicator tools."""

import json
import logging
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, List

from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError


class MarketBreadthTool(BaseTool):
    """Market breadth indicator analysis tool.

    Reads rows from the ``market_breadth`` table through ``db_manager``,
    caches the assembled result through ``cache_manager`` and returns it as
    JSON text. User-facing strings are intentionally kept in Korean.
    """

    def __init__(self, db_manager, cache_manager):
        """Store the managers on the base class and set up logging/cache TTL."""
        super().__init__(db_manager, cache_manager)
        self.logger = logging.getLogger(__name__)
        self.cache_ttl = 300  # 5 minutes

    @property
    def name(self) -> str:
        """Return the tool identifier."""
        return "get_market_breadth"

    @property
    def description(self) -> str:
        """Return the human-readable (Korean) tool description."""
        return (
            "시장 폭 지표(상승/하락 종목 비율, A/D Line 등)를 분석하여 "
            "시장의 강도와 방향성을 제공합니다."
        )

    def get_tool_definition(self) -> ToolSchema:
        """Return the tool definition, including the JSON input schema."""
        return ToolSchema(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "market": {
                        "type": "string",
                        "enum": ["KOSPI", "KOSDAQ", "ALL"],
                        "default": "KOSPI",
                        "description": "조회할 시장",
                    },
                    "period": {
                        "type": "string",
                        "enum": ["1d", "1w", "1m", "3m"],
                        "default": "1d",
                        "description": (
                            "조회 기간 (1d: 1일, 1w: 1주, 1m: 1개월, 3m: 3개월)"
                        ),
                    },
                    "include_volume_analysis": {
                        "type": "boolean",
                        "default": False,
                        "description": "거래량 분석 포함 여부",
                    },
                },
                "required": [],
            },
        )

    async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Fetch and analyse market breadth data.

        Args:
            arguments: Tool arguments; recognised keys are ``market``,
                ``period`` and ``include_volume_analysis``.

        Returns:
            A single-element list holding the result serialised as JSON.

        Raises:
            ValueError: If ``market`` or ``period`` is not an allowed value.
            DatabaseConnectionError: If fetching or assembling the data fails.
        """
        try:
            # Extract and validate parameters.
            market = arguments.get("market", "KOSPI")
            period = arguments.get("period", "1d")
            include_volume_analysis = arguments.get(
                "include_volume_analysis", False
            )

            self._validate_parameters(market, period)

            # Check the cache first.
            cache_key = self._generate_cache_key(
                market, period, include_volume_analysis
            )
            cached_data = await self.cache_manager.get(cache_key)

            if cached_data and self._is_data_fresh(cached_data):
                self.logger.info(f"Cache hit for {cache_key}")
                return [
                    TextContent(text=json.dumps(cached_data, ensure_ascii=False))
                ]

            # Cache miss (or stale entry): query the database.
            data = await self._fetch_breadth_data(
                market, period, include_volume_analysis
            )

            # Store the fresh result in the cache.
            await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl)

            self.logger.info(
                f"Market breadth data fetched for {market}/{period}"
            )
            return [
                TextContent(
                    text=json.dumps(data, ensure_ascii=False, indent=2)
                )
            ]

        except Exception as e:
            # Log at the tool boundary, then let the caller handle the error.
            self.logger.error(f"Error in market breadth tool: {e}")
            raise

    def _validate_parameters(self, market: str, period: str):
        """Validate ``market`` and ``period`` against the allowed values.

        Raises:
            ValueError: If either value is not allowed.
        """
        valid_markets = ["KOSPI", "KOSDAQ", "ALL"]
        valid_periods = ["1d", "1w", "1m", "3m"]

        if market not in valid_markets:
            raise ValueError(f"Invalid market: {market}")

        if period not in valid_periods:
            raise ValueError(f"Invalid period: {period}")

    def _generate_cache_key(
        self, market: str, period: str, include_volume: bool
    ) -> str:
        """Build the cache key for one (market, period, volume flag) request."""
        return f"market_breadth:{market}:{period}:{include_volume}"

    def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
        """Return True if the cached payload's timestamp is within the TTL.

        A payload with a missing or unparsable ``timestamp`` is treated as
        stale.
        """
        if "timestamp" not in data:
            return False

        try:
            timestamp = datetime.fromisoformat(data["timestamp"])
        except (ValueError, TypeError):
            return False

        # Use the same TTL that is passed to the cache instead of a second
        # hard-coded "5 minutes" that could drift out of sync.
        return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)

    async def _fetch_breadth_data(
        self, market: str, period: str, include_volume: bool
    ) -> Dict[str, Any]:
        """Query market breadth rows and assemble the result payload.

        Args:
            market: ``"KOSPI"``, ``"KOSDAQ"`` or ``"ALL"`` (no market filter).
            period: Period code, converted to days by ``_get_period_days``.
            include_volume: Whether to add the ``volume_analysis`` section.

        Returns:
            A dict with ``timestamp``, ``period``, ``market``,
            ``breadth_data`` and ``summary``, plus optional ``message``,
            ``trend_analysis`` and ``volume_analysis`` entries.

        Raises:
            DatabaseConnectionError: On any failure while querying or while
                assembling the result; the original exception is chained.
        """
        try:
            # Resolve the look-back window.
            days = self._get_period_days(period)

            # NOTE(review): the placeholder sits inside a quoted SQL literal;
            # whether it is substituted depends on the driver behind
            # db_manager.fetch_all - confirm.
            query = """
                SELECT date, market, advancing, declining, unchanged, total_issues,
                       advance_decline_ratio, advance_volume, decline_volume, timestamp
                FROM market_breadth
                WHERE date >= CURRENT_DATE - INTERVAL '%s days'
            """
            params = [days]

            # Optional market filter.
            if market != "ALL":
                query += " AND market = %s"
                params.append(market)

            query += " ORDER BY date DESC, market"

            # Run the query.
            breadth_data = await self.db_manager.fetch_all(query, *params)

            # Assemble the result.
            result = {
                "timestamp": datetime.now().isoformat(),
                "period": period,
                "market": market,
                "breadth_data": [
                    self._format_breadth_data(row) for row in breadth_data
                ],
                "summary": self._calculate_summary(breadth_data),
            }

            # Empty result set: explain why the payload carries no rows.
            if not breadth_data:
                result["message"] = "요청한 기간의 시장 폭 데이터가 없습니다"

            # Trend analysis only makes sense for multi-day periods.
            if period in ["1w", "1m", "3m"] and breadth_data:
                result["trend_analysis"] = self._analyze_trend(breadth_data)

            # Optional volume analysis.
            if include_volume and breadth_data:
                result["volume_analysis"] = self._analyze_volume(breadth_data)

            return result

        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            if isinstance(e, DatabaseConnectionError):
                raise
            # Chain the cause so the original traceback is preserved.
            raise DatabaseConnectionError(
                f"Failed to fetch market breadth data: {e}"
            ) from e

    def _get_period_days(self, period: str) -> int:
        """Convert a period code to a number of days (unknown codes -> 1)."""
        period_map = {
            "1d": 1,
            "1w": 7,
            "1m": 30,
            "3m": 90,
        }
        return period_map.get(period, 1)

    def _format_breadth_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one row into a JSON-serialisable dict.

        Missing or ``None`` numeric fields are reported as 0 and missing
        date/timestamp fields as ``None``.
        """
        row_date = data.get("date")
        row_timestamp = data.get("timestamp")

        # `x or 0` rather than `.get(key, 0)`: a key that is present with a
        # None value would otherwise reach int()/float() and raise TypeError.
        return {
            "date": row_date.isoformat() if row_date else None,
            "market": data.get("market"),
            "advancing": int(data.get("advancing") or 0),
            "declining": int(data.get("declining") or 0),
            "unchanged": int(data.get("unchanged") or 0),
            "total_issues": int(data.get("total_issues") or 0),
            "advance_decline_ratio": float(
                data.get("advance_decline_ratio") or 0
            ),
            "advance_volume": int(data.get("advance_volume") or 0),
            "decline_volume": int(data.get("decline_volume") or 0),
            "timestamp": row_timestamp.isoformat() if row_timestamp else None,
        }

    def _calculate_ad_ratio(self, advancing: int, declining: int) -> float:
        """Return advancing / declining.

        With no decliners the ratio is ``inf`` when there are advancers and
        ``0.0`` otherwise.
        """
        if declining == 0:
            return float('inf') if advancing > 0 else 0.0
        return advancing / declining

    def _interpret_market_sentiment(self, ratio: float) -> str:
        """Map an advance/decline ratio to a Korean sentiment label."""
        if ratio >= 2.0:
            return "강한 상승세"
        elif ratio >= 1.2:
            return "상승세"
        elif ratio >= 0.8:
            return "보합"
        elif ratio >= 0.5:
            return "하락세"
        else:
            return "강한 하락세"

    def _calculate_summary(
        self, breadth_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Summarise the rows: mean A/D ratio, sentiment and latest counts.

        Returns an empty dict when there are no rows.
        """
        if not breadth_data:
            return {}

        # Mean advance/decline ratio over rows that actually carry one.
        ratios = [
            row.get("advance_decline_ratio")
            for row in breadth_data
            if row.get("advance_decline_ratio") is not None
        ]
        # float(): the mean of Decimal inputs is a Decimal, which json.dumps
        # rejects; presumably numeric columns may arrive as Decimal - verify.
        avg_ratio = float(statistics.mean(ratios)) if ratios else 0.0

        # Sentiment derived from the mean ratio.
        market_sentiment = self._interpret_market_sentiment(avg_ratio)

        # The first row is taken as the latest one.
        # NOTE(review): with market="ALL" this covers a single market only.
        latest = breadth_data[0]

        return {
            "avg_advance_decline_ratio": round(avg_ratio, 2),
            "market_sentiment": market_sentiment,
            "latest_advancing": latest.get("advancing", 0),
            "latest_declining": latest.get("declining", 0),
            "latest_unchanged": latest.get("unchanged", 0),
            "data_points": len(breadth_data),
        }

    def _analyze_trend(
        self, breadth_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Compare the mean A/D ratio of the older and newer halves.

        Returns:
            A dict with ``direction``, ``strength``, ``trend_change`` and
            ``days_analyzed``, or a dict with an ``error`` message when fewer
            than two usable data points exist.
        """
        insufficient = {"error": "트렌드 분석을 위한 충분한 데이터가 없습니다"}

        if len(breadth_data) < 2:
            return insufficient

        # Sort chronologically (oldest first).
        sorted_data = sorted(
            breadth_data, key=lambda x: x.get("date", datetime.min)
        )

        # Skip rows without a ratio, as _calculate_summary does; a None here
        # would make statistics.mean raise TypeError.
        ratios = [
            row.get("advance_decline_ratio")
            for row in sorted_data
            if row.get("advance_decline_ratio") is not None
        ]
        if len(ratios) < 2:
            return insufficient

        # Simplified trend estimate: newer-half mean minus older-half mean.
        midpoint = len(ratios) // 2
        first_half_avg = statistics.mean(ratios[:midpoint])
        second_half_avg = statistics.mean(ratios[midpoint:])
        # float() keeps the rounded value JSON-serialisable for Decimal input.
        trend_change = float(second_half_avg - first_half_avg)

        if trend_change > 0.1:
            direction = "상승"
            strength = "강함" if trend_change > 0.2 else "약함"
        elif trend_change < -0.1:
            direction = "하락"
            strength = "강함" if trend_change < -0.2 else "약함"
        else:
            direction = "보합"
            strength = "보통"

        return {
            "direction": direction,
            "strength": strength,
            "trend_change": round(trend_change, 3),
            # Counts rows, not distinct dates.
            "days_analyzed": len(sorted_data),
        }

    def _analyze_volume(
        self, breadth_data: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Analyse advancing vs. declining volume of the first (latest) row.

        Returns an empty dict when there are no rows.
        """
        if not breadth_data:
            return {}

        # Latest row; None volumes are treated as 0, matching
        # _format_breadth_data.
        latest = breadth_data[0]
        advance_volume = int(latest.get("advance_volume") or 0)
        decline_volume = int(latest.get("decline_volume") or 0)

        # Volume ratio.
        total_volume = advance_volume + decline_volume
        # NOTE(review): when both volumes are 0 the ratio is still inf, so the
        # row is labelled as advancing-dominant - confirm this is intended.
        volume_ratio = (
            advance_volume / decline_volume
            if decline_volume > 0
            else float('inf')
        )

        # Classify the volume balance.
        if volume_ratio >= 1.5:
            volume_trend = "상승 거래량 우세"
        elif volume_ratio >= 0.67:
            volume_trend = "균형"
        else:
            volume_trend = "하락 거래량 우세"

        return {
            "advance_volume": advance_volume,
            "decline_volume": decline_volume,
            "total_volume": total_volume,
            # inf is not valid JSON, so it is reported as a label instead.
            "volume_ratio": (
                round(volume_ratio, 2)
                if volume_ratio != float('inf')
                else "무한대"
            ),
            "volume_trend": volume_trend,
            "advance_volume_pct": (
                round((advance_volume / total_volume * 100), 1)
                if total_volume > 0
                else 0.0
            ),
        }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.