"""Tests for real-time monitoring system."""
import pytest
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from datetime import datetime, timezone, timedelta
import asyncio
from typing import Dict, List, Any
from src.monitoring.realtime_monitor import (
RealtimeMonitor,
MonitoringResult,
AlertLevel,
MonitoringError,
NewsUpdate,
MarketEvent,
SystemMetrics
)
class TestRealtimeMonitor:
"""Test cases for RealtimeMonitor."""
@pytest.fixture
def monitor(self):
"""Create RealtimeMonitor instance for testing."""
return RealtimeMonitor()
@pytest.fixture
def sample_news_update(self):
"""Sample news update for testing."""
return NewsUpdate(
news_id="news_123",
title="삼성전자 실적 발표",
content="삼성전자가 3분기 실적을 발표했습니다.",
timestamp=datetime.now(timezone.utc),
source="test_source",
sentiment_score=0.8,
impact_score=0.7
)
@pytest.fixture
def sample_market_event(self):
"""Sample market event for testing."""
return MarketEvent(
event_type="price_change",
stock_code="005930",
current_price=75000,
previous_price=74000,
change_percent=1.35,
volume=1000000,
timestamp=datetime.now(timezone.utc)
)
def test_realtime_monitor_initialization(self, monitor):
"""Test RealtimeMonitor initialization."""
assert monitor is not None
assert hasattr(monitor, 'start_monitoring')
assert hasattr(monitor, 'stop_monitoring')
assert hasattr(monitor, 'add_alert_rule')
@pytest.mark.asyncio
async def test_start_monitoring(self, monitor):
"""Test starting real-time monitoring."""
# Mock dependencies
with patch.object(monitor, '_initialize_connections') as mock_init:
mock_init.return_value = True
result = await monitor.start_monitoring()
assert result is True
assert monitor.is_running is True
mock_init.assert_called_once()
@pytest.mark.asyncio
async def test_stop_monitoring(self, monitor):
"""Test stopping real-time monitoring."""
# Start monitoring first
monitor.is_running = True
with patch.object(monitor, '_cleanup_connections') as mock_cleanup:
result = await monitor.stop_monitoring()
assert result is True
assert monitor.is_running is False
mock_cleanup.assert_called_once()
@pytest.mark.asyncio
async def test_monitor_news_stream(self, monitor, sample_news_update):
"""Test monitoring news stream."""
# Set monitor as running
monitor.is_running = True
# Create a proper mock async iterator
async def mock_stream():
yield sample_news_update
with patch.object(monitor, '_get_news_stream') as mock_get_stream:
mock_get_stream.return_value = mock_stream()
results = []
async for update in monitor.monitor_news_stream():
results.append(update)
break # Stop after first update
assert len(results) == 1
assert isinstance(results[0], NewsUpdate)
@pytest.mark.asyncio
async def test_monitor_market_data(self, monitor, sample_market_event):
"""Test monitoring market data stream."""
# Set monitor as running
monitor.is_running = True
# Create a proper mock async iterator
async def mock_stream():
yield sample_market_event
with patch.object(monitor, '_get_market_stream') as mock_get_stream:
mock_get_stream.return_value = mock_stream()
results = []
async for event in monitor.monitor_market_data():
results.append(event)
break # Stop after first event
assert len(results) == 1
assert isinstance(results[0], MarketEvent)
@pytest.mark.asyncio
async def test_process_news_update(self, monitor, sample_news_update):
"""Test processing news updates."""
result = await monitor.process_news_update(sample_news_update)
assert isinstance(result, MonitoringResult)
assert result.update_type == "news"
assert result.timestamp is not None
assert result.data["news_id"] == "news_123"
@pytest.mark.asyncio
async def test_process_market_event(self, monitor, sample_market_event):
"""Test processing market events."""
result = await monitor.process_market_event(sample_market_event)
assert isinstance(result, MonitoringResult)
assert result.update_type == "market"
assert result.data["stock_code"] == "005930"
assert result.data["change_percent"] == 1.35
@pytest.mark.asyncio
async def test_evaluate_alert_conditions(self, monitor, sample_news_update):
"""Test evaluating alert conditions."""
# Add alert rule
monitor.add_alert_rule(
rule_id="high_impact",
condition="impact_score > 0.8",
alert_level=AlertLevel.HIGH,
message="High impact news detected"
)
alerts = await monitor.evaluate_alert_conditions(sample_news_update)
# Since sample has impact_score=0.7, should not trigger high impact alert
high_impact_alerts = [a for a in alerts if a.rule_id == "high_impact"]
assert len(high_impact_alerts) == 0
@pytest.mark.asyncio
async def test_sentiment_threshold_alerts(self, monitor, sample_news_update):
"""Test sentiment-based alert thresholds."""
# Add sentiment alert rule
monitor.add_alert_rule(
rule_id="positive_sentiment",
condition="sentiment_score > 0.7",
alert_level=AlertLevel.MEDIUM,
message="Positive sentiment detected"
)
alerts = await monitor.evaluate_alert_conditions(sample_news_update)
# Sample has sentiment_score=0.8, should trigger alert
sentiment_alerts = [a for a in alerts if a.get("rule_id") == "positive_sentiment"]
assert len(sentiment_alerts) == 1
assert sentiment_alerts[0]["level"] == AlertLevel.MEDIUM
@pytest.mark.asyncio
async def test_price_movement_alerts(self, monitor, sample_market_event):
"""Test price movement alert detection."""
# Add price movement alert
monitor.add_alert_rule(
rule_id="price_spike",
condition="abs(change_percent) > 2.0",
alert_level=AlertLevel.HIGH,
message="Significant price movement"
)
alerts = await monitor.evaluate_alert_conditions(sample_market_event)
# Sample has change_percent=1.35, should not trigger
spike_alerts = [a for a in alerts if a.get("rule_id") == "price_spike"]
assert len(spike_alerts) == 0
@pytest.mark.asyncio
async def test_volume_anomaly_detection(self, monitor):
"""Test volume anomaly detection."""
# Create high volume event
high_volume_event = MarketEvent(
event_type="volume_spike",
stock_code="005930",
current_price=75000,
previous_price=74000,
change_percent=1.0,
volume=5000000, # 5x normal volume
timestamp=datetime.now(timezone.utc)
)
anomaly = await monitor.detect_volume_anomaly(high_volume_event)
assert anomaly["is_anomaly"] is True
assert anomaly["anomaly_score"] > 0.5
assert "volume_multiple" in anomaly
@pytest.mark.asyncio
async def test_correlation_monitoring(self, monitor):
"""Test monitoring correlations between news and market movements."""
news_data = {
"sentiment_score": 0.8,
"entities": {"companies": ["삼성전자"]},
"timestamp": datetime.now(timezone.utc)
}
market_data = {
"stock_code": "005930",
"change_percent": 2.5,
"timestamp": datetime.now(timezone.utc)
}
correlation = await monitor.analyze_news_market_correlation(news_data, market_data)
assert "correlation_strength" in correlation
assert "time_delay" in correlation
assert "confidence" in correlation
@pytest.mark.asyncio
async def test_trend_detection(self, monitor):
"""Test trend detection in real-time data."""
# Mock historical data
price_history = [
{"price": 70000, "timestamp": datetime.now(timezone.utc) - timedelta(hours=5)},
{"price": 72000, "timestamp": datetime.now(timezone.utc) - timedelta(hours=4)},
{"price": 74000, "timestamp": datetime.now(timezone.utc) - timedelta(hours=3)},
{"price": 75000, "timestamp": datetime.now(timezone.utc) - timedelta(hours=2)},
{"price": 76000, "timestamp": datetime.now(timezone.utc) - timedelta(hours=1)}
]
trend = await monitor.detect_price_trend(price_history)
assert trend["direction"] in ["upward", "downward", "sideways"]
assert "strength" in trend
assert "duration" in trend
@pytest.mark.asyncio
async def test_multi_asset_monitoring(self, monitor):
"""Test monitoring multiple assets simultaneously."""
assets = ["005930", "000660", "035420"] # Samsung, SK Hynix, Naver
# Mock market data for multiple assets
market_updates = []
for asset in assets:
update = MarketEvent(
event_type="price_update",
stock_code=asset,
current_price=50000,
previous_price=49000,
change_percent=2.04,
volume=1000000,
timestamp=datetime.now(timezone.utc)
)
market_updates.append(update)
results = await monitor.process_multi_asset_updates(market_updates)
assert len(results) == 3
for result in results:
assert isinstance(result, MonitoringResult)
@pytest.mark.asyncio
async def test_system_health_monitoring(self, monitor):
"""Test system health and performance monitoring."""
metrics = await monitor.get_system_metrics()
assert isinstance(metrics, SystemMetrics)
assert hasattr(metrics, 'cpu_usage')
assert hasattr(metrics, 'memory_usage')
assert hasattr(metrics, 'active_connections')
assert hasattr(metrics, 'processing_rate')
@pytest.mark.asyncio
async def test_connection_resilience(self, monitor):
"""Test connection resilience and reconnection."""
# Simulate connection failure
with patch.object(monitor, '_is_connection_healthy') as mock_health:
mock_health.return_value = False
# Should attempt reconnection
with patch.object(monitor, '_reconnect') as mock_reconnect:
mock_reconnect.return_value = True
result = await monitor.handle_connection_failure()
assert result is True
mock_reconnect.assert_called_once()
@pytest.mark.asyncio
async def test_data_buffering(self, monitor):
"""Test data buffering during connection issues."""
# Add some data to buffer
sample_data = {"test": "data", "timestamp": datetime.now(timezone.utc)}
monitor.add_to_buffer(sample_data)
buffer_size = monitor.get_buffer_size()
assert buffer_size > 0
# Test buffer flush
flushed_data = await monitor.flush_buffer()
assert len(flushed_data) == 1
assert flushed_data[0]["test"] == "data"
@pytest.mark.asyncio
async def test_rate_limiting(self, monitor):
"""Test rate limiting for API calls."""
# Set rate limit
monitor.set_rate_limit(requests_per_second=10)
# Test rate limit enforcement
start_time = datetime.now()
for i in range(15):
await monitor.rate_limited_request(f"test_request_{i}")
end_time = datetime.now()
# Should take at least some time due to rate limiting (relaxed for testing)
duration = (end_time - start_time).total_seconds()
assert duration >= 0.5 # Relaxed timing assertion
@pytest.mark.asyncio
async def test_alert_aggregation(self, monitor):
"""Test alert aggregation and deduplication."""
# Create multiple similar alerts (same message for aggregation)
alerts = []
for i in range(5):
alert_data = {
"rule_id": "price_alert",
"level": AlertLevel.MEDIUM,
"message": "Price alert", # Same message for aggregation
"timestamp": datetime.now(timezone.utc)
}
alerts.append(alert_data)
aggregated = await monitor.aggregate_alerts(alerts, time_window=60)
# Should aggregate similar alerts (same rule_id and message)
assert len(aggregated) == 1
assert aggregated[0]["count"] == 5
@pytest.mark.asyncio
async def test_historical_comparison(self, monitor):
"""Test comparing current data with historical patterns."""
current_metrics = {
"volume": 2000000,
"price_change": 3.5,
"sentiment_score": 0.8
}
comparison = await monitor.compare_with_historical(
current_metrics,
lookback_days=30
)
assert "volume_percentile" in comparison
assert "price_change_percentile" in comparison
assert "sentiment_score_percentile" in comparison
@pytest.mark.asyncio
async def test_event_prioritization(self, monitor):
"""Test event prioritization based on importance."""
events = [
{"type": "news", "impact_score": 0.9, "entities": ["삼성전자"]},
{"type": "price", "change_percent": 5.0, "stock_code": "005930"},
{"type": "volume", "volume_spike": 3.0, "stock_code": "005930"},
{"type": "news", "impact_score": 0.3, "entities": ["기타회사"]}
]
prioritized = await monitor.prioritize_events(events)
# Should be sorted by importance
assert len(prioritized) == 4
assert prioritized[0]["priority_score"] >= prioritized[-1]["priority_score"]
@pytest.mark.asyncio
async def test_real_time_dashboard_data(self, monitor):
"""Test generating real-time dashboard data."""
dashboard_data = await monitor.get_dashboard_data()
assert "active_alerts" in dashboard_data
assert "recent_updates" in dashboard_data
assert "system_status" in dashboard_data
assert "performance_metrics" in dashboard_data
@pytest.mark.asyncio
async def test_websocket_broadcasting(self, monitor):
"""Test WebSocket broadcasting of updates."""
# Mock WebSocket connections
mock_websockets = [Mock() for _ in range(3)]
monitor.websocket_connections = mock_websockets
update_data = {"type": "price_update", "data": {"stock": "005930"}}
await monitor.broadcast_update(update_data)
# Should broadcast to all connections
for ws in mock_websockets:
ws.send.assert_called_once()
@pytest.mark.asyncio
async def test_custom_metric_tracking(self, monitor):
"""Test tracking custom metrics."""
# Add custom metric
monitor.add_custom_metric(
name="sentiment_momentum",
calculation_func=lambda data: sum(d.get("sentiment", 0) for d in data) / len(data)
)
# Provide data for calculation
data_points = [
{"sentiment": 0.8, "timestamp": datetime.now(timezone.utc)},
{"sentiment": 0.6, "timestamp": datetime.now(timezone.utc)},
{"sentiment": 0.9, "timestamp": datetime.now(timezone.utc)}
]
metric_value = await monitor.calculate_custom_metric("sentiment_momentum", data_points)
assert isinstance(metric_value, float)
assert 0.0 <= metric_value <= 1.0
@pytest.mark.asyncio
async def test_market_hours_detection(self, monitor):
"""Test market hours detection and handling."""
# Test during market hours
market_time = datetime.now(timezone.utc).replace(hour=1, minute=30) # 9:30 AM KST
is_market_open = monitor.is_market_open(market_time)
# Test outside market hours
after_hours = datetime.now(timezone.utc).replace(hour=9, minute=0) # 6:00 PM KST
is_after_hours = monitor.is_market_open(after_hours)
# Assertions depend on market schedule logic
assert isinstance(is_market_open, bool)
assert isinstance(is_after_hours, bool)
@pytest.mark.asyncio
async def test_news_sentiment_tracking(self, monitor):
"""Test tracking news sentiment over time."""
sentiment_history = [
{"timestamp": datetime.now(timezone.utc) - timedelta(hours=3), "sentiment": 0.7},
{"timestamp": datetime.now(timezone.utc) - timedelta(hours=2), "sentiment": 0.8},
{"timestamp": datetime.now(timezone.utc) - timedelta(hours=1), "sentiment": 0.6},
{"timestamp": datetime.now(timezone.utc), "sentiment": 0.9}
]
sentiment_trend = await monitor.analyze_sentiment_trend(sentiment_history)
assert "current_sentiment" in sentiment_trend
assert "trend_direction" in sentiment_trend
assert "volatility" in sentiment_trend
@pytest.mark.asyncio
async def test_anomaly_detection_ml(self, monitor):
"""Test ML-based anomaly detection."""
# Mock feature data
features = {
"price_change": 2.5,
"volume_ratio": 1.8,
"sentiment_score": 0.7,
"news_count": 5,
"time_of_day": 14 # 2 PM
}
anomaly_result = await monitor.detect_anomaly_ml(features)
assert "is_anomaly" in anomaly_result
assert "anomaly_score" in anomaly_result
assert "contributing_factors" in anomaly_result
@pytest.mark.asyncio
async def test_cross_validation_monitoring(self, monitor):
"""Test cross-validation of data from multiple sources."""
# Data from different sources
source_data = {
"source_a": {"price": 75000, "volume": 1000000},
"source_b": {"price": 75100, "volume": 1020000},
"source_c": {"price": 74950, "volume": 980000}
}
validation_result = await monitor.cross_validate_data(source_data)
assert "consensus_price" in validation_result
assert "data_quality_score" in validation_result
assert "outlier_sources" in validation_result
@pytest.mark.asyncio
async def test_performance_benchmarking(self, monitor):
"""Test performance benchmarking of monitoring system."""
# Simulate processing load
test_data = [{"id": i, "data": f"test_{i}"} for i in range(1000)]
import time
start_time = time.time()
results = await monitor.benchmark_processing(test_data)
end_time = time.time()
processing_time = end_time - start_time
assert results["processed_count"] == 1000
assert results["processing_rate"] > 0
assert processing_time < 5.0 # Should process quickly
@pytest.mark.asyncio
async def test_alert_escalation(self, monitor):
"""Test alert escalation based on severity and time."""
# Create initial alert
alert = {
"id": "alert_123",
"level": AlertLevel.MEDIUM,
"message": "Price movement detected",
"created_at": datetime.now(timezone.utc) - timedelta(minutes=30),
"escalated": False
}
# Check if should escalate
should_escalate = await monitor.should_escalate_alert(alert)
if should_escalate:
escalated_alert = await monitor.escalate_alert(alert)
assert escalated_alert["level"] == AlertLevel.HIGH
assert escalated_alert["escalated"] is True
@pytest.mark.asyncio
async def test_data_retention_policy(self, monitor):
"""Test data retention and cleanup policies."""
# Add old data
old_timestamp = datetime.now(timezone.utc) - timedelta(days=31)
old_data = {"timestamp": old_timestamp, "data": "old_data"}
monitor.add_monitoring_data(old_data)
# Apply retention policy
cleaned_count = await monitor.apply_retention_policy(retention_days=30)
assert cleaned_count >= 0 # Should clean up old data
@pytest.mark.asyncio
async def test_monitoring_statistics(self, monitor):
"""Test monitoring system statistics collection."""
stats = await monitor.get_monitoring_statistics()
assert "uptime" in stats
assert "total_updates_processed" in stats
assert "alerts_generated" in stats
assert "average_processing_time" in stats
assert "error_rate" in stats
@pytest.mark.asyncio
async def test_config_hot_reload(self, monitor):
"""Test hot reloading of monitoring configuration."""
# Update configuration
new_config = {
"alert_thresholds": {
"high_impact": 0.9,
"medium_impact": 0.6,
"low_impact": 0.3
},
"rate_limits": {
"news_api": 100,
"market_api": 200
}
}
result = await monitor.reload_configuration(new_config)
assert result is True
assert monitor.get_config("alert_thresholds.high_impact") == 0.9
@pytest.mark.asyncio
async def test_error_handling(self, monitor):
"""Test error handling in monitoring system."""
# Test with invalid data
with pytest.raises(MonitoringError):
await monitor.process_news_update(None)
# Test graceful degradation
with patch.object(monitor, '_external_api_call') as mock_api:
mock_api.side_effect = Exception("API Error")
# Should handle error gracefully
result = await monitor.process_with_fallback("test_data")
assert result is not None # Should return fallback result
def test_monitoring_result_dataclass(self):
"""Test MonitoringResult dataclass."""
result = MonitoringResult(
update_type="news",
data={"news_id": "123", "sentiment": 0.8},
timestamp=datetime.now(timezone.utc),
alerts=[],
processing_time=0.05
)
assert result.update_type == "news"
assert result.data["news_id"] == "123"
assert isinstance(result.timestamp, datetime)