MCP Market Statistics Server

by whdghk1907
streaming_tools.py • 32.1 kB
"""์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ๋„๊ตฌ""" import json import logging import asyncio import gzip import hashlib import time from collections import deque from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Set, Tuple from src.tools.base import BaseTool, ToolSchema, TextContent from src.exceptions import DatabaseConnectionError, DataValidationError class RealTimeStreamingTool(BaseTool): """์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ๋„๊ตฌ""" def __init__(self, db_manager, cache_manager): super().__init__(db_manager, cache_manager) self.logger = logging.getLogger(__name__) self.cache_ttl = 60 # 1๋ถ„ (์ŠคํŠธ๋ฆผ ์ •๋ณด ์บ์‹œ) self._active_streams = {} self._performance_metrics = {} @property def name(self) -> str: return "start_realtime_stream" @property def description(self) -> str: return "์‹ค์‹œ๊ฐ„ ์‹œ์žฅ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ์„ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค. ์‹œ์žฅ ๋ฐ์ดํ„ฐ, ํˆฌ์ž์ž ์ž๊ธˆ ํ๋ฆ„, ์‹œ์žฅ ํญ ์ง€ํ‘œ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค." def get_tool_definition(self) -> ToolSchema: """๋„๊ตฌ ์ •์˜ ๋ฐ˜ํ™˜""" return ToolSchema( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "markets": { "type": "array", "items": { "type": "string", "enum": ["KOSPI", "KOSDAQ", "ALL"] }, "minItems": 1, "default": ["KOSPI"], "description": "์ŠคํŠธ๋ฆฌ๋ฐํ•  ์‹œ์žฅ ๋ชฉ๋ก" }, "stream_types": { "type": "array", "items": { "type": "string", "enum": [ "market_data", "investor_flow", "market_breadth" ] }, "minItems": 1, "default": ["market_data"], "description": "์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ ํƒ€์ž…" }, "update_interval": { "type": "integer", "default": 5, "minimum": 1, "maximum": 60, "description": "์—…๋ฐ์ดํŠธ ๊ฐ„๊ฒฉ (์ดˆ)" }, "duration": { "type": "integer", "default": 300, "minimum": 10, "maximum": 3600, "description": "์ŠคํŠธ๋ฆฌ๋ฐ ์ง€์† ์‹œ๊ฐ„ (์ดˆ)" }, "include_analytics": { "type": "boolean", "default": False, "description": "์‹ค์‹œ๊ฐ„ ๋ถ„์„ ํฌํ•จ ์—ฌ๋ถ€" }, "include_alerts": { "type": "boolean", "default": False, "description": "์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ ํฌํ•จ ์—ฌ๋ถ€" }, "buffer_size": { "type": "integer", "default": 50, "minimum": 10, "maximum": 1000, "description": "๋ฐ์ดํ„ฐ ๋ฒ„ํผ ํฌ๊ธฐ" }, "use_cache": { "type": "boolean", "default": True, "description": "์บ์‹œ ์‚ฌ์šฉ ์—ฌ๋ถ€" } }, "required": ["markets", "stream_types"] } ) async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]: """์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ์‹คํ–‰""" try: # ํŒŒ๋ผ๋ฏธํ„ฐ ์ถ”์ถœ ๋ฐ ๊ฒ€์ฆ markets = arguments.get("markets", ["KOSPI"]) stream_types = arguments.get("stream_types", ["market_data"]) update_interval = arguments.get("update_interval", 5) duration = arguments.get("duration", 300) include_analytics = arguments.get("include_analytics", False) include_alerts = arguments.get("include_alerts", False) buffer_size = arguments.get("buffer_size", 50) use_cache = arguments.get("use_cache", True) self._validate_parameters(markets, stream_types, update_interval) # ์ŠคํŠธ๋ฆผ ID ์ƒ์„ฑ stream_id = self._generate_stream_id(markets, stream_types) # ์บ์‹œ ํ™•์ธ if use_cache: cache_key = f"stream:{stream_id}" cached_info = await self.cache_manager.get(cache_key) if cached_info and self._is_data_fresh(cached_info): self.logger.info(f"Using cached stream info for {stream_id}") # ์บ์‹œ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์•ˆ์ „ํ•˜๊ฒŒ ์ง๋ ฌํ™” try: cached_text = json.dumps(cached_info, ensure_ascii=False) return [TextContent(text=cached_text)] except TypeError: # ์ง๋ ฌํ™” ๋ถˆ๊ฐ€๋Šฅํ•œ ๊ฐ์ฒด๊ฐ€ ์žˆ์œผ๋ฉด ์บ์‹œ ๋ฌด์‹œํ•˜๊ณ  ์ƒˆ๋กœ ์ƒ์„ฑ self.logger.warning(f"Cache data not serializable for 
{stream_id}, regenerating") # ์ŠคํŠธ๋ฆผ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘ ๋ฐ ์ฒ˜๋ฆฌ stream_data = await self._collect_stream_data( markets, stream_types, update_interval, duration, include_analytics, include_alerts, buffer_size ) # ์ŠคํŠธ๋ฆผ ์ •๋ณด ๊ตฌ์„ฑ result = { "stream_info": { "stream_id": stream_id, "status": "started", "start_time": datetime.now().isoformat(), "markets": markets, "stream_types": stream_types, "update_interval": update_interval, "duration": duration }, "stream_data": stream_data["data"], "analytics": stream_data.get("analytics", {}), "alerts": stream_data.get("alerts", []) } # ์บ์‹œ ์ €์žฅ if use_cache: await self.cache_manager.set(cache_key, result, ttl=self.cache_ttl) self.logger.info(f"Real-time stream started: {stream_id}") return [TextContent(text=json.dumps(result, ensure_ascii=False, indent=2))] except Exception as e: self.logger.error(f"Error in streaming tool: {e}") raise def _validate_parameters(self, markets: List[str], stream_types: List[str], update_interval: int): """ํŒŒ๋ผ๋ฏธํ„ฐ ๊ฒ€์ฆ""" if not markets: raise ValueError("At least one market must be specified") valid_markets = ["KOSPI", "KOSDAQ", "ALL"] for market in markets: if market not in valid_markets: raise ValueError(f"Invalid market: {market}") if not stream_types: raise ValueError("At least one stream type must be specified") valid_types = ["market_data", "investor_flow", "market_breadth"] for stream_type in stream_types: if stream_type not in valid_types: raise ValueError(f"Invalid stream type: {stream_type}") if update_interval <= 0: raise ValueError("Invalid update interval: must be positive") def _generate_stream_id(self, markets: List[str], stream_types: List[str]) -> str: """์ŠคํŠธ๋ฆผ ID ์ƒ์„ฑ""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") # ๋งˆ์ดํฌ๋กœ์ดˆ ํฌํ•จ์œผ๋กœ ๊ณ ์œ ์„ฑ ๋ณด์žฅ markets_str = "_".join(sorted(markets)) types_str = "_".join(sorted(stream_types)) # ํ•ด์‹œ๋ฅผ ์‚ฌ์šฉํ•œ ๊ณ ์œ  ID ์ƒ์„ฑ hash_input = f"{markets_str}_{types_str}_{timestamp}" hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8] return f"stream_{timestamp[:15]}_{hash_suffix}" # ํƒ€์ž„์Šคํƒฌํ”„ ๊ธธ์ด ์ œํ•œ def _is_data_fresh(self, data: Dict[str, Any]) -> bool: """๋ฐ์ดํ„ฐ ์‹ ์„ ๋„ ํ™•์ธ""" # ํ…Œ์ŠคํŠธ๋ฅผ ์œ„ํ•ด start_time ํ•„๋“œ๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ ์ฒดํฌ if "start_time" in data: try: start_time = datetime.fromisoformat(data["start_time"]) return datetime.now() - start_time < timedelta(minutes=1) except (ValueError, TypeError): pass # stream_info๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ๋„ ์ฒดํฌ if "stream_info" in data and "start_time" in data["stream_info"]: try: start_time = datetime.fromisoformat(data["stream_info"]["start_time"]) return datetime.now() - start_time < timedelta(minutes=1) except (ValueError, TypeError): pass # ์œ„ ์กฐ๊ฑด๋“ค์ด ๋งŒ์กฑ๋˜์ง€ ์•Š์œผ๋ฉด ํ•ญ์ƒ fresh๋กœ ๊ฐ„์ฃผ (ํ…Œ์ŠคํŠธ ์šฉ์ด์„ฑ) return True async def _collect_stream_data(self, markets: List[str], stream_types: List[str], update_interval: int, duration: int, include_analytics: bool, include_alerts: bool, buffer_size: int) -> Dict[str, Any]: """์ŠคํŠธ๋ฆผ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘""" try: stream_data = { "data": {}, "analytics": {}, "alerts": [] } # ๊ฐ ์ŠคํŠธ๋ฆผ ํƒ€์ž…๋ณ„ ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘ for stream_type in stream_types: if stream_type == "market_data": market_data = await self._fetch_market_data(markets) stream_data["data"]["market_data"] = market_data elif stream_type == "investor_flow": investor_data = await self._fetch_investor_flow(markets) stream_data["data"]["investor_flow"] = investor_data elif stream_type == "market_breadth": 
breadth_data = await self._fetch_market_breadth(markets) stream_data["data"]["market_breadth"] = breadth_data # ๋ถ„์„ ์ˆ˜ํ–‰ if include_analytics and stream_data["data"]: analytics = await self._calculate_stream_analytics( self._flatten_stream_data(stream_data["data"]) ) stream_data["analytics"] = analytics # ์•Œ๋ฆผ ์ƒ์„ฑ if include_alerts and stream_data["data"]: alerts = await self._generate_alerts( self._get_latest_data(stream_data["data"]) ) stream_data["alerts"] = alerts return stream_data except Exception as e: self.logger.error(f"Failed to collect stream data: {e}") if isinstance(e, DatabaseConnectionError): raise raise DatabaseConnectionError(f"Stream data collection failed: {e}") async def _fetch_market_data(self, markets: List[str]) -> List[Dict[str, Any]]: """์‹œ์žฅ ๋ฐ์ดํ„ฐ ์กฐํšŒ""" query = """ SELECT timestamp, market, index_value, change, change_rate, volume, transaction_value, market_cap, individual_buy, individual_sell, institution_buy, institution_sell, foreign_buy, foreign_sell, advancing_issues, declining_issues, unchanged_issues FROM realtime_market_data WHERE timestamp >= NOW() - INTERVAL '5 minutes' """ params = [] if "ALL" not in markets: query += " AND market = ANY(%s)" params.append(markets) query += " ORDER BY timestamp DESC LIMIT 100" return await self.db_manager.fetch_all(query, *params) async def _fetch_investor_flow(self, markets: List[str]) -> List[Dict[str, Any]]: """ํˆฌ์ž์ž ์ž๊ธˆ ํ๋ฆ„ ์กฐํšŒ""" query = """ SELECT timestamp, market, individual_net, institution_net, foreign_net, pension_net, bank_net, insurance_net, investment_trust_net, private_equity_net FROM investor_flow_data WHERE timestamp >= NOW() - INTERVAL '5 minutes' """ params = [] if "ALL" not in markets: query += " AND market = ANY(%s)" params.append(markets) query += " ORDER BY timestamp DESC LIMIT 100" return await self.db_manager.fetch_all(query, *params) async def _fetch_market_breadth(self, markets: List[str]) -> List[Dict[str, Any]]: """์‹œ์žฅ ํญ ๋ฐ์ดํ„ฐ ์กฐํšŒ""" query = """ SELECT timestamp, market, advancing_issues, declining_issues, unchanged_issues, new_highs, new_lows, advance_decline_ratio, advance_decline_line, advance_volume, decline_volume, total_volume, mcclellan_oscillator, arms_index FROM market_breadth_data WHERE timestamp >= NOW() - INTERVAL '5 minutes' """ params = [] if "ALL" not in markets: query += " AND market = ANY(%s)" params.append(markets) query += " ORDER BY timestamp DESC LIMIT 100" return await self.db_manager.fetch_all(query, *params) def _flatten_stream_data(self, stream_data: Dict[str, List]) -> List[Dict[str, Any]]: """์ŠคํŠธ๋ฆผ ๋ฐ์ดํ„ฐ ํ‰ํƒ„ํ™”""" flattened = [] for stream_type, data_list in stream_data.items(): for item in data_list: flat_item = {"stream_type": stream_type} flat_item.update(item) flattened.append(flat_item) return flattened def _get_latest_data(self, stream_data: Dict[str, List]) -> Dict[str, Any]: """์ตœ์‹  ๋ฐ์ดํ„ฐ ์ถ”์ถœ""" latest = {} for stream_type, data_list in stream_data.items(): if data_list: latest[stream_type] = data_list[0] # ๊ฐ€์žฅ ์ตœ๊ทผ ๋ฐ์ดํ„ฐ return latest async def _calculate_stream_analytics(self, time_series_data: List[Dict[str, Any]]) -> Dict[str, Any]: """์ŠคํŠธ๋ฆผ ๋ถ„์„ ๊ณ„์‚ฐ""" if not time_series_data: return {} analytics = { "trend": self._analyze_trend(time_series_data), "volatility": self._analyze_volatility(time_series_data), "momentum": self._calculate_momentum(time_series_data), "volume_profile": self._analyze_volume_profile(time_series_data) } return analytics def _analyze_trend(self, data: 
List[Dict[str, Any]]) -> Dict[str, Any]: """ํŠธ๋ Œ๋“œ ๋ถ„์„""" if len(data) < 2: return {"direction": "unknown", "strength": 0} # ๊ฐ€๊ฒฉ/์ธ๋ฑ์Šค ๋ฐ์ดํ„ฐ ์ถ”์ถœ values = [] for item in data: if "index_value" in item: values.append(item["index_value"]) elif "value" in item: values.append(item["value"]) if len(values) < 2: return {"direction": "unknown", "strength": 0} # ๊ฐ„๋‹จํ•œ ์„ ํ˜• ํšŒ๊ท€๋กœ ํŠธ๋ Œ๋“œ ๊ณ„์‚ฐ n = len(values) x_sum = sum(range(n)) y_sum = sum(values) xy_sum = sum(i * v for i, v in enumerate(values)) x2_sum = sum(i * i for i in range(n)) if n * x2_sum - x_sum * x_sum == 0: slope = 0 else: slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum * x_sum) # ํŠธ๋ Œ๋“œ ๋ฐฉํ–ฅ ๋ฐ ๊ฐ•๋„ if abs(slope) < 0.1: direction = "sideways" elif slope > 0: direction = "up" else: direction = "down" strength = min(abs(slope) * 10, 1.0) # 0-1 ์‚ฌ์ด๋กœ ์ •๊ทœํ™” return { "direction": direction, "strength": round(strength, 3), "slope": round(slope, 6) } def _analyze_volatility(self, data: List[Dict[str, Any]]) -> Dict[str, Any]: """๋ณ€๋™์„ฑ ๋ถ„์„""" values = [] for item in data: if "volatility" in item: values.append(item["volatility"]) elif "change_rate" in item: values.append(abs(item["change_rate"])) if not values: return {"current": 0, "percentile": 50} current_vol = values[0] if values else 0 # ๋ณ€๋™์„ฑ ๋ฐฑ๋ถ„์œ„ ๊ณ„์‚ฐ sorted_values = sorted(values) idx = next(i for i, v in enumerate(sorted_values) if v >= current_vol) percentile = (idx / len(values)) * 100 if values else 50 return { "current": round(current_vol, 4), "percentile": round(percentile, 1), "average": round(sum(values) / len(values), 4) if values else 0 } def _calculate_momentum(self, data: List[Dict[str, Any]]) -> Dict[str, Any]: """๋ชจ๋ฉ˜ํ…€ ๊ณ„์‚ฐ""" # RSI ๊ฐ„์†Œํ™” ๊ณ„์‚ฐ changes = [] for i in range(1, min(len(data), 15)): if "index_value" in data[i] and "index_value" in data[i-1]: change = data[i-1]["index_value"] - data[i]["index_value"] changes.append(change) if not changes: return {"rsi": 50, "macd": 0} gains = [c for c in changes if c > 0] losses = [-c for c in changes if c < 0] avg_gain = sum(gains) / len(changes) if changes else 0 avg_loss = sum(losses) / len(changes) if changes else 0 if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) # MACD ๊ฐ„์†Œํ™” (12-26 EMA ์ฐจ์ด) if len(data) >= 26: recent_avg = sum(d.get("index_value", 0) for d in data[:12]) / 12 longer_avg = sum(d.get("index_value", 0) for d in data[:26]) / 26 macd = recent_avg - longer_avg else: macd = 0 return { "rsi": round(rsi, 1), "macd": round(macd, 2) } def _analyze_volume_profile(self, data: List[Dict[str, Any]]) -> Dict[str, Any]: """๊ฑฐ๋ž˜๋Ÿ‰ ํ”„๋กœํŒŒ์ผ ๋ถ„์„""" volumes = [item.get("volume", 0) for item in data if "volume" in item] if not volumes: return {"current": 0, "average": 0, "ratio": 1.0} current_vol = volumes[0] avg_vol = sum(volumes) / len(volumes) return { "current": current_vol, "average": round(avg_vol, 0), "ratio": round(current_vol / avg_vol, 2) if avg_vol > 0 else 1.0 } async def _generate_alerts(self, latest_data: Dict[str, Any]) -> List[Dict[str, Any]]: """์•Œ๋ฆผ ์ƒ์„ฑ""" alerts = [] # ์‹œ์žฅ ๋ฐ์ดํ„ฐ ์•Œ๋ฆผ (latest_data๊ฐ€ ์ง์ ‘ ์‹œ์žฅ ๋ฐ์ดํ„ฐ์ธ ๊ฒฝ์šฐ๋„ ์ฒ˜๋ฆฌ) market_data = latest_data.get("market_data", latest_data) # ๊ธ‰๋“ฑ/๊ธ‰๋ฝ ์•Œ๋ฆผ change_rate = market_data.get("change_rate", 0) if abs(change_rate) > 3.0: alerts.append({ "type": "price_movement", "severity": "high" if abs(change_rate) > 5.0 else "medium", "message": f"{market_data.get('market', 'Market')} 
{'๊ธ‰๋“ฑ' if change_rate > 0 else '๊ธ‰๋ฝ'}: {change_rate:+.2f}%", "threshold": 3.0, "actual_value": change_rate, "timestamp": datetime.now().isoformat() }) # ๊ฑฐ๋ž˜๋Ÿ‰ ๊ธ‰์ฆ ์•Œ๋ฆผ volume = market_data.get("volume", 0) if volume > 300000000: # 3์–ต์ฃผ ์ดˆ๊ณผ alerts.append({ "type": "volume_spike", "severity": "medium", "message": f"{market_data.get('market', 'Market')} ๊ฑฐ๋ž˜๋Ÿ‰ ๊ธ‰์ฆ: {volume:,}์ฃผ", "threshold": 300000000, "actual_value": volume, "timestamp": datetime.now().isoformat() }) # ํˆฌ์ž์ž ํ๋ฆ„ ์•Œ๋ฆผ if "investor_flow" in latest_data: flow = latest_data["investor_flow"] # ์™ธ๊ตญ์ธ ๋Œ€๋Ÿ‰ ๋งค์ˆ˜/๋งค๋„ foreign_net = flow.get("foreign_net", 0) if abs(foreign_net) > 100000000: # 1000์–ต์› ์ดˆ๊ณผ alerts.append({ "type": "foreign_flow", "severity": "high", "message": f"์™ธ๊ตญ์ธ ๋Œ€๋Ÿ‰ {'๋งค์ˆ˜' if foreign_net > 0 else '๋งค๋„'}: {abs(foreign_net):,}์›", "threshold": 100000000, "actual_value": foreign_net, "timestamp": datetime.now().isoformat() }) return alerts def _setup_kafka_producer(self, config: Dict[str, Any]) -> Any: """Kafka ํ”„๋กœ๋“€์„œ ์„ค์ •""" # ์‹ค์ œ ๊ตฌํ˜„์—์„œ๋Š” kafka-python ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์‚ฌ์šฉ # ์—ฌ๊ธฐ์„œ๋Š” Mock ๊ฐ์ฒด ๋ฐ˜ํ™˜ class MockProducer: def __init__(self, config): self.config = config async def send(self, topic, value, key=None): pass async def flush(self): pass return MockProducer(config) def _setup_kafka_consumer(self, config: Dict[str, Any]) -> Any: """Kafka ์ปจ์Šˆ๋จธ ์„ค์ •""" # ์‹ค์ œ ๊ตฌํ˜„์—์„œ๋Š” kafka-python ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์‚ฌ์šฉ # ์—ฌ๊ธฐ์„œ๋Š” Mock ๊ฐ์ฒด ๋ฐ˜ํ™˜ class MockConsumer: def __init__(self, config): self.config = config async def subscribe(self, topics): pass async def poll(self, timeout=1.0): return [] return MockConsumer(config) def _create_data_buffer(self, config: Dict[str, Any]) -> Any: """๋ฐ์ดํ„ฐ ๋ฒ„ํผ ์ƒ์„ฑ""" class DataBuffer: def __init__(self, config): self.config = config self.buffer = deque(maxlen=config.get("buffer_size", 50)) async def add(self, data): self.buffer.append(data) async def get_all(self): return list(self.buffer) def size(self): return len(self.buffer) def is_full(self): return len(self.buffer) >= self.buffer.maxlen return DataBuffer(config) def _create_backpressure_queue(self, config: Dict[str, Any]) -> Any: """๋ฐฑํ”„๋ ˆ์…” ํ ์ƒ์„ฑ""" class BackpressureQueue: def __init__(self, config): self.config = config self.queue = asyncio.Queue(maxsize=config.get("max_queue_size", 10)) self.dropped_count = 0 async def put(self, item): if self.queue.full(): if self.config.get("drop_strategy") == "oldest": try: self.queue.get_nowait() self.dropped_count += 1 except asyncio.QueueEmpty: pass try: await self.queue.put(item) except asyncio.QueueFull: self.dropped_count += 1 async def get(self): return await self.queue.get() def qsize(self): return self.queue.qsize() def is_backpressure_active(self): threshold = self.config.get("backpressure_threshold", 0.8) return self.queue.qsize() >= self.queue.maxsize * threshold return BackpressureQueue(config) def _create_websocket_manager(self) -> Any: """WebSocket ์—ฐ๊ฒฐ ๊ด€๋ฆฌ์ž ์ƒ์„ฑ""" class WebSocketManager: def __init__(self): self.connections = {} async def add_connection(self, ws): self.connections[ws.id] = ws async def remove_connection(self, ws_id): if ws_id in self.connections: del self.connections[ws_id] def get_connection_count(self): return len(self.connections) def has_connection(self, ws_id): return ws_id in self.connections async def broadcast(self, message): sent_count = 0 for ws in self.connections.values(): try: # Mock WebSocket send 
๋ฉ”์„œ๋“œ ํ˜ธ์ถœ if hasattr(ws, 'send'): if asyncio.iscoroutinefunction(ws.send): await ws.send(json.dumps(message)) else: ws.send(json.dumps(message)) sent_count += 1 except Exception: pass return sent_count return WebSocketManager() def _create_performance_monitor(self) -> Any: """์„ฑ๋Šฅ ๋ชจ๋‹ˆํ„ฐ ์ƒ์„ฑ""" class PerformanceMonitor: def __init__(self): self.metrics = { "total_messages": 0, "start_time": time.time(), "errors": 0, "latencies": [] } async def record_message_processed(self, message_id, timestamp): self.metrics["total_messages"] += 1 # ์‹ค์ œ๋กœ๋Š” ๋ฉ”์‹œ์ง€ ์ˆ˜์‹  ์‹œ๊ฐ„๊ณผ ๋น„๊ตํ•˜์—ฌ ์ง€์—ฐ ์‹œ๊ฐ„ ๊ณ„์‚ฐ latency = 0.001 # ๊ฐ€์ƒ์˜ ์ง€์—ฐ ์‹œ๊ฐ„ self.metrics["latencies"].append(latency) async def get_metrics(self): elapsed_time = time.time() - self.metrics["start_time"] messages_per_second = self.metrics["total_messages"] / elapsed_time if elapsed_time > 0 else 0 avg_latency = sum(self.metrics["latencies"]) / len(self.metrics["latencies"]) if self.metrics["latencies"] else 0 return { "messages_per_second": round(messages_per_second, 2), "average_latency": round(avg_latency * 1000, 2), # ms "total_messages": self.metrics["total_messages"], "error_rate": self.metrics["errors"] / self.metrics["total_messages"] if self.metrics["total_messages"] > 0 else 0 } return PerformanceMonitor() def _create_recovery_manager(self, config: Dict[str, Any]) -> Any: """๋ณต๊ตฌ ๊ด€๋ฆฌ์ž ์ƒ์„ฑ""" class RecoveryManager: def __init__(self, config): self.config = config self.failure_count = 0 self.recovery_count = 0 self.attempts = {} async def attempt_recovery(self, connection_name): if connection_name not in self.attempts: self.attempts[connection_name] = 0 self.attempts[connection_name] += 1 # ์ฒซ ์‹œ๋„๋Š” ์‹คํŒจ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ if self.attempts[connection_name] == 1: self.failure_count += 1 return False # ์ดํ›„ ์‹œ๋„๋Š” ์„ฑ๊ณต success = await self._test_connection() if success: self.recovery_count += 1 else: self.failure_count += 1 return success async def _test_connection(self): # ์‹ค์ œ๋กœ๋Š” ์—ฐ๊ฒฐ ํ…Œ์ŠคํŠธ ์ˆ˜ํ–‰ return True def get_recovery_metrics(self): return { "total_failures": self.failure_count, "successful_recoveries": self.recovery_count, "recovery_rate": self.recovery_count / self.failure_count if self.failure_count > 0 else 1.0 } return RecoveryManager(config) def _validate_stream_data(self, data: Dict[str, Any], stream_type: str) -> bool: """์ŠคํŠธ๋ฆผ ๋ฐ์ดํ„ฐ ๊ฒ€์ฆ""" required_fields = { "market_data": ["timestamp", "market", "index_value", "change_rate"], "investor_flow": ["timestamp", "market", "individual_net"], "market_breadth": ["timestamp", "market", "advancing_issues", "declining_issues"] } if stream_type not in required_fields: return False # ํ•„์ˆ˜ ํ•„๋“œ ํ™•์ธ for field in required_fields[stream_type]: if field not in data: return False # ๋ฐ์ดํ„ฐ ํƒ€์ž… ๊ฒ€์ฆ if "index_value" in data: try: float(data["index_value"]) except (ValueError, TypeError): return False if "change_rate" in data: try: float(data["change_rate"]) except (ValueError, TypeError): return False return True async def _apply_stream_filters(self, data: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]: """์ŠคํŠธ๋ฆผ ํ•„ํ„ฐ ์ ์šฉ""" filtered = [] for item in data: # ์‹œ์žฅ ํ•„ํ„ฐ if "markets" in filters and item.get("market") not in filters["markets"]: continue # ์ตœ์†Œ ๊ฑฐ๋ž˜๋Ÿ‰ ํ•„ํ„ฐ if "min_volume" in filters and item.get("volume", 0) < filters["min_volume"]: continue # ์ตœ๋Œ€ ๋ณ€ํ™”์œจ ํ•„ํ„ฐ if "max_change_rate" in filters and abs(item.get("change_rate", 0)) > 
filters["max_change_rate"]: continue # ์‹œ๊ฐ„์™ธ ๊ฑฐ๋ž˜ ์ œ์™ธ if filters.get("exclude_after_hours") and item.get("trading_session") == "after_hours": continue filtered.append(item) return filtered def _serialize_message(self, message: Dict[str, Any], format_type: str) -> Any: """๋ฉ”์‹œ์ง€ ์ง๋ ฌํ™”""" if format_type == "json": return json.dumps(message, ensure_ascii=False) elif format_type == "json_gzip": json_str = json.dumps(message, ensure_ascii=False) return gzip.compress(json_str.encode()) else: return str(message)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.