"""Tests for MCP server handlers."""
import asyncio
import json
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any
from unittest.mock import Mock, AsyncMock, patch, MagicMock

import pytest

from src.server.mcp_handlers import (
    MCPHandler,
    NewsHandler,
    AnalysisHandler,
    MonitoringHandler,
    ToolHandler,
    MCPError,
    MCPRequest,
    MCPResponse,
    NewsQuery,
    AnalysisRequest,
    MonitoringRequest
)
class TestMCPHandler:
    """Unit tests for the MCPHandler base class."""

    @pytest.fixture
    def handler(self):
        """Provide a fresh MCPHandler for each test."""
        return MCPHandler()

    @pytest.fixture
    def sample_mcp_request(self):
        """Build a representative news-lookup MCP request."""
        params = {
            "query": "삼성전자",
            "limit": 10,
            "include_analysis": True,
        }
        return MCPRequest(method="get_news", params=params)

    def test_mcp_handler_initialization(self, handler):
        """The handler exists and exposes its public interface."""
        assert handler is not None
        for attr in ("handle_request", "validate_request", "format_response"):
            assert hasattr(handler, attr)

    @pytest.mark.asyncio
    async def test_request_validation(self, handler, sample_mcp_request):
        """A well-formed request validates; an empty method does not."""
        assert await handler.validate_request(sample_mcp_request) is True
        bad_request = MCPRequest(method="", params={})
        assert await handler.validate_request(bad_request) is False

    @pytest.mark.asyncio
    async def test_response_formatting(self, handler):
        """A successful response wraps the payload and carries no error."""
        payload = {"news": [], "count": 0}
        response = await handler.format_response(payload, success=True)
        assert isinstance(response, MCPResponse)
        assert response.success is True
        assert response.data == payload
        assert response.error is None

    @pytest.mark.asyncio
    async def test_error_handling(self, handler):
        """A null request raises MCPError and failures format as error responses."""
        message = "Test error"
        with pytest.raises(MCPError):
            await handler.handle_request(None)
        failure = await handler.format_response(None, success=False, error=message)
        assert failure.success is False
        assert failure.error == message

    @pytest.mark.asyncio
    async def test_method_routing(self, handler):
        """Dispatching an unregistered method raises a descriptive MCPError."""
        bogus = MCPRequest(method="unknown_method", params={})
        with pytest.raises(MCPError) as exc_info:
            await handler.handle_request(bogus)
        assert "Unknown method" in str(exc_info.value)
class TestNewsHandler:
    """Unit tests for NewsHandler."""

    @pytest.fixture
    def news_handler(self):
        """Provide a fresh NewsHandler per test."""
        return NewsHandler()

    @pytest.fixture
    def sample_news_query(self):
        """Build a one-week news query.

        datetime.now() is captured once so date_from/date_to describe a
        consistent seven-day window.
        """
        now = datetime.now(timezone.utc)
        return NewsQuery(
            keywords=["삼성전자", "실적"],
            date_from=now - timedelta(days=7),
            date_to=now,
            sources=["naver", "daum"],
            limit=20,
            include_analysis=True
        )

    @pytest.fixture
    def sample_news_data(self):
        """Two fully-analyzed news items as they would come from the database."""
        return [
            {
                "id": "news_1",
                "title": "삼성전자 3분기 실적 발표",
                "content": "삼성전자가 좋은 실적을 발표했습니다.",
                "url": "https://example.com/news/1",
                "source": "naver",
                "published_at": datetime.now(timezone.utc),
                "sentiment": {"score": 0.8, "label": "positive"},
                "market_impact": {"score": 0.7, "prediction": "bullish"}
            },
            {
                "id": "news_2",
                "title": "삼성전자 주가 상승",
                "content": "삼성전자 주가가 크게 상승했습니다.",
                "url": "https://example.com/news/2",
                "source": "daum",
                "published_at": datetime.now(timezone.utc),
                "sentiment": {"score": 0.9, "label": "positive"},
                "market_impact": {"score": 0.8, "prediction": "bullish"}
            }
        ]

    def test_news_handler_initialization(self, news_handler):
        """The handler exposes the expected news retrieval interface."""
        assert news_handler is not None
        assert hasattr(news_handler, 'get_news')
        assert hasattr(news_handler, 'search_news')
        assert hasattr(news_handler, 'get_news_by_id')

    @pytest.mark.asyncio
    async def test_get_news_basic(self, news_handler, sample_news_query, sample_news_data):
        """get_news returns fetched items plus count metadata."""
        with patch.object(news_handler, '_fetch_news_from_db') as mock_fetch:
            mock_fetch.return_value = sample_news_data
            result = await news_handler.get_news(sample_news_query)
            assert "news" in result
            assert "metadata" in result
            assert len(result["news"]) == 2
            assert result["metadata"]["total_count"] == 2

    @pytest.mark.asyncio
    async def test_search_news_with_filters(self, news_handler):
        """search_news forwards filter parameters to the database search."""
        search_params = {
            "keywords": ["삼성전자"],
            "sentiment_filter": "positive",
            "date_range": "7d",
            "sources": ["naver"],
            "min_impact_score": 0.5
        }
        with patch.object(news_handler, '_search_news_database') as mock_search:
            mock_search.return_value = {"news": [], "count": 0}
            result = await news_handler.search_news(search_params)
            assert "news" in result
            assert "search_metadata" in result
            mock_search.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_news_by_id(self, news_handler, sample_news_data):
        """Looking up a single id returns that item with its analysis attached."""
        news_id = "news_1"
        with patch.object(news_handler, '_get_single_news') as mock_get:
            mock_get.return_value = sample_news_data[0]
            result = await news_handler.get_news_by_id(news_id)
            assert result["id"] == news_id
            assert "sentiment" in result
            assert "market_impact" in result

    @pytest.mark.asyncio
    async def test_news_aggregation(self, news_handler):
        """Aggregation groups news by the requested dimension.

        The unused ``sample_news_data`` fixture parameter was removed: the
        aggregation result is fully mocked, so building the fixture was
        wasted work.
        """
        aggregation_params = {
            "group_by": "source",
            "time_bucket": "1h",
            "metrics": ["count", "avg_sentiment", "avg_impact"]
        }
        with patch.object(news_handler, '_aggregate_news') as mock_agg:
            mock_agg.return_value = {
                "aggregations": {
                    "naver": {"count": 10, "avg_sentiment": 0.7},
                    "daum": {"count": 8, "avg_sentiment": 0.6}
                }
            }
            result = await news_handler.aggregate_news(aggregation_params)
            assert "aggregations" in result
            assert "naver" in result["aggregations"]

    @pytest.mark.asyncio
    async def test_news_trending_topics(self, news_handler):
        """Trending-topic extraction returns scored topics for the window."""
        time_window = "24h"
        with patch.object(news_handler, '_extract_trending_topics') as mock_trends:
            mock_trends.return_value = {
                "trending_topics": [
                    {"topic": "삼성전자", "score": 0.95, "count": 50},
                    {"topic": "실적발표", "score": 0.87, "count": 30}
                ]
            }
            result = await news_handler.get_trending_topics(time_window)
            assert "trending_topics" in result
            assert len(result["trending_topics"]) == 2

    @pytest.mark.asyncio
    async def test_news_timeline(self, news_handler):
        """Timeline generation buckets entity mentions at the given resolution."""
        timeline_params = {
            "entity": "삼성전자",
            "date_range": "7d",
            "resolution": "1h"
        }
        with patch.object(news_handler, '_generate_timeline') as mock_timeline:
            mock_timeline.return_value = {
                "timeline": [
                    {"timestamp": "2024-01-01T00:00:00Z", "count": 5, "avg_sentiment": 0.6},
                    {"timestamp": "2024-01-01T01:00:00Z", "count": 8, "avg_sentiment": 0.7}
                ]
            }
            result = await news_handler.get_news_timeline(timeline_params)
            assert "timeline" in result
            assert len(result["timeline"]) == 2

    @pytest.mark.asyncio
    async def test_news_export(self, news_handler, sample_news_data):
        """Exported JSON round-trips back to the original item count."""
        export_params = {
            "format": "json",
            "query": {"keywords": ["삼성전자"]},
            "include_analysis": True
        }
        with patch.object(news_handler, '_export_news_data') as mock_export:
            # datetime values are not JSON-serializable; replace them with
            # ISO-8601 strings without mutating the shared fixture items.
            serializable_data = [
                {**item, "published_at": item["published_at"].isoformat()}
                for item in sample_news_data
            ]
            mock_export.return_value = json.dumps(serializable_data, ensure_ascii=False)
            result = await news_handler.export_news(export_params)
            assert isinstance(result, dict)
            assert "exported_data" in result
            exported_data = json.loads(result["exported_data"])
            assert len(exported_data) == 2
class TestAnalysisHandler:
    """Unit tests for AnalysisHandler."""

    @pytest.fixture
    def analysis_handler(self):
        """Provide a fresh AnalysisHandler per test."""
        return AnalysisHandler()

    @pytest.fixture
    def sample_analysis_request(self):
        """Build a representative Korean financial sentiment request."""
        return AnalysisRequest(
            analysis_type="sentiment",
            news_ids=["news_1", "news_2"],
            parameters={"language": "korean", "domain": "financial"},
        )

    def test_analysis_handler_initialization(self, analysis_handler):
        """The handler exposes every analysis entry point."""
        assert analysis_handler is not None
        for name in (
            "analyze_sentiment",
            "analyze_market_impact",
            "detect_rumors",
            "summarize_news",
        ):
            assert hasattr(analysis_handler, name)

    @pytest.mark.asyncio
    async def test_sentiment_analysis(self, analysis_handler):
        """Sentiment analysis yields one result per requested news id."""
        payload = {"news_ids": ["news_1"], "options": {"include_confidence": True}}
        with patch('src.analysis.sentiment_analyzer.SentimentAnalyzer') as analyzer_cls:
            fake = Mock()
            fake.sentiment = "positive"
            fake.score = 0.8
            fake.confidence = 0.9
            analyzer_cls.return_value.analyze.return_value = fake
            result = await analysis_handler.analyze_sentiment(payload)
            assert "sentiment_results" in result
            assert len(result["sentiment_results"]) == 1

    @pytest.mark.asyncio
    async def test_market_impact_analysis(self, analysis_handler):
        """Market-impact analysis returns scored results."""
        payload = {
            "news_ids": ["news_1"],
            "market_data": {"current_price": 70000, "volume": 1000000},
        }
        with patch('src.analysis.market_impact_analyzer.MarketImpactAnalyzer') as analyzer_cls:
            fake = Mock()
            fake.impact_score = 0.7
            fake.price_prediction = "bullish"
            fake.confidence_interval = [0.6, 0.8]
            analyzer_cls.return_value.analyze_impact.return_value = fake
            result = await analysis_handler.analyze_market_impact(payload)
            assert "impact_results" in result
            assert result["impact_results"][0]["impact_score"] == 0.7

    @pytest.mark.asyncio
    async def test_rumor_detection(self, analysis_handler):
        """Rumor detection reports a score for each analyzed item."""
        payload = {
            "news_ids": ["news_1"],
            "detection_params": {"sensitivity": "high"},
        }
        with patch('src.analysis.rumor_detector.RumorDetector') as detector_cls:
            fake = Mock()
            fake.rumor_score = 0.3
            fake.rumor_type = "misinformation"
            fake.confidence = 0.8
            detector_cls.return_value.analyze_rumor.return_value = fake
            result = await analysis_handler.detect_rumors(payload)
            assert "rumor_results" in result
            assert result["rumor_results"][0]["rumor_score"] == 0.3

    @pytest.mark.asyncio
    async def test_news_summarization(self, analysis_handler):
        """Summarization returns a summary along with its key points."""
        payload = {
            "news_ids": ["news_1", "news_2"],
            "summary_type": "extractive",
            "length": "short",
        }
        with patch('src.analysis.news_summarizer.NewsSummarizer') as summarizer_cls:
            fake = Mock()
            fake.summary = "삼성전자 실적 호조로 주가 상승"
            fake.confidence = 0.85
            fake.key_points = ["실적 호조", "주가 상승"]
            summarizer_cls.return_value.summarize.return_value = fake
            result = await analysis_handler.summarize_news(payload)
            assert "summary_results" in result
            assert len(result["summary_results"][0]["key_points"]) == 2

    @pytest.mark.asyncio
    async def test_batch_analysis(self, analysis_handler):
        """Batch analysis fans out to every analysis type and collects results."""
        batch = {
            "analyses": [
                {"type": "sentiment", "news_ids": ["news_1"]},
                {"type": "market_impact", "news_ids": ["news_2"]},
                {"type": "rumor_detection", "news_ids": ["news_1", "news_2"]},
            ]
        }
        with patch.object(analysis_handler, 'analyze_sentiment') as sentiment_mock, \
                patch.object(analysis_handler, 'analyze_market_impact') as impact_mock, \
                patch.object(analysis_handler, 'detect_rumors') as rumor_mock:
            sentiment_mock.return_value = {"sentiment_results": []}
            impact_mock.return_value = {"impact_results": []}
            rumor_mock.return_value = {"rumor_results": []}
            result = await analysis_handler.run_batch_analysis(batch)
            assert "batch_results" in result
            assert len(result["batch_results"]) == 3

    @pytest.mark.asyncio
    async def test_analysis_comparison(self, analysis_handler):
        """Comparing two time periods yields per-metric change values."""
        comparison = {
            "baseline_period": {"start": "2024-01-01", "end": "2024-01-07"},
            "comparison_period": {"start": "2024-01-08", "end": "2024-01-14"},
            "metrics": ["sentiment", "impact", "volume"],
        }
        with patch.object(analysis_handler, '_compare_analysis_periods') as compare_mock:
            compare_mock.return_value = {
                "comparison_results": {
                    "sentiment_change": 0.1,
                    "impact_change": -0.05,
                    "volume_change": 0.2,
                }
            }
            result = await analysis_handler.compare_analysis_periods(comparison)
            assert "comparison_results" in result
            assert "sentiment_change" in result["comparison_results"]
class TestMonitoringHandler:
    """Unit tests for MonitoringHandler."""

    @pytest.fixture
    def monitoring_handler(self):
        """Provide a fresh MonitoringHandler per test."""
        return MonitoringHandler()

    @pytest.fixture
    def sample_monitoring_request(self):
        """Build a representative real-time alert monitoring request."""
        params = {
            "entities": ["삼성전자", "LG전자"],
            "alert_threshold": 0.8,
            "time_window": "1h",
        }
        return MonitoringRequest(monitor_type="realtime_alerts", parameters=params)

    def test_monitoring_handler_initialization(self, monitoring_handler):
        """The handler exposes the expected monitoring interface."""
        assert monitoring_handler is not None
        for name in ("get_realtime_status", "configure_alerts", "get_system_metrics"):
            assert hasattr(monitoring_handler, name)

    @pytest.mark.asyncio
    async def test_realtime_status(self, monitoring_handler):
        """Real-time status reflects the underlying monitor snapshot."""
        with patch('src.monitoring.realtime_monitor.RealtimeMonitor') as monitor_cls:
            monitor_cls.return_value.get_status.return_value = {
                "is_running": True,
                "active_connections": 5,
                "processed_items": 1500,
                "queue_size": 10,
            }
            result = await monitoring_handler.get_realtime_status()
            assert "status" in result
            assert result["status"]["is_running"] is True
            assert result["status"]["active_connections"] == 5

    @pytest.mark.asyncio
    async def test_alert_configuration(self, monitoring_handler):
        """Saving an alert configuration yields an active config id."""
        config = {
            "alert_type": "sentiment_spike",
            "threshold": 0.9,
            "entities": ["삼성전자"],
            "notify_channels": ["email", "webhook"],
            "conditions": {"min_news_count": 5, "time_window": "30m"},
        }
        with patch.object(monitoring_handler, '_save_alert_config') as save_mock:
            save_mock.return_value = {"config_id": "alert_123", "status": "active"}
            result = await monitoring_handler.configure_alerts(config)
            assert "config_id" in result
            assert result["status"] == "active"

    @pytest.mark.asyncio
    async def test_system_metrics(self, monitoring_handler):
        """System metrics pass through from the collector unchanged."""
        with patch.object(monitoring_handler, '_collect_system_metrics') as metrics_mock:
            metrics_mock.return_value = {
                "cpu_usage": 45.2,
                "memory_usage": 67.8,
                "disk_usage": 23.4,
                "network_io": {"in": 1024, "out": 2048},
                "database_connections": 15,
                "cache_hit_rate": 0.95,
            }
            result = await monitoring_handler.get_system_metrics()
            assert "metrics" in result
            assert result["metrics"]["cpu_usage"] == 45.2
            assert result["metrics"]["cache_hit_rate"] == 0.95

    @pytest.mark.asyncio
    async def test_alert_history(self, monitoring_handler):
        """Alert history is returned for the requested window and types."""
        query = {
            "date_range": "7d",
            "alert_types": ["sentiment_spike", "market_impact"],
            "limit": 50,
        }
        with patch.object(monitoring_handler, '_get_alert_history') as history_mock:
            history_mock.return_value = {
                "alerts": [
                    {
                        "id": "alert_1",
                        "type": "sentiment_spike",
                        "entity": "삼성전자",
                        "triggered_at": "2024-01-01T10:00:00Z",
                        "severity": "high",
                    }
                ],
                "total_count": 1,
            }
            result = await monitoring_handler.get_alert_history(query)
            assert "alerts" in result
            assert len(result["alerts"]) == 1

    @pytest.mark.asyncio
    async def test_performance_monitoring(self, monitoring_handler):
        """Performance metrics include timings and detected bottlenecks."""
        with patch.object(monitoring_handler, '_analyze_performance') as perf_mock:
            perf_mock.return_value = {
                "performance_metrics": {
                    "avg_response_time": 250,  # ms
                    "throughput": 1500,  # requests/min
                    "error_rate": 0.02,
                    "bottlenecks": ["database_queries", "sentiment_analysis"],
                }
            }
            result = await monitoring_handler.get_performance_metrics()
            assert "performance_metrics" in result
            assert result["performance_metrics"]["avg_response_time"] == 250
            assert "database_queries" in result["performance_metrics"]["bottlenecks"]
class TestToolHandler:
    """Unit tests for ToolHandler."""

    @pytest.fixture
    def tool_handler(self):
        """Provide a fresh ToolHandler per test."""
        return ToolHandler()

    def test_tool_handler_initialization(self, tool_handler):
        """The handler exposes the tool-management interface."""
        assert tool_handler is not None
        for name in ("list_tools", "execute_tool", "get_tool_schema"):
            assert hasattr(tool_handler, name)

    @pytest.mark.asyncio
    async def test_list_available_tools(self, tool_handler):
        """Every core MCP tool appears in the tool listing."""
        listing = await tool_handler.list_tools()
        assert "tools" in listing
        registered = [entry["name"] for entry in listing["tools"]]
        for required in (
            "get_news", "search_news", "analyze_sentiment",
            "analyze_market_impact", "detect_rumors", "summarize_news",
            "get_realtime_status", "configure_alerts",
        ):
            assert required in registered

    @pytest.mark.asyncio
    async def test_get_tool_schema(self, tool_handler):
        """A tool schema carries its name, description and parameter spec."""
        schema = await tool_handler.get_tool_schema("get_news")
        for key in ("name", "description", "parameters"):
            assert key in schema
        assert schema["name"] == "get_news"

    @pytest.mark.asyncio
    async def test_execute_tool(self, tool_handler):
        """Executing a tool returns its result including execution timing."""
        invocation = {
            "tool_name": "get_news",
            "parameters": {"query": "삼성전자", "limit": 5},
        }
        with patch.object(tool_handler, '_execute_news_tool') as execute_mock:
            execute_mock.return_value = {
                "news": [],
                "count": 0,
                "execution_time": 150,
            }
            outcome = await tool_handler.execute_tool(invocation)
            assert "result" in outcome
            assert "execution_time" in outcome["result"]

    @pytest.mark.asyncio
    async def test_tool_validation(self, tool_handler):
        """Known parameters validate; unknown parameters are rejected."""
        good = {
            "tool_name": "analyze_sentiment",
            "parameters": {"news_ids": ["news_1"], "language": "korean"},
        }
        assert await tool_handler.validate_tool_request(good) is True
        bad = {
            "tool_name": "analyze_sentiment",
            "parameters": {"invalid_param": "value"},
        }
        assert await tool_handler.validate_tool_request(bad) is False

    @pytest.mark.asyncio
    async def test_tool_rate_limiting(self, tool_handler):
        """Execution succeeds under the limit and errors once it is exceeded."""
        invocation = {
            "tool_name": "analyze_sentiment",
            "parameters": {"news_ids": ["news_1"]},
        }
        with patch.object(tool_handler, '_check_rate_limit') as limit_mock:
            # Under the limit: execution proceeds without an error key.
            limit_mock.return_value = True
            outcome = await tool_handler.execute_tool(invocation)
            assert "error" not in outcome
            # Limit exceeded: an error mentioning the rate limit is surfaced.
            limit_mock.return_value = False
            outcome = await tool_handler.execute_tool(invocation)
            assert "error" in outcome
            assert "rate limit" in outcome["error"].lower()
class TestMCPIntegration:
    """Integration tests covering the full MCP server request path."""

    @pytest.fixture
    def mcp_server(self):
        """Create an MCPServer instance for testing.

        Imported lazily so collection of the other test classes does not
        fail if MCPServer pulls in heavier dependencies.
        """
        from src.server.mcp_handlers import MCPServer
        return MCPServer()

    @pytest.mark.asyncio
    async def test_end_to_end_news_workflow(self, mcp_server):
        """News retrieval followed by sentiment analysis succeeds end to end."""
        # Step 1: retrieve news.
        news_request = MCPRequest(
            method="get_news",
            params={"query": "삼성전자", "limit": 5}
        )
        news_response = await mcp_server.handle_request(news_request)
        assert news_response.success is True
        assert "news" in news_response.data
        assert "metadata" in news_response.data
        # Step 2: analyze sentiment on a retrieved item.
        analysis_request = MCPRequest(
            method="analyze_sentiment",
            params={"news_ids": ["news_1"]}
        )
        analysis_response = await mcp_server.handle_request(analysis_request)
        assert analysis_response.success is True
        assert "sentiment_results" in analysis_response.data

    @pytest.mark.asyncio
    async def test_concurrent_request_handling(self, mcp_server):
        """Ten concurrent requests all complete successfully."""
        requests = [
            MCPRequest(method="get_news", params={"query": f"keyword_{i}"})
            for i in range(10)
        ]
        # Execute all requests concurrently on the event loop.
        responses = await asyncio.gather(
            *(mcp_server.handle_request(req) for req in requests)
        )
        assert len(responses) == 10
        assert all(resp.success for resp in responses)

    @pytest.mark.asyncio
    async def test_error_propagation(self, mcp_server):
        """An unknown method surfaces as a failed response with a message.

        The original test also built a valid request it never sent; that
        dead local was removed — only the invalid-method path is exercised.
        """
        invalid_request = MCPRequest(
            method="invalid_method",
            params={"query": "삼성전자"}
        )
        response = await mcp_server.handle_request(invalid_request)
        assert response.success is False
        assert "Unknown method" in response.error

    @pytest.mark.asyncio
    async def test_mcp_server_lifecycle(self, mcp_server):
        """start() and stop() toggle the server's running flag."""
        await mcp_server.start()
        assert mcp_server.is_running is True
        await mcp_server.stop()
        assert mcp_server.is_running is False

    @pytest.mark.asyncio
    async def test_resource_cleanup(self, mcp_server):
        """Each handled request triggers exactly one resource cleanup."""
        request = MCPRequest(
            method="get_news",
            params={"query": "삼성전자"}
        )
        with patch.object(mcp_server, '_cleanup_resources') as mock_cleanup:
            await mcp_server.handle_request(request)
            mock_cleanup.assert_called_once()