
MCP Market Statistics Server

by whdghk1907
investor_tools.py • 15.5 kB
"""투자자 동ν–₯ 뢄석 도ꡬ""" import json import logging from datetime import datetime, timedelta from typing import Any, Dict, List from src.tools.base import BaseTool, ToolSchema, TextContent from src.exceptions import DatabaseConnectionError, DataValidationError class InvestorFlowsTool(BaseTool): """νˆ¬μžμžλ³„ λ§€λ§€ 동ν–₯을 λΆ„μ„ν•˜λŠ” 도ꡬ""" def __init__(self, db_manager, cache_manager): super().__init__(db_manager, cache_manager) self.logger = logging.getLogger(__name__) self.cache_ttl = 300 # 5λΆ„ @property def name(self) -> str: return "get_investor_flows" @property def description(self) -> str: return "νˆ¬μžμžλ³„(개인, κΈ°κ΄€, 외ꡭ인) λ§€λ§€ 동ν–₯κ³Ό 자금 흐름을 λΆ„μ„ν•©λ‹ˆλ‹€." def get_tool_definition(self) -> ToolSchema: """도ꡬ μ •μ˜ λ°˜ν™˜""" return ToolSchema( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "market": { "type": "string", "enum": ["KOSPI", "KOSDAQ", "ALL"], "default": "KOSPI", "description": "μ‘°νšŒν•  μ‹œμž₯" }, "period": { "type": "string", "enum": ["1d", "1w", "1m", "3m"], "default": "1d", "description": "쑰회 κΈ°κ°„ (1d: 1일, 1w: 1μ£Ό, 1m: 1κ°œμ›”, 3m: 3κ°œμ›”)" }, "investor_type": { "type": "string", "enum": ["개인", "κΈ°κ΄€", "외ꡭ인", "ALL"], "default": "ALL", "description": "μ‘°νšŒν•  투자자 μœ ν˜•" }, "include_market_impact": { "type": "boolean", "default": False, "description": "μ‹œμž₯ 영ν–₯도 뢄석 포함 μ—¬λΆ€" } }, "required": [] } ) async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]: """투자자 동ν–₯ 데이터 쑰회 및 뢄석""" try: # νŒŒλΌλ―Έν„° μΆ”μΆœ 및 검증 market = arguments.get("market", "KOSPI") period = arguments.get("period", "1d") investor_type = arguments.get("investor_type", "ALL") include_market_impact = arguments.get("include_market_impact", False) self._validate_parameters(market, period, investor_type) # μΊμ‹œ 확인 cache_key = self._generate_cache_key(market, period, investor_type, include_market_impact) cached_data = await self.cache_manager.get(cache_key) if cached_data and self._is_data_fresh(cached_data): self.logger.info(f"Cache hit for {cache_key}") return [TextContent(text=json.dumps(cached_data, ensure_ascii=False))] # λ°μ΄ν„°λ² μ΄μŠ€μ—μ„œ 데이터 쑰회 data = await self._fetch_investor_data(market, period, investor_type, include_market_impact) # μΊμ‹œ μ €μž₯ await self.cache_manager.set(cache_key, data, ttl=self.cache_ttl) self.logger.info(f"Investor flows data fetched for {market}/{period}") return [TextContent(text=json.dumps(data, ensure_ascii=False, indent=2))] except Exception as e: self.logger.error(f"Error in investor flows tool: {e}") raise def _validate_parameters(self, market: str, period: str, investor_type: str): """νŒŒλΌλ―Έν„° 검증""" valid_markets = ["KOSPI", "KOSDAQ", "ALL"] valid_periods = ["1d", "1w", "1m", "3m"] valid_investor_types = ["개인", "κΈ°κ΄€", "외ꡭ인", "ALL"] if market not in valid_markets: raise ValueError(f"Invalid market: {market}") if period not in valid_periods: raise ValueError(f"Invalid period: {period}") if investor_type not in valid_investor_types: raise ValueError(f"Invalid investor_type: {investor_type}") def _generate_cache_key(self, market: str, period: str, investor_type: str, include_impact: bool) -> str: """μΊμ‹œ ν‚€ 생성""" return f"investor_flows:{market}:{period}:{investor_type}:{include_impact}" def _is_data_fresh(self, data: Dict[str, Any]) -> bool: """데이터 신선도 확인""" if "timestamp" not in data: return False try: timestamp = datetime.fromisoformat(data["timestamp"]) return datetime.now() - timestamp < timedelta(minutes=5) except (ValueError, TypeError): return False async def 
_fetch_investor_data(self, market: str, period: str, investor_type: str, include_impact: bool) -> Dict[str, Any]: """λ°μ΄ν„°λ² μ΄μŠ€μ—μ„œ 투자자 동ν–₯ 데이터 쑰회""" try: # κΈ°κ°„ μ„€μ • days = self._get_period_days(period) # 쿼리 ꡬ성 query = """ SELECT date, investor_type, buy_amount, sell_amount, net_amount, market FROM investor_flows WHERE date >= CURRENT_DATE - INTERVAL '%s days' """ params = [days] # μ‹œμž₯ ν•„ν„° if market != "ALL": query += " AND market = %s" params.append(market) # 투자자 μœ ν˜• ν•„ν„° if investor_type != "ALL": query += " AND investor_type = %s" params.append(investor_type) query += " ORDER BY date DESC, investor_type" # 데이터 쑰회 flows_data = await self.db_manager.fetch_all(query, *params) # κ²°κ³Ό ꡬ성 result = { "timestamp": datetime.now().isoformat(), "period": period, "market": market, "investor_type": investor_type, "flows": [self._format_flow_data(flow) for flow in flows_data], "summary": self._calculate_summary(flows_data) } # 빈 데이터 처리 if not flows_data: result["message"] = "μš”μ²­ν•œ κΈ°κ°„μ˜ 투자자 동ν–₯ 데이터가 μ—†μŠ΅λ‹ˆλ‹€" result["flows"] = [] # λˆ„μ  ν”Œλ‘œμš° 계산 (μ£Όκ°„/μ›”κ°„ κΈ°κ°„μ˜ 경우) if period in ["1w", "1m", "3m"] and flows_data: result["cumulative_flows"] = self._calculate_cumulative_flows(flows_data) # μ‹œμž₯ 영ν–₯도 뢄석 포함 if include_impact and flows_data: impact_data = await self._analyze_market_impact(market, period, flows_data) result["market_impact"] = impact_data return result except Exception as e: self.logger.error(f"Database query failed: {e}") if isinstance(e, DatabaseConnectionError): raise raise DatabaseConnectionError(f"Failed to fetch investor data: {e}") def _get_period_days(self, period: str) -> int: """기간을 일수둜 λ³€ν™˜""" period_map = { "1d": 1, "1w": 7, "1m": 30, "3m": 90 } return period_map.get(period, 1) def _format_flow_data(self, flow: Dict[str, Any]) -> Dict[str, Any]: """ν”Œλ‘œμš° 데이터 ν¬λ§·νŒ…""" net_amount = flow.get("net_amount", 0) return { "date": flow.get("date").isoformat() if flow.get("date") else None, "investor_type": flow.get("investor_type"), "buy_amount": int(flow.get("buy_amount", 0)), "sell_amount": int(flow.get("sell_amount", 0)), "net_amount": int(net_amount), "net_amount_formatted": self._format_amount(net_amount), "market": flow.get("market") } def _format_amount(self, amount: int) -> str: """κΈˆμ•‘ ν¬λ§·νŒ… (μ‘°, μ–΅ λ‹¨μœ„)""" if amount == 0: return "0원" abs_amount = abs(amount) sign = "-" if amount < 0 else "" if abs_amount >= 1000000000000: # 1μ‘° 이상 trillion = abs_amount / 1000000000000 if trillion == int(trillion): return f"{sign}{int(trillion)}μ‘°" else: return f"{sign}{trillion:.1f}μ‘°" elif abs_amount >= 100000000: # 1μ–΅ 이상 hundred_million = abs_amount / 100000000 if hundred_million >= 1000: # 1000μ–΅ 이상은 콀마 ν‘œμ‹œ return f"{sign}{int(hundred_million):,}μ–΅" else: return f"{sign}{int(hundred_million)}μ–΅" else: return f"{sign}{int(abs_amount):,}원" def _calculate_summary(self, flows_data: List[Dict[str, Any]]) -> Dict[str, Any]: """νˆ¬μžμžλ³„ 순맀맀 μš”μ•½ 계산""" summary = { "net_individual": 0, "net_foreign": 0, "net_institution": 0, "total_trading_value": 0 } for flow in flows_data: investor_type = flow.get("investor_type", "") net_amount = flow.get("net_amount", 0) buy_amount = flow.get("buy_amount", 0) sell_amount = flow.get("sell_amount", 0) if investor_type == "개인": summary["net_individual"] += net_amount elif investor_type == "외ꡭ인": summary["net_foreign"] += net_amount elif investor_type == "κΈ°κ΄€": summary["net_institution"] += net_amount summary["total_trading_value"] += buy_amount + sell_amount # 
ν¬λ§·νŒ…λœ 버전도 μΆ”κ°€ summary["net_individual_formatted"] = self._format_amount(summary["net_individual"]) summary["net_foreign_formatted"] = self._format_amount(summary["net_foreign"]) summary["net_institution_formatted"] = self._format_amount(summary["net_institution"]) return summary def _calculate_cumulative_flows(self, flows_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """λˆ„μ  ν”Œλ‘œμš° 계산""" # λ‚ μ§œλ³„λ‘œ κ·Έλ£Ήν™” daily_flows = {} for flow in flows_data: date = flow.get("date") if date: date_str = date.isoformat() if hasattr(date, 'isoformat') else str(date) if date_str not in daily_flows: daily_flows[date_str] = 0 daily_flows[date_str] += flow.get("net_amount", 0) # λ‚ μ§œ 순으둜 μ •λ ¬ν•˜κ³  λˆ„μ  계산 sorted_dates = sorted(daily_flows.keys()) cumulative = 0 cumulative_flows = [] for date_str in sorted_dates: daily_net = daily_flows[date_str] cumulative += daily_net cumulative_flows.append({ "date": date_str, "daily_net": daily_net, "cumulative_net": cumulative, "daily_net_formatted": self._format_amount(daily_net), "cumulative_net_formatted": self._format_amount(cumulative) }) return cumulative_flows async def _analyze_market_impact(self, market: str, period: str, flows_data: List[Dict[str, Any]]) -> Dict[str, Any]: """μ‹œμž₯ 영ν–₯도 뢄석""" try: # 동일 κΈ°κ°„ μ‹œμž₯ μ§€μˆ˜ λ³€ν™” 쑰회 days = self._get_period_days(period) index_data = await self.db_manager.fetch_one( """ SELECT (SELECT current_value FROM market_indices WHERE index_code = %s ORDER BY timestamp DESC LIMIT 1) - (SELECT current_value FROM market_indices WHERE index_code = %s AND timestamp <= CURRENT_TIMESTAMP - INTERVAL '%s days' ORDER BY timestamp DESC LIMIT 1) as index_change, ((SELECT current_value FROM market_indices WHERE index_code = %s ORDER BY timestamp DESC LIMIT 1) / (SELECT current_value FROM market_indices WHERE index_code = %s AND timestamp <= CURRENT_TIMESTAMP - INTERVAL '%s days' ORDER BY timestamp DESC LIMIT 1) - 1) * 100 as index_change_rate """, market if market != "ALL" else "KOSPI", market if market != "ALL" else "KOSPI", days, market if market != "ALL" else "KOSPI", market if market != "ALL" else "KOSPI", days ) if not index_data: return {"error": "μ‹œμž₯ μ§€μˆ˜ 데이터λ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€"} # νˆ¬μžμžλ³„ μˆœλ§€λ§€μ™€ μ§€μˆ˜ λ³€ν™” 상관관계 뢄석 summary = self._calculate_summary(flows_data) index_change_rate = index_data.get("index_change_rate", 0) # λ‹¨μˆœ 상관관계 계산 (μ‹€μ œλ‘œλŠ” 더 λ³΅μž‘ν•œ 톡계적 뢄석이 ν•„μš”) foreign_correlation = -1 if (summary["net_foreign"] < 0 and index_change_rate > 0) or \ (summary["net_foreign"] > 0 and index_change_rate < 0) else 1 individual_correlation = 1 if (summary["net_individual"] > 0 and index_change_rate > 0) or \ (summary["net_individual"] < 0 and index_change_rate < 0) else -1 return { "index_change": float(index_data.get("index_change", 0)), "index_change_rate": float(index_change_rate), "correlation_analysis": { "foreign_vs_index": foreign_correlation, "individual_vs_index": individual_correlation, "interpretation": self._interpret_correlation(foreign_correlation, individual_correlation) } } except Exception as e: self.logger.warning(f"Market impact analysis failed: {e}") return {"error": f"μ‹œμž₯ 영ν–₯도 뢄석 쀑 였λ₯˜ λ°œμƒ: {e}"} def _interpret_correlation(self, foreign_corr: float, individual_corr: float) -> str: """상관관계 해석""" interpretations = [] if foreign_corr < 0: interpretations.append("외ꡭ인 자금이 μ§€μˆ˜μ™€ λ°˜λŒ€ λ°©ν–₯으둜 μ›€μ§μž„") else: interpretations.append("외ꡭ인 자금이 μ§€μˆ˜μ™€ 같은 λ°©ν–₯으둜 μ›€μ§μž„") if individual_corr > 0: interpretations.append("개인 νˆ¬μžμžκ°€ 
μ§€μˆ˜μ™€ 같은 λ°©ν–₯으둜 μ›€μ§μž„") else: interpretations.append("개인 νˆ¬μžμžκ°€ μ§€μˆ˜μ™€ λ°˜λŒ€ λ°©ν–₯으둜 μ›€μ§μž„") return "; ".join(interpretations)
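For orientation, the sketch below drives InvestorFlowsTool end to end with stubbed dependencies. It is not part of the repository: the stub classes, the module path src.tools.investor_tools, and the assumption that BaseTool simply stores db_manager and cache_manager as attributes are hypothetical; only the fetch_all/fetch_one, get/set, and execute calls are taken from the code above.

# Hypothetical driver for InvestorFlowsTool (not part of the repository).
import asyncio
from datetime import date

from src.tools.investor_tools import InvestorFlowsTool  # assumed module path


class FakeDBManager:
    """Stub mimicking the fetch_all/fetch_one calls used by the tool."""

    async def fetch_all(self, query, *params):
        # One trading day of rows; amounts are in KRW.
        return [
            {"date": date(2024, 1, 2), "investor_type": "개인",
             "buy_amount": 1_200_000_000_000, "sell_amount": 1_100_000_000_000,
             "net_amount": 100_000_000_000, "market": "KOSPI"},
            {"date": date(2024, 1, 2), "investor_type": "외국인",
             "buy_amount": 900_000_000_000, "sell_amount": 1_000_000_000_000,
             "net_amount": -100_000_000_000, "market": "KOSPI"},
        ]

    async def fetch_one(self, query, *params):
        # Only consulted when include_market_impact=True.
        return {"index_change": 12.3, "index_change_rate": 0.5}


class FakeCacheManager:
    """Stub mimicking the get/set calls used by the tool."""

    def __init__(self):
        self._store = {}

    async def get(self, key):
        return self._store.get(key)

    async def set(self, key, value, ttl=None):
        self._store[key] = value


async def main():
    tool = InvestorFlowsTool(FakeDBManager(), FakeCacheManager())
    contents = await tool.execute({"market": "KOSPI", "period": "1d"})
    # The summary should show 개인 at "1,000억" net buying and 외국인 at "-1,000억".
    print(contents[0].text)


if __name__ == "__main__":
    asyncio.run(main())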

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'
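The same lookup from Python, as a minimal sketch using only the standard library; it assumes the endpoint returns a JSON body but makes no assumptions about its fields:

# Fetch this server's directory entry from the Glama MCP API.
import json
from urllib.request import urlopen

URL = "https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics"

with urlopen(URL) as resp:
    entry = json.load(resp)  # assumes a JSON response body

print(json.dumps(entry, indent=2))  # print whatever fields the API returns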

If you have feedback or need assistance with the MCP directory API, please join our Discord server.