# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""실시간 스트리밍 도구 테스트"""
import pytest
import asyncio
import json
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any
from src.tools.streaming_tools import RealTimeStreamingTool
from src.exceptions import DataValidationError, DatabaseConnectionError
class TestRealTimeStreamingTool:
    """Unit tests for RealTimeStreamingTool.

    Covers the MCP tool definition, the main ``execute`` streaming paths
    (market data, investor flow, market breadth, comprehensive), Kafka
    setup helpers, buffering/backpressure, WebSocket connection
    management, stream analytics, alert generation, performance
    monitoring, recovery, data validation, filtering, caching, error
    handling, parameter validation, stream-ID generation, and message
    serialization.

    All external collaborators (DB, cache, Kafka) are mocked; no test
    performs real I/O.
    """

    @pytest.fixture
    def mock_db_manager(self):
        """Async mock standing in for the database manager."""
        return AsyncMock()

    @pytest.fixture
    def mock_cache_manager(self):
        """Async mock standing in for the cache manager."""
        return AsyncMock()

    @pytest.fixture
    def mock_kafka_producer(self):
        """Mock Kafka producer with awaitable send/flush."""
        producer = MagicMock()
        producer.send = AsyncMock()
        producer.flush = AsyncMock()
        return producer

    @pytest.fixture
    def mock_kafka_consumer(self):
        """Mock Kafka consumer."""
        consumer = AsyncMock()
        return consumer

    @pytest.fixture
    def streaming_tool(self, mock_db_manager, mock_cache_manager):
        """RealTimeStreamingTool instance wired to the mock managers."""
        return RealTimeStreamingTool(mock_db_manager, mock_cache_manager)

    @pytest.fixture
    def sample_market_data(self):
        """Sample real-time market snapshot for KOSPI."""
        return {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "index_value": 2650.5,
            "change": 5.2,
            "change_rate": 0.196,
            "volume": 125000000,
            "transaction_value": 8500000000000,
            "market_cap": 1850000000000000,
            "individual_buy": 1200000000,
            "individual_sell": 1100000000,
            "institution_buy": 2500000000,
            "institution_sell": 2400000000,
            "foreign_buy": 800000000,
            "foreign_sell": 750000000,
            "advancing_issues": 485,
            "declining_issues": 412,
            "unchanged_issues": 103
        }

    def test_tool_initialization(self, streaming_tool, mock_db_manager, mock_cache_manager):
        """The tool exposes its name/description and keeps the injected managers."""
        assert streaming_tool.name == "start_realtime_stream"
        assert streaming_tool.description is not None
        assert "실시간" in streaming_tool.description or "real-time" in streaming_tool.description.lower()
        assert streaming_tool.db_manager == mock_db_manager
        assert streaming_tool.cache_manager == mock_cache_manager

    def test_tool_definition(self, streaming_tool):
        """The MCP tool definition declares the expected input schema."""
        definition = streaming_tool.get_tool_definition()
        assert definition.name == "start_realtime_stream"
        assert definition.description is not None
        assert definition.inputSchema is not None

        # Input schema structure
        schema = definition.inputSchema
        assert schema["type"] == "object"
        assert "properties" in schema
        properties = schema["properties"]
        assert "markets" in properties
        assert "stream_types" in properties
        assert "update_interval" in properties
        assert "include_analytics" in properties

        # stream_types parameter advertises all supported stream kinds
        stream_prop = properties["stream_types"]
        assert stream_prop["type"] == "array"
        assert "market_data" in str(stream_prop)
        assert "investor_flow" in str(stream_prop)
        assert "market_breadth" in str(stream_prop)

    @pytest.mark.asyncio
    async def test_execute_market_data_stream(self, streaming_tool, sample_market_data):
        """Market-data stream returns stream info, data, and analytics."""
        # Cache miss forces a DB fetch
        streaming_tool.cache_manager.get.return_value = None
        streaming_tool.db_manager.fetch_all.return_value = [sample_market_data]

        result = await streaming_tool.execute({
            "markets": ["KOSPI"],
            "stream_types": ["market_data"],
            "update_interval": 5,
            "duration": 30
        })

        assert len(result) == 1
        content = result[0]
        assert content.type == "text"

        # Parse the JSON payload and verify its sections
        data = json.loads(content.text)
        assert "stream_info" in data
        assert "stream_data" in data
        assert "analytics" in data

        stream_info = data["stream_info"]
        assert "stream_id" in stream_info
        assert "status" in stream_info
        assert stream_info["status"] == "started"
        assert "markets" in stream_info
        assert "KOSPI" in stream_info["markets"]

    @pytest.mark.asyncio
    async def test_execute_investor_flow_stream(self, streaming_tool):
        """Investor-flow stream surfaces net flows per investor class."""
        investor_flow_data = [
            {
                "timestamp": datetime.now().isoformat(),
                "market": "KOSPI",
                "individual_net": 100000000,
                "institution_net": -50000000,
                "foreign_net": 20000000,
                "pension_net": 10000000,
                "bank_net": -5000000,
                "insurance_net": 15000000
            }
        ]
        streaming_tool.cache_manager.get.return_value = None
        streaming_tool.db_manager.fetch_all.return_value = investor_flow_data

        result = await streaming_tool.execute({
            "markets": ["KOSPI"],
            "stream_types": ["investor_flow"],
            "update_interval": 10,
            "include_analytics": True
        })

        content = result[0]
        data = json.loads(content.text)
        assert "stream_data" in data
        stream_data = data["stream_data"]
        assert "investor_flow" in stream_data
        investor_data = stream_data["investor_flow"]
        assert "individual_net" in investor_data[0]
        assert "institution_net" in investor_data[0]
        assert "foreign_net" in investor_data[0]

    @pytest.mark.asyncio
    async def test_execute_market_breadth_stream(self, streaming_tool):
        """Market-breadth stream exposes advance/decline statistics."""
        breadth_data = [
            {
                "timestamp": datetime.now().isoformat(),
                "market": "KOSPI",
                "advancing_issues": 485,
                "declining_issues": 412,
                "unchanged_issues": 103,
                "new_highs": 25,
                "new_lows": 18,
                "advance_decline_ratio": 1.177,
                "advance_volume": 65000000,
                "decline_volume": 55000000,
                "total_volume": 125000000
            }
        ]
        streaming_tool.cache_manager.get.return_value = None
        streaming_tool.db_manager.fetch_all.return_value = breadth_data

        result = await streaming_tool.execute({
            "markets": ["KOSPI", "KOSDAQ"],
            "stream_types": ["market_breadth"],
            "update_interval": 15
        })

        content = result[0]
        data = json.loads(content.text)
        stream_data = data["stream_data"]
        assert "market_breadth" in stream_data
        breadth = stream_data["market_breadth"][0]
        assert "advancing_issues" in breadth
        assert "declining_issues" in breadth
        assert "advance_decline_ratio" in breadth

    @pytest.mark.asyncio
    async def test_comprehensive_stream(self, streaming_tool, sample_market_data):
        """Comprehensive stream (all data types) returns every section."""
        streaming_tool.cache_manager.get.return_value = None
        streaming_tool.db_manager.fetch_all.return_value = [sample_market_data]

        result = await streaming_tool.execute({
            "markets": ["KOSPI", "KOSDAQ"],
            "stream_types": ["market_data", "investor_flow", "market_breadth"],
            "update_interval": 5,
            "include_analytics": True,
            "include_alerts": True,
            "buffer_size": 100
        })

        content = result[0]
        data = json.loads(content.text)
        assert "stream_info" in data
        assert "stream_data" in data
        assert "analytics" in data
        assert "alerts" in data

        # Every requested stream type must be present
        stream_data = data["stream_data"]
        assert "market_data" in stream_data
        assert "investor_flow" in stream_data
        assert "market_breadth" in stream_data

    def test_kafka_producer_setup(self, streaming_tool):
        """Kafka producer helper preserves the supplied configuration."""
        config = {
            "bootstrap_servers": ["localhost:9092"],
            "key_serializer": "json",
            "value_serializer": "json",
            "acks": "all",
            "retries": 3
        }
        producer = streaming_tool._setup_kafka_producer(config)
        assert producer is not None
        assert hasattr(producer, 'config')
        assert producer.config["bootstrap_servers"] == ["localhost:9092"]

    def test_kafka_consumer_setup(self, streaming_tool):
        """Kafka consumer helper preserves group id and topic list."""
        config = {
            "bootstrap_servers": ["localhost:9092"],
            "group_id": "market_analytics",
            "topics": ["market_data", "investor_flow"],
            "auto_offset_reset": "latest"
        }
        consumer = streaming_tool._setup_kafka_consumer(config)
        assert consumer is not None
        assert hasattr(consumer, 'config')
        assert consumer.config["group_id"] == "market_analytics"
        assert consumer.config["topics"] == ["market_data", "investor_flow"]

    @pytest.mark.asyncio
    async def test_data_buffering_system(self, streaming_tool):
        """Data buffer tracks size, fullness, and returns buffered items."""
        buffer_config = {
            "buffer_size": 50,
            "flush_interval": 10,
            "compression": "gzip"
        }
        buffer = streaming_tool._create_data_buffer(buffer_config)

        test_data = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "value": 2650.5
        }
        await buffer.add(test_data)
        await buffer.add(test_data)

        # Buffer state after two inserts
        assert buffer.size() == 2
        assert not buffer.is_full()

        buffered_data = await buffer.get_all()
        assert len(buffered_data) == 2
        assert buffered_data[0]["market"] == "KOSPI"

    @pytest.mark.asyncio
    async def test_backpressure_handling(self, streaming_tool):
        """Backpressure queue caps its size and drops the oldest items."""
        # Simulate a high-load situation
        high_load_config = {
            "max_queue_size": 10,
            "drop_strategy": "oldest",
            "backpressure_threshold": 0.8
        }
        queue = streaming_tool._create_backpressure_queue(high_load_config)

        # Overfill the queue beyond its capacity
        for i in range(15):
            await queue.put({"id": i, "data": f"test_data_{i}"})

        assert queue.is_backpressure_active()
        assert queue.qsize() <= 10  # never exceeds max size

        # Oldest entries should have been dropped
        oldest_item = await queue.get()
        assert oldest_item["id"] >= 5  # first 5 were discarded

    @pytest.mark.asyncio
    async def test_websocket_connection_manager(self, streaming_tool):
        """WebSocket manager tracks connections and broadcasts to all."""
        connection_manager = streaming_tool._create_websocket_manager()

        mock_ws1 = MagicMock()
        mock_ws1.id = "client_1"
        mock_ws2 = MagicMock()
        mock_ws2.id = "client_2"

        await connection_manager.add_connection(mock_ws1)
        await connection_manager.add_connection(mock_ws2)

        assert connection_manager.get_connection_count() == 2
        assert connection_manager.has_connection("client_1")
        assert connection_manager.has_connection("client_2")

        # Broadcast reaches every connected client
        test_message = {"type": "market_update", "data": {"KOSPI": 2650.5}}
        sent_count = await connection_manager.broadcast(test_message)
        assert sent_count == 2

        # Removing a connection updates the registry
        await connection_manager.remove_connection("client_1")
        assert connection_manager.get_connection_count() == 1
        assert not connection_manager.has_connection("client_1")

    @pytest.mark.asyncio
    async def test_stream_analytics_calculation(self, streaming_tool):
        """Analytics over a rising series report trend/volatility/momentum."""
        # One minute of 5-second-interval data: upward trend plus noise
        time_series_data = []
        base_time = datetime.now()
        for i in range(60):
            timestamp = base_time + timedelta(seconds=i*5)
            data_point = {
                "timestamp": timestamp.isoformat(),
                "value": 2650 + i * 0.5 + (i % 10) * 2,
                "volume": 1000000 + i * 50000
            }
            time_series_data.append(data_point)

        analytics = await streaming_tool._calculate_stream_analytics(time_series_data)
        assert "trend" in analytics
        assert "volatility" in analytics
        assert "momentum" in analytics
        assert "volume_profile" in analytics

        # Trend must be detected as rising
        assert analytics["trend"]["direction"] in ["up", "bullish"]
        assert analytics["trend"]["strength"] > 0

        # Volatility is non-negative and reported with a percentile
        assert analytics["volatility"]["current"] >= 0
        assert "percentile" in analytics["volatility"]

        # Momentum indicators are included
        assert "rsi" in analytics["momentum"]
        assert "macd" in analytics["momentum"]

    @pytest.mark.asyncio
    async def test_alert_generation(self, streaming_tool):
        """Alert generation fires on sharp moves with threshold details."""
        # Data designed to trigger alerts: threshold breach, sharp drop,
        # volume spike, high volatility
        alert_data = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "index_value": 2700.0,
            "change_rate": -3.5,
            "volume": 500000000,
            "volatility": 0.045
        }
        alerts = await streaming_tool._generate_alerts(alert_data)
        assert len(alerts) > 0
        assert any(alert["type"] == "price_movement" for alert in alerts)
        assert any(alert["severity"] in ["medium", "high"] for alert in alerts)

        # Alert content carries message, threshold, and observed value
        price_alert = next(alert for alert in alerts if alert["type"] == "price_movement")
        assert "급락" in price_alert["message"] or "decline" in price_alert["message"].lower()
        assert price_alert["threshold"] is not None
        assert price_alert["actual_value"] is not None

    @pytest.mark.asyncio
    async def test_stream_performance_monitoring(self, streaming_tool):
        """Performance monitor aggregates throughput/latency/error metrics."""
        monitor = streaming_tool._create_performance_monitor()

        # Record 100 processed messages with occasional small delays
        for i in range(100):
            await monitor.record_message_processed(f"msg_{i}", time.time())
            if i % 10 == 0:
                await asyncio.sleep(0.01)

        metrics = await monitor.get_metrics()
        assert "messages_per_second" in metrics
        assert "average_latency" in metrics
        assert "total_messages" in metrics
        assert "error_rate" in metrics
        assert metrics["total_messages"] == 100
        assert metrics["messages_per_second"] > 0
        assert metrics["error_rate"] == 0  # processed without errors

    @pytest.mark.asyncio
    async def test_stream_recovery_mechanism(self, streaming_tool):
        """Recovery manager retries failed connections and tracks metrics."""
        recovery_config = {
            "max_retries": 3,
            "retry_delay": 0.1,
            "circuit_breaker_threshold": 5,
            "recovery_timeout": 30
        }
        recovery_manager = streaming_tool._create_recovery_manager(recovery_config)

        # Simulate a connection failure; skip real sleeps
        with patch('asyncio.sleep'):
            # First attempt fails
            success = await recovery_manager.attempt_recovery("kafka_connection")
            assert not success

            # Retry succeeds once the connection test is mocked to pass
            with patch.object(recovery_manager, '_test_connection', return_value=True):
                success = await recovery_manager.attempt_recovery("kafka_connection")
                assert success

        metrics = recovery_manager.get_recovery_metrics()
        assert "total_failures" in metrics
        assert "successful_recoveries" in metrics
        assert metrics["total_failures"] > 0

    @pytest.mark.asyncio
    async def test_data_validation(self, streaming_tool):
        """Validator accepts complete data, rejects missing/mistyped fields."""
        valid_data = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "index_value": 2650.5,
            "change_rate": 0.196
        }
        is_valid = streaming_tool._validate_stream_data(valid_data, "market_data")
        assert is_valid

        # Missing required field (index_value)
        invalid_data = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI"
        }
        is_valid = streaming_tool._validate_stream_data(invalid_data, "market_data")
        assert not is_valid

        # Wrong data type: index_value must be numeric
        wrong_type_data = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "index_value": "not_a_number",
            "change_rate": 0.196
        }
        is_valid = streaming_tool._validate_stream_data(wrong_type_data, "market_data")
        assert not is_valid

    @pytest.mark.asyncio
    async def test_stream_filtering(self, streaming_tool):
        """Filters keep only rows matching market/volume/rate/session rules."""
        filters = {
            "markets": ["KOSPI"],
            "min_volume": 1000000,
            "max_change_rate": 5.0,
            "exclude_after_hours": True
        }
        # Only the first row satisfies every filter; each of the others
        # violates exactly one condition (market, volume, rate, session)
        test_data = [
            {"market": "KOSPI", "volume": 2000000, "change_rate": 1.5, "trading_session": "regular"},
            {"market": "KOSDAQ", "volume": 1500000, "change_rate": 2.0, "trading_session": "regular"},
            {"market": "KOSPI", "volume": 500000, "change_rate": 1.0, "trading_session": "regular"},
            {"market": "KOSPI", "volume": 2000000, "change_rate": 6.0, "trading_session": "regular"},
            {"market": "KOSPI", "volume": 2000000, "change_rate": 1.5, "trading_session": "after_hours"}
        ]

        filtered_data = await streaming_tool._apply_stream_filters(test_data, filters)

        assert len(filtered_data) == 1
        assert filtered_data[0]["market"] == "KOSPI"
        assert filtered_data[0]["volume"] >= 1000000
        assert filtered_data[0]["change_rate"] <= 5.0
        assert filtered_data[0]["trading_session"] == "regular"

    @pytest.mark.asyncio
    async def test_cache_functionality(self, streaming_tool):
        """On a cache hit the cached stream info is returned unchanged."""
        cached_stream_info = {
            "stream_id": "test_stream_123",
            "status": "active",
            "start_time": datetime.now().isoformat(),
            "markets": ["KOSPI"],
            "stream_types": ["market_data"]
        }
        streaming_tool.cache_manager.get.return_value = cached_stream_info

        result = await streaming_tool.execute({
            "markets": ["KOSPI"],
            "stream_types": ["market_data"],
            "use_cache": True
        })

        content = result[0]
        data = json.loads(content.text)
        # The cached payload should be passed through as-is
        assert data.get("stream_id") == "test_stream_123"

    @pytest.mark.asyncio
    async def test_error_handling(self, streaming_tool):
        """DB failures propagate as DatabaseConnectionError."""
        streaming_tool.cache_manager.get.return_value = None
        streaming_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB 연결 실패")
        with pytest.raises(DatabaseConnectionError):
            await streaming_tool.execute({
                "markets": ["KOSPI"],
                "stream_types": ["market_data"]
            })

    @pytest.mark.asyncio
    async def test_invalid_parameters(self, streaming_tool):
        """Invalid parameters raise ValueError with descriptive messages."""
        # Empty market list
        with pytest.raises(ValueError, match="At least one market"):
            await streaming_tool.execute({
                "markets": [],
                "stream_types": ["market_data"]
            })

        # Empty stream-type list
        with pytest.raises(ValueError, match="At least one stream type"):
            await streaming_tool.execute({
                "markets": ["KOSPI"],
                "stream_types": []
            })

        # Non-positive update interval
        with pytest.raises(ValueError, match="Invalid update interval"):
            await streaming_tool.execute({
                "markets": ["KOSPI"],
                "stream_types": ["market_data"],
                "update_interval": 0
            })

    def test_stream_id_generation(self, streaming_tool):
        """Stream IDs are long, delimited, and unique per invocation."""
        markets = ["KOSPI", "KOSDAQ"]
        stream_types = ["market_data", "investor_flow"]
        stream_id = streaming_tool._generate_stream_id(markets, stream_types)
        assert stream_id is not None
        assert len(stream_id) > 10  # sufficiently long ID
        assert "_" in stream_id or "-" in stream_id  # contains a separator

        # Same parameters yield a different ID (timestamp component)
        stream_id2 = streaming_tool._generate_stream_id(markets, stream_types)
        assert stream_id != stream_id2

    def test_message_serialization(self, streaming_tool):
        """Messages round-trip through JSON and shrink under gzip."""
        test_message = {
            "timestamp": datetime.now().isoformat(),
            "market": "KOSPI",
            "data": {
                "index_value": 2650.5,
                "volume": 125000000,
                "nested_data": {
                    "field1": "value1",
                    "field2": 123
                }
            }
        }
        # Plain JSON serialization round-trips losslessly
        serialized = streaming_tool._serialize_message(test_message, "json")
        assert isinstance(serialized, (str, bytes))
        deserialized = json.loads(serialized)
        assert deserialized["market"] == "KOSPI"
        assert deserialized["data"]["index_value"] == 2650.5

        # Gzip-compressed serialization yields smaller bytes
        compressed = streaming_tool._serialize_message(test_message, "json_gzip")
        assert isinstance(compressed, bytes)
        assert len(compressed) < len(serialized)  # compression reduces size