"""์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋๊ตฌ"""
import json
import logging
import asyncio
import gzip
import hashlib
import time
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple
from src.tools.base import BaseTool, ToolSchema, TextContent
from src.exceptions import DatabaseConnectionError, DataValidationError
class RealTimeStreamingTool(BaseTool):
"""์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋๊ตฌ"""
def __init__(self, db_manager, cache_manager):
super().__init__(db_manager, cache_manager)
self.logger = logging.getLogger(__name__)
self.cache_ttl = 60  # 1 minute (stream info cache)
self._active_streams = {}
self._performance_metrics = {}
@property
def name(self) -> str:
return "start_realtime_stream"
@property
def description(self) -> str:
return "์ค์๊ฐ ์์ฅ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ์ ์์ํฉ๋๋ค. ์์ฅ ๋ฐ์ดํฐ, ํฌ์์ ์๊ธ ํ๋ฆ, ์์ฅ ํญ ์งํ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ ๊ณตํฉ๋๋ค."
def get_tool_definition(self) -> ToolSchema:
"""๋๊ตฌ ์ ์ ๋ฐํ"""
return ToolSchema(
name=self.name,
description=self.description,
inputSchema={
"type": "object",
"properties": {
"markets": {
"type": "array",
"items": {
"type": "string",
"enum": ["KOSPI", "KOSDAQ", "ALL"]
},
"minItems": 1,
"default": ["KOSPI"],
"description": "์คํธ๋ฆฌ๋ฐํ ์์ฅ ๋ชฉ๋ก"
},
"stream_types": {
"type": "array",
"items": {
"type": "string",
"enum": [
"market_data",
"investor_flow",
"market_breadth"
]
},
"minItems": 1,
"default": ["market_data"],
"description": "์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ํ์
"
},
"update_interval": {
"type": "integer",
"default": 5,
"minimum": 1,
"maximum": 60,
"description": "์
๋ฐ์ดํธ ๊ฐ๊ฒฉ (์ด)"
},
"duration": {
"type": "integer",
"default": 300,
"minimum": 10,
"maximum": 3600,
"description": "์คํธ๋ฆฌ๋ฐ ์ง์ ์๊ฐ (์ด)"
},
"include_analytics": {
"type": "boolean",
"default": False,
"description": "์ค์๊ฐ ๋ถ์ ํฌํจ ์ฌ๋ถ"
},
"include_alerts": {
"type": "boolean",
"default": False,
"description": "์ค์๊ฐ ์๋ฆผ ํฌํจ ์ฌ๋ถ"
},
"buffer_size": {
"type": "integer",
"default": 50,
"minimum": 10,
"maximum": 1000,
"description": "๋ฐ์ดํฐ ๋ฒํผ ํฌ๊ธฐ"
},
"use_cache": {
"type": "boolean",
"default": True,
"description": "์บ์ ์ฌ์ฉ ์ฌ๋ถ"
}
},
"required": ["markets", "stream_types"]
}
)
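# A minimal example of an arguments payload that satisfies the schema above
# (values are illustrative only):
#
#     {
#         "markets": ["KOSPI", "KOSDAQ"],
#         "stream_types": ["market_data", "investor_flow"],
#         "update_interval": 5,
#         "duration": 300,
#         "include_analytics": True,
#         "buffer_size": 100
#     }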
async def execute(self, arguments: Dict[str, Any]) -> List[TextContent]:
"""์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ์คํ"""
try:
# Extract and validate parameters
markets = arguments.get("markets", ["KOSPI"])
stream_types = arguments.get("stream_types", ["market_data"])
update_interval = arguments.get("update_interval", 5)
duration = arguments.get("duration", 300)
include_analytics = arguments.get("include_analytics", False)
include_alerts = arguments.get("include_alerts", False)
buffer_size = arguments.get("buffer_size", 50)
use_cache = arguments.get("use_cache", True)
self._validate_parameters(markets, stream_types, update_interval)
# Generate a stream ID
stream_id = self._generate_stream_id(markets, stream_types)
# Check the cache
if use_cache:
cache_key = f"stream:{stream_id}"
cached_info = await self.cache_manager.get(cache_key)
if cached_info and self._is_data_fresh(cached_info):
self.logger.info(f"Using cached stream info for {stream_id}")
# Serialize the cached data safely
try:
cached_text = json.dumps(cached_info, ensure_ascii=False)
return [TextContent(text=cached_text)]
except TypeError:
# If the cache holds non-serializable objects, ignore it and regenerate
self.logger.warning(f"Cache data not serializable for {stream_id}, regenerating")
# Collect and process stream data
stream_data = await self._collect_stream_data(
markets, stream_types, update_interval, duration,
include_analytics, include_alerts, buffer_size
)
# Build the stream info
result = {
"stream_info": {
"stream_id": stream_id,
"status": "started",
"start_time": datetime.now().isoformat(),
"markets": markets,
"stream_types": stream_types,
"update_interval": update_interval,
"duration": duration
},
"stream_data": stream_data["data"],
"analytics": stream_data.get("analytics", {}),
"alerts": stream_data.get("alerts", [])
}
# Store in the cache
if use_cache:
await self.cache_manager.set(cache_key, result, ttl=self.cache_ttl)
self.logger.info(f"Real-time stream started: {stream_id}")
return [TextContent(text=json.dumps(result, ensure_ascii=False, indent=2))]
except Exception as e:
self.logger.error(f"Error in streaming tool: {e}")
raise
def _validate_parameters(self, markets: List[str], stream_types: List[str],
update_interval: int):
"""ํ๋ผ๋ฏธํฐ ๊ฒ์ฆ"""
if not markets:
raise ValueError("At least one market must be specified")
valid_markets = ["KOSPI", "KOSDAQ", "ALL"]
for market in markets:
if market not in valid_markets:
raise ValueError(f"Invalid market: {market}")
if not stream_types:
raise ValueError("At least one stream type must be specified")
valid_types = ["market_data", "investor_flow", "market_breadth"]
for stream_type in stream_types:
if stream_type not in valid_types:
raise ValueError(f"Invalid stream type: {stream_type}")
if update_interval <= 0:
raise ValueError("Invalid update interval: must be positive")
def _generate_stream_id(self, markets: List[str], stream_types: List[str]) -> str:
"""์คํธ๋ฆผ ID ์์ฑ"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") # ๋ง์ดํฌ๋ก์ด ํฌํจ์ผ๋ก ๊ณ ์ ์ฑ ๋ณด์ฅ
markets_str = "_".join(sorted(markets))
types_str = "_".join(sorted(stream_types))
# Generate a unique ID using a hash
hash_input = f"{markets_str}_{types_str}_{timestamp}"
hash_suffix = hashlib.md5(hash_input.encode()).hexdigest()[:8]
return f"stream_{timestamp[:15]}_{hash_suffix}" # ํ์์คํฌํ ๊ธธ์ด ์ ํ
def _is_data_fresh(self, data: Dict[str, Any]) -> bool:
"""๋ฐ์ดํฐ ์ ์ ๋ ํ์ธ"""
# ํ
์คํธ๋ฅผ ์ํด start_time ํ๋๊ฐ ์๋ ๊ฒฝ์ฐ ์ฒดํฌ
if "start_time" in data:
try:
start_time = datetime.fromisoformat(data["start_time"])
return datetime.now() - start_time < timedelta(minutes=1)
except (ValueError, TypeError):
pass
# Also check start_time nested under stream_info
if "stream_info" in data and "start_time" in data["stream_info"]:
try:
start_time = datetime.fromisoformat(data["stream_info"]["start_time"])
return datetime.now() - start_time < timedelta(minutes=1)
except (ValueError, TypeError):
pass
# If neither condition applies, treat the data as fresh (keeps tests simple)
return True
async def _collect_stream_data(self, markets: List[str], stream_types: List[str],
update_interval: int, duration: int,
include_analytics: bool, include_alerts: bool,
buffer_size: int) -> Dict[str, Any]:
"""์คํธ๋ฆผ ๋ฐ์ดํฐ ์์ง"""
try:
stream_data = {
"data": {},
"analytics": {},
"alerts": []
}
# Collect data for each stream type
for stream_type in stream_types:
if stream_type == "market_data":
market_data = await self._fetch_market_data(markets)
stream_data["data"]["market_data"] = market_data
elif stream_type == "investor_flow":
investor_data = await self._fetch_investor_flow(markets)
stream_data["data"]["investor_flow"] = investor_data
elif stream_type == "market_breadth":
breadth_data = await self._fetch_market_breadth(markets)
stream_data["data"]["market_breadth"] = breadth_data
# Run analytics
if include_analytics and stream_data["data"]:
analytics = await self._calculate_stream_analytics(
self._flatten_stream_data(stream_data["data"])
)
stream_data["analytics"] = analytics
# Generate alerts
if include_alerts and stream_data["data"]:
alerts = await self._generate_alerts(
self._get_latest_data(stream_data["data"])
)
stream_data["alerts"] = alerts
return stream_data
except Exception as e:
self.logger.error(f"Failed to collect stream data: {e}")
if isinstance(e, DatabaseConnectionError):
raise
raise DatabaseConnectionError(f"Stream data collection failed: {e}")
async def _fetch_market_data(self, markets: List[str]) -> List[Dict[str, Any]]:
"""์์ฅ ๋ฐ์ดํฐ ์กฐํ"""
query = """
SELECT timestamp, market, index_value, change, change_rate,
volume, transaction_value, market_cap,
individual_buy, individual_sell,
institution_buy, institution_sell,
foreign_buy, foreign_sell,
advancing_issues, declining_issues, unchanged_issues
FROM realtime_market_data
WHERE timestamp >= NOW() - INTERVAL '5 minutes'
"""
params = []
if "ALL" not in markets:
query += " AND market = ANY(%s)"
params.append(markets)
query += " ORDER BY timestamp DESC LIMIT 100"
return await self.db_manager.fetch_all(query, *params)
async def _fetch_investor_flow(self, markets: List[str]) -> List[Dict[str, Any]]:
"""ํฌ์์ ์๊ธ ํ๋ฆ ์กฐํ"""
query = """
SELECT timestamp, market,
individual_net, institution_net, foreign_net,
pension_net, bank_net, insurance_net,
investment_trust_net, private_equity_net
FROM investor_flow_data
WHERE timestamp >= NOW() - INTERVAL '5 minutes'
"""
params = []
if "ALL" not in markets:
query += " AND market = ANY(%s)"
params.append(markets)
query += " ORDER BY timestamp DESC LIMIT 100"
return await self.db_manager.fetch_all(query, *params)
async def _fetch_market_breadth(self, markets: List[str]) -> List[Dict[str, Any]]:
"""์์ฅ ํญ ๋ฐ์ดํฐ ์กฐํ"""
query = """
SELECT timestamp, market,
advancing_issues, declining_issues, unchanged_issues,
new_highs, new_lows,
advance_decline_ratio, advance_decline_line,
advance_volume, decline_volume, total_volume,
mcclellan_oscillator, arms_index
FROM market_breadth_data
WHERE timestamp >= NOW() - INTERVAL '5 minutes'
"""
params = []
if "ALL" not in markets:
query += " AND market = ANY(%s)"
params.append(markets)
query += " ORDER BY timestamp DESC LIMIT 100"
return await self.db_manager.fetch_all(query, *params)
def _flatten_stream_data(self, stream_data: Dict[str, List]) -> List[Dict[str, Any]]:
"""์คํธ๋ฆผ ๋ฐ์ดํฐ ํํํ"""
flattened = []
for stream_type, data_list in stream_data.items():
for item in data_list:
flat_item = {"stream_type": stream_type}
flat_item.update(item)
flattened.append(flat_item)
return flattened
def _get_latest_data(self, stream_data: Dict[str, List]) -> Dict[str, Any]:
"""์ต์ ๋ฐ์ดํฐ ์ถ์ถ"""
latest = {}
for stream_type, data_list in stream_data.items():
if data_list:
latest[stream_type] = data_list[0]  # most recent record
return latest
async def _calculate_stream_analytics(self, time_series_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""์คํธ๋ฆผ ๋ถ์ ๊ณ์ฐ"""
if not time_series_data:
return {}
analytics = {
"trend": self._analyze_trend(time_series_data),
"volatility": self._analyze_volatility(time_series_data),
"momentum": self._calculate_momentum(time_series_data),
"volume_profile": self._analyze_volume_profile(time_series_data)
}
return analytics
def _analyze_trend(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""ํธ๋ ๋ ๋ถ์"""
if len(data) < 2:
return {"direction": "unknown", "strength": 0}
# Extract price/index values
values = []
for item in data:
if "index_value" in item:
values.append(item["index_value"])
elif "value" in item:
values.append(item["value"])
if len(values) < 2:
return {"direction": "unknown", "strength": 0}
# Estimate the trend with a simple least-squares fit over x = 0..n-1:
#   slope = (n*Σxy - Σx*Σy) / (n*Σx² - (Σx)²)
n = len(values)
x_sum = sum(range(n))
y_sum = sum(values)
xy_sum = sum(i * v for i, v in enumerate(values))
x2_sum = sum(i * i for i in range(n))
if n * x2_sum - x_sum * x_sum == 0:
slope = 0
else:
slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum * x_sum)
# Trend direction and strength
if abs(slope) < 0.1:
direction = "sideways"
elif slope > 0:
direction = "up"
else:
direction = "down"
strength = min(abs(slope) * 10, 1.0)  # normalize to the 0-1 range
return {
"direction": direction,
"strength": round(strength, 3),
"slope": round(slope, 6)
}
def _analyze_volatility(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""๋ณ๋์ฑ ๋ถ์"""
values = []
for item in data:
if "volatility" in item:
values.append(item["volatility"])
elif "change_rate" in item:
values.append(abs(item["change_rate"]))
if not values:
return {"current": 0, "percentile": 50}
current_vol = values[0] if values else 0
# Compute the volatility percentile
sorted_values = sorted(values)
idx = next(i for i, v in enumerate(sorted_values) if v >= current_vol)
percentile = (idx / len(values)) * 100 if values else 50
return {
"current": round(current_vol, 4),
"percentile": round(percentile, 1),
"average": round(sum(values) / len(values), 4) if values else 0
}
def _calculate_momentum(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""๋ชจ๋ฉํ
๊ณ์ฐ"""
# RSI ๊ฐ์ํ ๊ณ์ฐ
changes = []
for i in range(1, min(len(data), 15)):
if "index_value" in data[i] and "index_value" in data[i-1]:
change = data[i-1]["index_value"] - data[i]["index_value"]
changes.append(change)
if not changes:
return {"rsi": 50, "macd": 0}
gains = [c for c in changes if c > 0]
losses = [-c for c in changes if c < 0]
avg_gain = sum(gains) / len(changes) if changes else 0
avg_loss = sum(losses) / len(changes) if changes else 0
if avg_loss == 0:
rsi = 100
else:
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
# Simplified MACD: difference between 12- and 26-period simple averages (stand-ins for the usual EMAs)
if len(data) >= 26:
recent_avg = sum(d.get("index_value", 0) for d in data[:12]) / 12
longer_avg = sum(d.get("index_value", 0) for d in data[:26]) / 26
macd = recent_avg - longer_avg
else:
macd = 0
return {
"rsi": round(rsi, 1),
"macd": round(macd, 2)
}
def _analyze_volume_profile(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""๊ฑฐ๋๋ ํ๋กํ์ผ ๋ถ์"""
volumes = [item.get("volume", 0) for item in data if "volume" in item]
if not volumes:
return {"current": 0, "average": 0, "ratio": 1.0}
current_vol = volumes[0]
avg_vol = sum(volumes) / len(volumes)
return {
"current": current_vol,
"average": round(avg_vol, 0),
"ratio": round(current_vol / avg_vol, 2) if avg_vol > 0 else 1.0
}
async def _generate_alerts(self, latest_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""์๋ฆผ ์์ฑ"""
alerts = []
# Market data alerts (also handle the case where latest_data is itself market data)
market_data = latest_data.get("market_data", latest_data)
# Sharp rise/fall alert
change_rate = market_data.get("change_rate", 0)
if abs(change_rate) > 3.0:
alerts.append({
"type": "price_movement",
"severity": "high" if abs(change_rate) > 5.0 else "medium",
"message": f"{market_data.get('market', 'Market')} {'๊ธ๋ฑ' if change_rate > 0 else '๊ธ๋ฝ'}: {change_rate:+.2f}%",
"threshold": 3.0,
"actual_value": change_rate,
"timestamp": datetime.now().isoformat()
})
# Volume spike alert
volume = market_data.get("volume", 0)
if volume > 300000000:  # over 300 million shares
alerts.append({
"type": "volume_spike",
"severity": "medium",
"message": f"{market_data.get('market', 'Market')} ๊ฑฐ๋๋ ๊ธ์ฆ: {volume:,}์ฃผ",
"threshold": 300000000,
"actual_value": volume,
"timestamp": datetime.now().isoformat()
})
# Investor flow alerts
if "investor_flow" in latest_data:
flow = latest_data["investor_flow"]
# Large foreign net buying/selling
foreign_net = flow.get("foreign_net", 0)
if abs(foreign_net) > 100000000:  # threshold for a large foreign net flow
alerts.append({
"type": "foreign_flow",
"severity": "high",
"message": f"์ธ๊ตญ์ธ ๋๋ {'๋งค์' if foreign_net > 0 else '๋งค๋'}: {abs(foreign_net):,}์",
"threshold": 100000000,
"actual_value": foreign_net,
"timestamp": datetime.now().isoformat()
})
return alerts
def _setup_kafka_producer(self, config: Dict[str, Any]) -> Any:
"""Kafka ํ๋ก๋์ ์ค์ """
# ์ค์ ๊ตฌํ์์๋ kafka-python ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉ
# ์ฌ๊ธฐ์๋ Mock ๊ฐ์ฒด ๋ฐํ
class MockProducer:
def __init__(self, config):
self.config = config
async def send(self, topic, value, key=None):
pass
async def flush(self):
pass
return MockProducer(config)
def _setup_kafka_consumer(self, config: Dict[str, Any]) -> Any:
"""Kafka ์ปจ์๋จธ ์ค์ """
# ์ค์ ๊ตฌํ์์๋ kafka-python ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉ
# ์ฌ๊ธฐ์๋ Mock ๊ฐ์ฒด ๋ฐํ
class MockConsumer:
def __init__(self, config):
self.config = config
async def subscribe(self, topics):
pass
async def poll(self, timeout=1.0):
return []
return MockConsumer(config)
def _create_data_buffer(self, config: Dict[str, Any]) -> Any:
"""๋ฐ์ดํฐ ๋ฒํผ ์์ฑ"""
class DataBuffer:
def __init__(self, config):
self.config = config
self.buffer = deque(maxlen=config.get("buffer_size", 50))
async def add(self, data):
self.buffer.append(data)
async def get_all(self):
return list(self.buffer)
def size(self):
return len(self.buffer)
def is_full(self):
return len(self.buffer) >= self.buffer.maxlen
return DataBuffer(config)
def _create_backpressure_queue(self, config: Dict[str, Any]) -> Any:
"""๋ฐฑํ๋ ์
ํ ์์ฑ"""
class BackpressureQueue:
def __init__(self, config):
self.config = config
self.queue = asyncio.Queue(maxsize=config.get("max_queue_size", 10))
self.dropped_count = 0
async def put(self, item):
if self.queue.full():
if self.config.get("drop_strategy") == "oldest":
try:
self.queue.get_nowait()
self.dropped_count += 1
except asyncio.QueueEmpty:
pass
try:
self.queue.put_nowait(item)
except asyncio.QueueFull:
# Still full (nothing was dropped above), so drop the incoming item
self.dropped_count += 1
async def get(self):
return await self.queue.get()
def qsize(self):
return self.queue.qsize()
def is_backpressure_active(self):
threshold = self.config.get("backpressure_threshold", 0.8)
return self.queue.qsize() >= self.queue.maxsize * threshold
return BackpressureQueue(config)
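# Illustrative use of the backpressure queue created above; the config keys
# shown are the ones read by __init__ and put() (a sketch, not from the source):
#
#     queue = self._create_backpressure_queue(
#         {"max_queue_size": 100, "drop_strategy": "oldest", "backpressure_threshold": 0.8}
#     )
#     await queue.put(message)
#     if queue.is_backpressure_active():
#         ...  # signal the producer to slow down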
def _create_websocket_manager(self) -> Any:
"""WebSocket ์ฐ๊ฒฐ ๊ด๋ฆฌ์ ์์ฑ"""
class WebSocketManager:
def __init__(self):
self.connections = {}
async def add_connection(self, ws):
self.connections[ws.id] = ws
async def remove_connection(self, ws_id):
if ws_id in self.connections:
del self.connections[ws_id]
def get_connection_count(self):
return len(self.connections)
def has_connection(self, ws_id):
return ws_id in self.connections
async def broadcast(self, message):
sent_count = 0
for ws in self.connections.values():
try:
# Call the mock WebSocket's send method
if hasattr(ws, 'send'):
if asyncio.iscoroutinefunction(ws.send):
await ws.send(json.dumps(message))
else:
ws.send(json.dumps(message))
sent_count += 1
except Exception:
pass
return sent_count
return WebSocketManager()
def _create_performance_monitor(self) -> Any:
"""์ฑ๋ฅ ๋ชจ๋ํฐ ์์ฑ"""
class PerformanceMonitor:
def __init__(self):
self.metrics = {
"total_messages": 0,
"start_time": time.time(),
"errors": 0,
"latencies": []
}
async def record_message_processed(self, message_id, timestamp):
self.metrics["total_messages"] += 1
# In a real system, latency would be computed against the message receive timestamp
latency = 0.001  # simulated latency
self.metrics["latencies"].append(latency)
async def get_metrics(self):
elapsed_time = time.time() - self.metrics["start_time"]
messages_per_second = self.metrics["total_messages"] / elapsed_time if elapsed_time > 0 else 0
avg_latency = sum(self.metrics["latencies"]) / len(self.metrics["latencies"]) if self.metrics["latencies"] else 0
return {
"messages_per_second": round(messages_per_second, 2),
"average_latency": round(avg_latency * 1000, 2), # ms
"total_messages": self.metrics["total_messages"],
"error_rate": self.metrics["errors"] / self.metrics["total_messages"] if self.metrics["total_messages"] > 0 else 0
}
return PerformanceMonitor()
def _create_recovery_manager(self, config: Dict[str, Any]) -> Any:
"""๋ณต๊ตฌ ๊ด๋ฆฌ์ ์์ฑ"""
class RecoveryManager:
def __init__(self, config):
self.config = config
self.failure_count = 0
self.recovery_count = 0
self.attempts = {}
async def attempt_recovery(self, connection_name):
if connection_name not in self.attempts:
self.attempts[connection_name] = 0
self.attempts[connection_name] += 1
# Simulate a failure on the first attempt
if self.attempts[connection_name] == 1:
self.failure_count += 1
return False
# Subsequent attempts succeed
success = await self._test_connection()
if success:
self.recovery_count += 1
else:
self.failure_count += 1
return success
async def _test_connection(self):
# A real implementation would perform an actual connection test
return True
def get_recovery_metrics(self):
return {
"total_failures": self.failure_count,
"successful_recoveries": self.recovery_count,
"recovery_rate": self.recovery_count / self.failure_count if self.failure_count > 0 else 1.0
}
return RecoveryManager(config)
def _validate_stream_data(self, data: Dict[str, Any], stream_type: str) -> bool:
"""์คํธ๋ฆผ ๋ฐ์ดํฐ ๊ฒ์ฆ"""
required_fields = {
"market_data": ["timestamp", "market", "index_value", "change_rate"],
"investor_flow": ["timestamp", "market", "individual_net"],
"market_breadth": ["timestamp", "market", "advancing_issues", "declining_issues"]
}
if stream_type not in required_fields:
return False
# Check required fields
for field in required_fields[stream_type]:
if field not in data:
return False
# Validate data types
if "index_value" in data:
try:
float(data["index_value"])
except (ValueError, TypeError):
return False
if "change_rate" in data:
try:
float(data["change_rate"])
except (ValueError, TypeError):
return False
return True
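# Example of a record that passes validation for "market_data"
# (illustrative values only):
#
#     {"timestamp": "2024-01-02T09:00:05", "market": "KOSPI",
#      "index_value": 2501.3, "change_rate": 0.42}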
async def _apply_stream_filters(self, data: List[Dict[str, Any]],
filters: Dict[str, Any]) -> List[Dict[str, Any]]:
"""์คํธ๋ฆผ ํํฐ ์ ์ฉ"""
filtered = []
for item in data:
# Market filter
if "markets" in filters and item.get("market") not in filters["markets"]:
continue
# Minimum volume filter
if "min_volume" in filters and item.get("volume", 0) < filters["min_volume"]:
continue
# Maximum change-rate filter
if "max_change_rate" in filters and abs(item.get("change_rate", 0)) > filters["max_change_rate"]:
continue
# Exclude after-hours trading
if filters.get("exclude_after_hours") and item.get("trading_session") == "after_hours":
continue
filtered.append(item)
return filtered
def _serialize_message(self, message: Dict[str, Any], format_type: str) -> Any:
"""๋ฉ์์ง ์ง๋ ฌํ"""
if format_type == "json":
return json.dumps(message, ensure_ascii=False)
elif format_type == "json_gzip":
json_str = json.dumps(message, ensure_ascii=False)
return gzip.compress(json_str.encode())
else:
return str(message)
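# --- Usage sketch -----------------------------------------------------------
# A minimal, illustrative way to drive the tool end to end, assuming simple
# in-memory stand-ins for the database and cache managers. The stand-in
# methods (fetch_all, get, set) mirror how they are called in this module;
# the real managers are provided elsewhere in the project.
if __name__ == "__main__":
    class _StubDBManager:
        async def fetch_all(self, query, *params):
            # A real manager would execute the SQL; return no rows here
            return []

    class _StubCacheManager:
        def __init__(self):
            self._store = {}

        async def get(self, key):
            return self._store.get(key)

        async def set(self, key, value, ttl=None):
            self._store[key] = value

    async def _demo():
        tool = RealTimeStreamingTool(_StubDBManager(), _StubCacheManager())
        contents = await tool.execute({
            "markets": ["KOSPI"],
            "stream_types": ["market_data"],
            "duration": 60,
        })
        # TextContent is assumed to expose the serialized payload as .text
        print(contents[0].text)

    asyncio.run(_demo())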