"""Real-time monitoring system for news and market data."""
import asyncio
import logging
import json
import time
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional, AsyncIterator, Callable, Union
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
import hashlib
import statistics
from collections import defaultdict, deque
class AlertLevel(Enum):
"""Alert severity levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class MonitoringError(Exception):
"""Monitoring system specific error."""
pass
@dataclass
class NewsUpdate:
"""News update data structure."""
news_id: str
title: str
content: str
timestamp: datetime
source: str
sentiment_score: float
impact_score: float
entities: Dict[str, List[str]] = field(default_factory=dict)
keywords: List[str] = field(default_factory=list)
@dataclass
class MarketEvent:
"""Market event data structure."""
event_type: str
stock_code: str
current_price: float
previous_price: float
change_percent: float
volume: int
timestamp: datetime
market_cap: Optional[float] = None
sector: Optional[str] = None
@dataclass
class SystemMetrics:
"""System performance metrics."""
cpu_usage: float
memory_usage: float
active_connections: int
processing_rate: float
error_rate: float
uptime: float
timestamp: datetime
@dataclass
class MonitoringResult:
"""Result of monitoring operation."""
update_type: str
data: Dict[str, Any]
timestamp: datetime
alerts: List[Dict[str, Any]] = field(default_factory=list)
processing_time: float = 0.0
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AlertRule:
"""Alert rule configuration."""
rule_id: str
condition: str
alert_level: AlertLevel
message: str
enabled: bool = True
cooldown_seconds: int = 300
last_triggered: Optional[datetime] = None
class RealtimeMonitor:
"""Real-time monitoring system for news and market data."""
def __init__(self):
"""Initialize real-time monitor."""
self.logger = logging.getLogger("realtime_monitor")
self.is_running = False
# Alert rules
self.alert_rules: Dict[str, AlertRule] = {}
# Connection management
self.connections = {}
self.connection_status = {}
self.websocket_connections = []
# Data buffers
self.news_buffer = deque(maxlen=1000)
self.market_buffer = deque(maxlen=1000)
self.alert_buffer = deque(maxlen=100)
# Performance tracking
self.metrics = {
"updates_processed": 0,
"alerts_generated": 0,
"errors_count": 0,
"start_time": None,
"processing_times": deque(maxlen=1000)
}
# Rate limiting
self.rate_limits = {}
self.request_counts = defaultdict(deque)
# Configuration
self.config = {
"alert_thresholds": {
"high_impact": 0.8,
"medium_impact": 0.5,
"low_impact": 0.3
},
"rate_limits": {
"news_api": 60, # requests per minute
"market_api": 120
},
"buffer_sizes": {
"news": 1000,
"market": 1000,
"alerts": 100
}
}
# Custom metrics
self.custom_metrics = {}
# Historical data for comparison
self.historical_data = defaultdict(list)
# Market hours configuration (Korean market)
self.market_open_time = 9 # 9:00 AM KST
        self.market_close_time = 15  # Market closes at 15:30 KST; the half-hour cutoff is handled in is_market_open
async def start_monitoring(self) -> bool:
"""Start real-time monitoring."""
try:
self.logger.info("Starting real-time monitoring system")
# Initialize connections
if not await self._initialize_connections():
raise MonitoringError("Failed to initialize connections")
self.is_running = True
self.metrics["start_time"] = datetime.now(timezone.utc)
# Start monitoring tasks
await self._start_monitoring_tasks()
self.logger.info("Real-time monitoring system started successfully")
return True
except Exception as e:
self.logger.error(f"Failed to start monitoring: {e}")
raise MonitoringError(f"Failed to start monitoring: {e}")
async def stop_monitoring(self) -> bool:
"""Stop real-time monitoring."""
try:
self.logger.info("Stopping real-time monitoring system")
self.is_running = False
# Cleanup connections
await self._cleanup_connections()
# Save final metrics
await self._save_metrics()
self.logger.info("Real-time monitoring system stopped")
return True
except Exception as e:
self.logger.error(f"Error stopping monitoring: {e}")
return False
async def monitor_news_stream(self) -> AsyncIterator[NewsUpdate]:
"""Monitor news stream."""
try:
news_stream = await self._get_news_stream()
async for update in news_stream:
if not self.is_running:
break
# Add to buffer
self.news_buffer.append(update)
# Yield update
yield update
except Exception as e:
self.logger.error(f"Error in news stream monitoring: {e}")
raise MonitoringError(f"News stream error: {e}")
async def monitor_market_data(self) -> AsyncIterator[MarketEvent]:
"""Monitor market data stream."""
try:
market_stream = await self._get_market_stream()
async for event in market_stream:
if not self.is_running:
break
# Add to buffer
self.market_buffer.append(event)
# Yield event
yield event
except Exception as e:
self.logger.error(f"Error in market data monitoring: {e}")
raise MonitoringError(f"Market data stream error: {e}")
async def process_news_update(self, update: NewsUpdate) -> MonitoringResult:
"""Process a news update."""
if update is None:
raise MonitoringError("News update cannot be None")
start_time = time.time()
try:
# Extract data
data = {
"news_id": update.news_id,
"title": update.title,
"sentiment_score": update.sentiment_score,
"impact_score": update.impact_score,
"source": update.source,
"entities": update.entities
}
# Evaluate alerts
alerts = await self.evaluate_alert_conditions(update)
# Update metrics
self.metrics["updates_processed"] += 1
processing_time = time.time() - start_time
self.metrics["processing_times"].append(processing_time)
return MonitoringResult(
update_type="news",
data=data,
timestamp=update.timestamp,
alerts=alerts,
processing_time=processing_time
)
except Exception as e:
self.metrics["errors_count"] += 1
self.logger.error(f"Error processing news update: {e}")
raise MonitoringError(f"Failed to process news update: {e}")
async def process_market_event(self, event: MarketEvent) -> MonitoringResult:
"""Process a market event."""
if event is None:
raise MonitoringError("Market event cannot be None")
start_time = time.time()
try:
# Extract data
data = {
"stock_code": event.stock_code,
"current_price": event.current_price,
"previous_price": event.previous_price,
"change_percent": event.change_percent,
"volume": event.volume,
"event_type": event.event_type
}
# Evaluate alerts
alerts = await self.evaluate_alert_conditions(event)
# Update metrics
self.metrics["updates_processed"] += 1
processing_time = time.time() - start_time
self.metrics["processing_times"].append(processing_time)
return MonitoringResult(
update_type="market",
data=data,
timestamp=event.timestamp,
alerts=alerts,
processing_time=processing_time
)
except Exception as e:
self.metrics["errors_count"] += 1
self.logger.error(f"Error processing market event: {e}")
raise MonitoringError(f"Failed to process market event: {e}")
def add_alert_rule(self, rule_id: str, condition: str, alert_level: AlertLevel,
message: str, cooldown_seconds: int = 300):
"""Add alert rule."""
rule = AlertRule(
rule_id=rule_id,
condition=condition,
alert_level=alert_level,
message=message,
cooldown_seconds=cooldown_seconds
)
self.alert_rules[rule_id] = rule
self.logger.info(f"Added alert rule: {rule_id}")
async def evaluate_alert_conditions(self, data: Union[NewsUpdate, MarketEvent]) -> List[Dict[str, Any]]:
"""Evaluate alert conditions against data."""
alerts = []
for rule_id, rule in self.alert_rules.items():
if not rule.enabled:
continue
# Check cooldown
if (rule.last_triggered and
(datetime.now(timezone.utc) - rule.last_triggered).total_seconds() < rule.cooldown_seconds):
continue
try:
# Simple condition evaluation (would be more sophisticated in real implementation)
if await self._evaluate_condition(rule.condition, data):
alert = {
"rule_id": rule_id,
"level": rule.alert_level,
"message": rule.message,
"timestamp": datetime.now(timezone.utc),
"data": self._extract_alert_data(data)
}
alerts.append(alert)
rule.last_triggered = datetime.now(timezone.utc)
self.metrics["alerts_generated"] += 1
except Exception as e:
self.logger.error(f"Error evaluating alert rule {rule_id}: {e}")
return alerts
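    # Illustrative rule setup (a sketch; the rule ids, thresholds and messages below are
    # hypothetical). Conditions may reference impact_score / sentiment_score for news updates and
    # change_percent / volume for market events, with abs() available:
    #
    #     monitor.add_alert_rule("high_impact_news", "impact_score > 0.8",
    #                            AlertLevel.HIGH, "High-impact news detected")
    #     monitor.add_alert_rule("large_price_move", "abs(change_percent) > 5",
    #                            AlertLevel.CRITICAL, "Price moved more than 5%")
    #     alerts = await monitor.evaluate_alert_conditions(news_update)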
async def detect_volume_anomaly(self, event: MarketEvent) -> Dict[str, Any]:
"""Detect volume anomalies."""
try:
# Get historical volume data for comparison
historical_volumes = await self._get_historical_volumes(event.stock_code, days=30)
if not historical_volumes:
return {"is_anomaly": False, "anomaly_score": 0.0}
avg_volume = statistics.mean(historical_volumes)
std_volume = statistics.stdev(historical_volumes) if len(historical_volumes) > 1 else 0
            volume_multiple = event.volume / avg_volume if avg_volume > 0 else 1.0
            z_score = (event.volume - avg_volume) / std_volume if std_volume > 0 else 0.0
            # Flag an anomaly when volume is more than 2 standard deviations from the mean
            # or more than 3x the historical average
            is_anomaly = abs(z_score) > 2 or volume_multiple > 3
            anomaly_score = min(abs(z_score) / 3, 1.0) if std_volume > 0 else min(volume_multiple / 3, 1.0)
            return {
                "is_anomaly": is_anomaly,
                "anomaly_score": anomaly_score,
                "volume_multiple": volume_multiple,
                "z_score": z_score
            }
except Exception as e:
self.logger.error(f"Error detecting volume anomaly: {e}")
return {"is_anomaly": False, "anomaly_score": 0.0}
async def analyze_news_market_correlation(self, news_data: Dict[str, Any],
market_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze correlation between news and market movements."""
try:
# Time difference between news and market movement
news_time = news_data.get("timestamp", datetime.now(timezone.utc))
market_time = market_data.get("timestamp", datetime.now(timezone.utc))
time_delay = abs((market_time - news_time).total_seconds()) / 60 # minutes
# Sentiment vs price change correlation
sentiment_score = news_data.get("sentiment_score", 0.5)
price_change = market_data.get("change_percent", 0.0)
# Simple correlation calculation
correlation_strength = 0.0
if sentiment_score > 0.6 and price_change > 0:
correlation_strength = min((sentiment_score - 0.5) * 2 * (price_change / 5.0), 1.0)
elif sentiment_score < 0.4 and price_change < 0:
correlation_strength = min((0.5 - sentiment_score) * 2 * abs(price_change) / 5.0, 1.0)
# Confidence based on time delay and magnitude
confidence = max(0.0, 1.0 - (time_delay / 60.0)) # Decreases with time delay
confidence *= min(abs(price_change) / 2.0, 1.0) # Increases with price change magnitude
return {
"correlation_strength": correlation_strength,
"time_delay": time_delay,
"confidence": confidence,
"sentiment_score": sentiment_score,
"price_change": price_change
}
except Exception as e:
self.logger.error(f"Error analyzing news-market correlation: {e}")
return {"correlation_strength": 0.0, "time_delay": 0.0, "confidence": 0.0}
async def detect_price_trend(self, price_history: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Detect price trends in historical data."""
try:
if len(price_history) < 3:
return {"direction": "insufficient_data", "strength": 0.0, "duration": 0}
prices = [p["price"] for p in price_history]
timestamps = [p["timestamp"] for p in price_history]
# Calculate trend direction using linear regression approximation
n = len(prices)
sum_x = sum(range(n))
sum_y = sum(prices)
sum_xy = sum(i * prices[i] for i in range(n))
sum_x2 = sum(i * i for i in range(n))
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)
# Determine direction
if slope > 0.01:
direction = "upward"
elif slope < -0.01:
direction = "downward"
else:
direction = "sideways"
# Calculate strength (normalized slope)
price_range = max(prices) - min(prices)
strength = abs(slope) / (price_range / n) if price_range > 0 else 0
strength = min(strength, 1.0)
# Calculate duration
duration = (timestamps[-1] - timestamps[0]).total_seconds() / 3600 # hours
return {
"direction": direction,
"strength": strength,
"duration": duration,
"slope": slope
}
except Exception as e:
self.logger.error(f"Error detecting price trend: {e}")
return {"direction": "error", "strength": 0.0, "duration": 0}
async def process_multi_asset_updates(self, updates: List[MarketEvent]) -> List[MonitoringResult]:
"""Process updates for multiple assets."""
results = []
for update in updates:
try:
result = await self.process_market_event(update)
results.append(result)
except Exception as e:
self.logger.error(f"Error processing update for {update.stock_code}: {e}")
# Continue processing other updates
return results
async def get_system_metrics(self) -> SystemMetrics:
"""Get current system performance metrics."""
try:
import psutil
# Get system metrics
cpu_usage = psutil.cpu_percent()
memory = psutil.virtual_memory()
memory_usage = memory.percent
# Calculate processing rate
processing_rate = 0.0
if self.metrics["processing_times"]:
avg_processing_time = statistics.mean(self.metrics["processing_times"])
processing_rate = 1.0 / avg_processing_time if avg_processing_time > 0 else 0.0
# Calculate error rate
total_operations = self.metrics["updates_processed"]
error_rate = (self.metrics["errors_count"] / total_operations) if total_operations > 0 else 0.0
# Calculate uptime
uptime = 0.0
if self.metrics["start_time"]:
uptime = (datetime.now(timezone.utc) - self.metrics["start_time"]).total_seconds()
return SystemMetrics(
cpu_usage=cpu_usage,
memory_usage=memory_usage,
active_connections=len(self.websocket_connections),
processing_rate=processing_rate,
error_rate=error_rate,
uptime=uptime,
timestamp=datetime.now(timezone.utc)
)
except ImportError:
# Fallback if psutil not available
return SystemMetrics(
cpu_usage=0.0,
memory_usage=0.0,
active_connections=len(self.websocket_connections),
processing_rate=0.0,
error_rate=0.0,
uptime=0.0,
timestamp=datetime.now(timezone.utc)
)
async def handle_connection_failure(self) -> bool:
"""Handle connection failures."""
try:
# Check connection health
if not self._is_connection_healthy():
self.logger.warning("Connection unhealthy, attempting reconnection")
return await self._reconnect()
return True
except Exception as e:
self.logger.error(f"Error handling connection failure: {e}")
return False
def add_to_buffer(self, data: Dict[str, Any]):
"""Add data to buffer."""
buffer_key = data.get("type", "general")
if buffer_key == "news":
self.news_buffer.append(data)
elif buffer_key == "market":
self.market_buffer.append(data)
else:
# Generic buffer
if not hasattr(self, 'generic_buffer'):
self.generic_buffer = deque(maxlen=1000)
self.generic_buffer.append(data)
def get_buffer_size(self) -> int:
"""Get total buffer size."""
total_size = len(self.news_buffer) + len(self.market_buffer)
if hasattr(self, 'generic_buffer'):
total_size += len(self.generic_buffer)
return total_size
async def flush_buffer(self) -> List[Dict[str, Any]]:
"""Flush all buffers and return data."""
flushed_data = []
# Flush news buffer
while self.news_buffer:
flushed_data.append(self.news_buffer.popleft())
# Flush market buffer
while self.market_buffer:
flushed_data.append(self.market_buffer.popleft())
# Flush generic buffer if exists
if hasattr(self, 'generic_buffer'):
while self.generic_buffer:
flushed_data.append(self.generic_buffer.popleft())
return flushed_data
def set_rate_limit(self, requests_per_second: int):
"""Set rate limit for requests."""
self.rate_limits["default"] = requests_per_second
async def rate_limited_request(self, request_id: str):
"""Make a rate-limited request."""
current_time = time.time()
requests_per_second = self.rate_limits.get("default", 10)
# Clean old requests
cutoff_time = current_time - 1.0 # 1 second window
while (self.request_counts["default"] and
self.request_counts["default"][0] < cutoff_time):
self.request_counts["default"].popleft()
# Check if we can make the request
if len(self.request_counts["default"]) >= requests_per_second:
            # Window is full; back off for roughly one request interval before proceeding (approximate limiter)
sleep_time = 1.0 / requests_per_second
await asyncio.sleep(sleep_time)
# Record the request
self.request_counts["default"].append(current_time)
# Simulate request processing
await asyncio.sleep(0.01)
async def aggregate_alerts(self, alerts: List[Dict[str, Any]],
time_window: int = 60) -> List[Dict[str, Any]]:
"""Aggregate similar alerts within time window."""
if not alerts:
return []
# Group alerts by rule_id and message
alert_groups = defaultdict(list)
for alert in alerts:
key = (alert.get("rule_id"), alert.get("message"))
alert_groups[key].append(alert)
aggregated = []
for (rule_id, message), group_alerts in alert_groups.items():
# Sort by timestamp
group_alerts.sort(key=lambda x: x.get("timestamp", datetime.min.replace(tzinfo=timezone.utc)))
# Create aggregated alert
aggregated_alert = {
"rule_id": rule_id,
"message": message,
"count": len(group_alerts),
"first_occurrence": group_alerts[0].get("timestamp"),
"last_occurrence": group_alerts[-1].get("timestamp"),
"level": group_alerts[0].get("level")
}
aggregated.append(aggregated_alert)
return aggregated
async def compare_with_historical(self, current_metrics: Dict[str, Any],
lookback_days: int = 30) -> Dict[str, Any]:
"""Compare current metrics with historical data."""
comparison = {}
for metric_name, current_value in current_metrics.items():
historical_values = self.historical_data.get(metric_name, [])
if not historical_values:
comparison[f"{metric_name}_percentile"] = 50.0 # Default to median
continue
# Calculate percentile
historical_values_sorted = sorted(historical_values)
percentile = self._calculate_percentile(current_value, historical_values_sorted)
comparison[f"{metric_name}_percentile"] = percentile
return comparison
async def prioritize_events(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Prioritize events based on importance."""
prioritized = []
for event in events:
priority_score = 0.0
# Calculate priority based on event type and attributes
if event.get("type") == "news":
priority_score += event.get("impact_score", 0.0) * 0.4
if "삼성전자" in str(event.get("entities", [])):
priority_score += 0.2
elif event.get("type") == "price":
change_percent = abs(event.get("change_percent", 0.0))
priority_score += min(change_percent / 5.0, 1.0) * 0.5
elif event.get("type") == "volume":
volume_spike = event.get("volume_spike", 1.0)
priority_score += min(volume_spike / 3.0, 1.0) * 0.3
event["priority_score"] = priority_score
prioritized.append(event)
# Sort by priority score (highest first)
prioritized.sort(key=lambda x: x["priority_score"], reverse=True)
return prioritized
async def get_dashboard_data(self) -> Dict[str, Any]:
"""Get data for real-time dashboard."""
dashboard_data = {
"active_alerts": len(self.alert_buffer),
"recent_updates": {
"news": len(self.news_buffer),
"market": len(self.market_buffer)
},
"system_status": {
"is_running": self.is_running,
"uptime": self._get_uptime(),
"error_rate": self._get_error_rate()
},
"performance_metrics": {
"updates_processed": self.metrics["updates_processed"],
"alerts_generated": self.metrics["alerts_generated"],
"avg_processing_time": self._get_avg_processing_time()
}
}
return dashboard_data
async def broadcast_update(self, update_data: Dict[str, Any]):
"""Broadcast update to WebSocket connections."""
if not self.websocket_connections:
return
message = json.dumps(update_data)
# Send to all active connections
for ws in self.websocket_connections[:]: # Copy list to avoid modification issues
try:
await ws.send(message)
except Exception as e:
self.logger.error(f"Error broadcasting to WebSocket: {e}")
# Remove failed connection
self.websocket_connections.remove(ws)
def add_custom_metric(self, name: str, calculation_func: Callable):
"""Add custom metric calculation."""
self.custom_metrics[name] = calculation_func
async def calculate_custom_metric(self, name: str, data_points: List[Dict[str, Any]]) -> float:
"""Calculate custom metric value."""
if name not in self.custom_metrics:
raise MonitoringError(f"Custom metric '{name}' not found")
try:
return self.custom_metrics[name](data_points)
except Exception as e:
self.logger.error(f"Error calculating custom metric {name}: {e}")
return 0.0
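    # Sketch of a custom metric (the metric name and lambda are hypothetical):
    #     monitor.add_custom_metric(
    #         "avg_impact",
    #         lambda points: statistics.mean(p.get("impact_score", 0.0) for p in points) if points else 0.0,
    #     )
    #     value = await monitor.calculate_custom_metric("avg_impact", data_points)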
def is_market_open(self, timestamp: datetime) -> bool:
"""Check if market is open at given timestamp."""
        # Convert to KST (UTC+9); assumes the timestamp is given in UTC
kst_time = timestamp + timedelta(hours=9)
hour = kst_time.hour
minute = kst_time.minute
# Check if weekday (Monday=0, Sunday=6)
if kst_time.weekday() >= 5: # Saturday or Sunday
return False
# Check time range (9:00 AM to 3:30 PM KST)
if hour < self.market_open_time or hour > self.market_close_time:
return False
if hour == self.market_close_time and minute > 30:
return False
return True
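    # Example (assumes the input timestamp is UTC): Monday 2024-01-15 01:00 UTC is 10:00 KST,
    # so is_market_open(datetime(2024, 1, 15, 1, 0, tzinfo=timezone.utc)) returns True, while
    # 07:00 UTC on the same day is 16:00 KST and returns False.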
async def analyze_sentiment_trend(self, sentiment_history: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze sentiment trends over time."""
if not sentiment_history:
return {"current_sentiment": 0.5, "trend_direction": "stable", "volatility": 0.0}
sentiments = [item["sentiment"] for item in sentiment_history]
current_sentiment = sentiments[-1] if sentiments else 0.5
# Calculate trend
if len(sentiments) >= 3:
recent_avg = statistics.mean(sentiments[-3:])
early_avg = statistics.mean(sentiments[:3])
if recent_avg > early_avg + 0.1:
trend_direction = "improving"
elif recent_avg < early_avg - 0.1:
trend_direction = "declining"
else:
trend_direction = "stable"
else:
trend_direction = "insufficient_data"
# Calculate volatility
volatility = statistics.stdev(sentiments) if len(sentiments) > 1 else 0.0
return {
"current_sentiment": current_sentiment,
"trend_direction": trend_direction,
"volatility": volatility
}
async def detect_anomaly_ml(self, features: Dict[str, float]) -> Dict[str, Any]:
"""ML-based anomaly detection."""
# Simple rule-based anomaly detection (placeholder for ML model)
anomaly_score = 0.0
contributing_factors = []
# Price change anomaly
price_change = abs(features.get("price_change", 0.0))
if price_change > 5.0:
anomaly_score += 0.3
contributing_factors.append("high_price_change")
# Volume anomaly
volume_ratio = features.get("volume_ratio", 1.0)
if volume_ratio > 3.0:
anomaly_score += 0.3
contributing_factors.append("high_volume")
# Sentiment anomaly
sentiment_score = features.get("sentiment_score", 0.5)
if sentiment_score > 0.9 or sentiment_score < 0.1:
anomaly_score += 0.2
contributing_factors.append("extreme_sentiment")
# Time of day anomaly (trading outside normal hours)
time_of_day = features.get("time_of_day", 12)
if time_of_day < 9 or time_of_day > 15:
anomaly_score += 0.2
contributing_factors.append("off_hours_trading")
is_anomaly = anomaly_score > 0.5
return {
"is_anomaly": is_anomaly,
"anomaly_score": min(anomaly_score, 1.0),
"contributing_factors": contributing_factors
}
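    # Worked example (hypothetical feature values): {"price_change": 6.2, "volume_ratio": 3.5,
    # "sentiment_score": 0.95, "time_of_day": 10} scores 0.3 + 0.3 + 0.2 = 0.8, so the event is
    # flagged as an anomaly with contributing factors high_price_change, high_volume and
    # extreme_sentiment.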
async def cross_validate_data(self, source_data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
"""Cross-validate data from multiple sources."""
if not source_data:
return {"consensus_price": 0.0, "data_quality_score": 0.0, "outlier_sources": []}
# Extract prices from sources
prices = []
sources = []
for source, data in source_data.items():
if "price" in data:
prices.append(data["price"])
sources.append(source)
if not prices:
return {"consensus_price": 0.0, "data_quality_score": 0.0, "outlier_sources": []}
# Calculate consensus price (median)
consensus_price = statistics.median(prices)
# Identify outliers
outlier_sources = []
if len(prices) > 2:
price_std = statistics.stdev(prices)
for i, price in enumerate(prices):
if abs(price - consensus_price) > 2 * price_std:
outlier_sources.append(sources[i])
# Calculate data quality score
price_range = max(prices) - min(prices)
data_quality_score = max(0.0, 1.0 - (price_range / consensus_price)) if consensus_price > 0 else 0.0
return {
"consensus_price": consensus_price,
"data_quality_score": data_quality_score,
"outlier_sources": outlier_sources
}
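    # Worked example (hypothetical sources and prices): with
    #     {"api_a": {"price": 75000}, "api_b": {"price": 75100}, "api_c": {"price": 74900}}
    # the consensus (median) price is 75000, no source deviates by more than two standard
    # deviations (std = 100), and data_quality_score = 1 - 200 / 75000 ≈ 0.997.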
async def benchmark_processing(self, test_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Benchmark processing performance."""
start_time = time.time()
processed_count = 0
for data in test_data:
# Simulate processing
await asyncio.sleep(0.001) # 1ms per item
processed_count += 1
end_time = time.time()
total_time = end_time - start_time
processing_rate = processed_count / total_time if total_time > 0 else 0
return {
"processed_count": processed_count,
"total_time": total_time,
"processing_rate": processing_rate
}
async def should_escalate_alert(self, alert: Dict[str, Any]) -> bool:
"""Check if alert should be escalated."""
        # Check how long the alert has been active; alerts produced by evaluate_alert_conditions
        # carry "timestamp", so fall back to it when "created_at" is absent
        created_at = alert.get("created_at", alert.get("timestamp", datetime.now(timezone.utc)))
age_minutes = (datetime.now(timezone.utc) - created_at).total_seconds() / 60
# Escalate medium alerts after 30 minutes
if alert.get("level") == AlertLevel.MEDIUM and age_minutes > 30:
return True
# Escalate low alerts after 60 minutes
if alert.get("level") == AlertLevel.LOW and age_minutes > 60:
return True
return False
async def escalate_alert(self, alert: Dict[str, Any]) -> Dict[str, Any]:
"""Escalate alert to higher severity."""
escalated_alert = alert.copy()
# Increase alert level
current_level = alert.get("level", AlertLevel.LOW)
if current_level == AlertLevel.LOW:
escalated_alert["level"] = AlertLevel.MEDIUM
elif current_level == AlertLevel.MEDIUM:
escalated_alert["level"] = AlertLevel.HIGH
elif current_level == AlertLevel.HIGH:
escalated_alert["level"] = AlertLevel.CRITICAL
escalated_alert["escalated"] = True
escalated_alert["escalated_at"] = datetime.now(timezone.utc)
return escalated_alert
def add_monitoring_data(self, data: Dict[str, Any]):
"""Add data for monitoring and retention."""
if not hasattr(self, 'monitoring_data'):
self.monitoring_data = []
self.monitoring_data.append(data)
async def apply_retention_policy(self, retention_days: int = 30) -> int:
"""Apply data retention policy and cleanup old data."""
if not hasattr(self, 'monitoring_data'):
return 0
cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
initial_count = len(self.monitoring_data)
# Filter out old data
self.monitoring_data = [
data for data in self.monitoring_data
if data.get("timestamp", datetime.now(timezone.utc)) > cutoff_date
]
cleaned_count = initial_count - len(self.monitoring_data)
return cleaned_count
async def get_monitoring_statistics(self) -> Dict[str, Any]:
"""Get monitoring system statistics."""
uptime = self._get_uptime()
stats = {
"uptime": uptime,
"total_updates_processed": self.metrics["updates_processed"],
"alerts_generated": self.metrics["alerts_generated"],
"average_processing_time": self._get_avg_processing_time(),
"error_rate": self._get_error_rate(),
"active_alert_rules": len([r for r in self.alert_rules.values() if r.enabled]),
"buffer_utilization": {
"news": len(self.news_buffer),
"market": len(self.market_buffer),
"alerts": len(self.alert_buffer)
}
}
return stats
async def reload_configuration(self, new_config: Dict[str, Any]) -> bool:
"""Hot reload configuration."""
try:
# Update configuration
self.config.update(new_config)
# Apply new rate limits if specified
if "rate_limits" in new_config:
self.rate_limits.update(new_config["rate_limits"])
self.logger.info("Configuration reloaded successfully")
return True
except Exception as e:
self.logger.error(f"Error reloading configuration: {e}")
return False
def get_config(self, key: str) -> Any:
"""Get configuration value by key (supports dot notation)."""
keys = key.split(".")
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return None
return value
async def process_with_fallback(self, data: Any) -> Any:
"""Process data with fallback handling."""
try:
# Attempt main processing
return await self._external_api_call(data)
except Exception as e:
self.logger.warning(f"Main processing failed, using fallback: {e}")
# Return fallback result
return {"status": "fallback", "data": data}
# Private helper methods
async def _initialize_connections(self) -> bool:
"""Initialize connections to data sources."""
# Mock implementation
self.connections["news_api"] = "connected"
self.connections["market_api"] = "connected"
return True
async def _cleanup_connections(self):
"""Cleanup connections."""
self.connections.clear()
self.websocket_connections.clear()
async def _start_monitoring_tasks(self):
"""Start background monitoring tasks."""
# In real implementation, would start asyncio tasks
pass
async def _save_metrics(self):
"""Save final metrics."""
# In real implementation, would save to database or file
pass
async def _get_news_stream(self):
"""Get news stream (mock implementation)."""
# Mock async generator that yields finite items
class MockNewsStream:
def __init__(self):
self.count = 0
self.max_items = 1 # Limit for testing
            def __aiter__(self):
                # __aiter__ must be a plain (non-async) method returning the async iterator
                return self
async def __anext__(self):
if self.count >= self.max_items:
raise StopAsyncIteration
await asyncio.sleep(0.01) # Shorter delay for testing
self.count += 1
return NewsUpdate(
news_id="test",
title="Test News",
content="Test Content",
timestamp=datetime.now(timezone.utc),
source="test",
sentiment_score=0.7,
impact_score=0.6
)
return MockNewsStream()
async def _get_market_stream(self):
"""Get market stream (mock implementation)."""
class MockMarketStream:
def __init__(self):
self.count = 0
self.max_items = 1 # Limit for testing
            def __aiter__(self):
                # __aiter__ must be a plain (non-async) method returning the async iterator
                return self
async def __anext__(self):
if self.count >= self.max_items:
raise StopAsyncIteration
await asyncio.sleep(0.01) # Shorter delay for testing
self.count += 1
return MarketEvent(
event_type="price_update",
stock_code="005930",
current_price=75000,
previous_price=74000,
change_percent=1.35,
volume=1000000,
timestamp=datetime.now(timezone.utc)
)
return MockMarketStream()
async def _evaluate_condition(self, condition: str, data: Union[NewsUpdate, MarketEvent]) -> bool:
"""Evaluate alert condition."""
# Simple condition evaluation
try:
if isinstance(data, NewsUpdate):
namespace = {
"impact_score": data.impact_score,
"sentiment_score": data.sentiment_score
}
            elif isinstance(data, MarketEvent):
                namespace = {
                    "change_percent": data.change_percent,
                    "volume": data.volume
                }
            else:
                return False
            # Restricted evaluation: no builtins except abs, and only the fields above are visible
            return bool(eval(condition, {"__builtins__": {}, "abs": abs}, namespace))
except Exception as e:
self.logger.error(f"Error evaluating condition '{condition}': {e}")
return False
def _extract_alert_data(self, data: Union[NewsUpdate, MarketEvent]) -> Dict[str, Any]:
"""Extract relevant data for alert."""
if isinstance(data, NewsUpdate):
return {
"news_id": data.news_id,
"title": data.title,
"sentiment_score": data.sentiment_score,
"impact_score": data.impact_score
}
elif isinstance(data, MarketEvent):
return {
"stock_code": data.stock_code,
"change_percent": data.change_percent,
"volume": data.volume,
"current_price": data.current_price
}
else:
return {}
async def _get_historical_volumes(self, stock_code: str, days: int = 30) -> List[float]:
"""Get historical volume data."""
# Mock implementation - would fetch from database
import random
return [random.randint(500000, 2000000) for _ in range(days)]
def _is_connection_healthy(self) -> bool:
"""Check if connections are healthy."""
return len(self.connections) > 0 and all(
status == "connected" for status in self.connections.values()
)
async def _reconnect(self) -> bool:
"""Attempt to reconnect."""
# Mock reconnection
await asyncio.sleep(1)
return await self._initialize_connections()
def _calculate_percentile(self, value: float, sorted_values: List[float]) -> float:
"""Calculate percentile of value in sorted list."""
if not sorted_values:
return 50.0
count_below = sum(1 for v in sorted_values if v < value)
percentile = (count_below / len(sorted_values)) * 100
return percentile
def _get_uptime(self) -> float:
"""Get system uptime in seconds."""
if self.metrics["start_time"]:
return (datetime.now(timezone.utc) - self.metrics["start_time"]).total_seconds()
return 0.0
def _get_error_rate(self) -> float:
"""Get current error rate."""
total_ops = self.metrics["updates_processed"]
return (self.metrics["errors_count"] / total_ops) if total_ops > 0 else 0.0
def _get_avg_processing_time(self) -> float:
"""Get average processing time."""
if self.metrics["processing_times"]:
return statistics.mean(self.metrics["processing_times"])
return 0.0
async def _external_api_call(self, data: Any) -> Any:
"""Mock external API call."""
# Simulate API call
await asyncio.sleep(0.1)
return {"processed": True, "data": data}