MCP Market Statistics Server

by whdghk1907
test_streaming_tools.py • 24.7 kB
"""์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ๋„๊ตฌ ํ…Œ์ŠคํŠธ""" import pytest import asyncio import json import time from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch from typing import Dict, List, Any from src.tools.streaming_tools import RealTimeStreamingTool from src.exceptions import DataValidationError, DatabaseConnectionError class TestRealTimeStreamingTool: """์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ๋„๊ตฌ ํ…Œ์ŠคํŠธ""" @pytest.fixture def mock_db_manager(self): """Mock ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋งค๋‹ˆ์ €""" return AsyncMock() @pytest.fixture def mock_cache_manager(self): """Mock ์บ์‹œ ๋งค๋‹ˆ์ €""" return AsyncMock() @pytest.fixture def mock_kafka_producer(self): """Mock Kafka ํ”„๋กœ๋“€์„œ""" producer = MagicMock() producer.send = AsyncMock() producer.flush = AsyncMock() return producer @pytest.fixture def mock_kafka_consumer(self): """Mock Kafka ์ปจ์Šˆ๋จธ""" consumer = AsyncMock() return consumer @pytest.fixture def streaming_tool(self, mock_db_manager, mock_cache_manager): """์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ๋„๊ตฌ ์ธ์Šคํ„ด์Šค""" return RealTimeStreamingTool(mock_db_manager, mock_cache_manager) @pytest.fixture def sample_market_data(self): """์ƒ˜ํ”Œ ์‹ค์‹œ๊ฐ„ ์‹œ์žฅ ๋ฐ์ดํ„ฐ""" return { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "index_value": 2650.5, "change": 5.2, "change_rate": 0.196, "volume": 125000000, "transaction_value": 8500000000000, "market_cap": 1850000000000000, "individual_buy": 1200000000, "individual_sell": 1100000000, "institution_buy": 2500000000, "institution_sell": 2400000000, "foreign_buy": 800000000, "foreign_sell": 750000000, "advancing_issues": 485, "declining_issues": 412, "unchanged_issues": 103 } def test_tool_initialization(self, streaming_tool, mock_db_manager, mock_cache_manager): """๋„๊ตฌ ์ดˆ๊ธฐํ™” ํ…Œ์ŠคํŠธ""" assert streaming_tool.name == "start_realtime_stream" assert streaming_tool.description is not None assert "์‹ค์‹œ๊ฐ„" in streaming_tool.description or "real-time" in streaming_tool.description.lower() assert streaming_tool.db_manager == mock_db_manager assert streaming_tool.cache_manager == mock_cache_manager def test_tool_definition(self, streaming_tool): """๋„๊ตฌ ์ •์˜ ํ…Œ์ŠคํŠธ""" definition = streaming_tool.get_tool_definition() assert definition.name == "start_realtime_stream" assert definition.description is not None assert definition.inputSchema is not None # ์ž…๋ ฅ ์Šคํ‚ค๋งˆ ๊ฒ€์ฆ schema = definition.inputSchema assert schema["type"] == "object" assert "properties" in schema properties = schema["properties"] assert "markets" in properties assert "stream_types" in properties assert "update_interval" in properties assert "include_analytics" in properties # stream_types ํŒŒ๋ผ๋ฏธํ„ฐ ๊ฒ€์ฆ stream_prop = properties["stream_types"] assert stream_prop["type"] == "array" assert "market_data" in str(stream_prop) assert "investor_flow" in str(stream_prop) assert "market_breadth" in str(stream_prop) @pytest.mark.asyncio async def test_execute_market_data_stream(self, streaming_tool, sample_market_data): """์‹œ์žฅ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ ํ…Œ์ŠคํŠธ""" # ์บ์‹œ ๋ฏธ์Šค streaming_tool.cache_manager.get.return_value = None # ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์‘๋‹ต ์„ค์ • streaming_tool.db_manager.fetch_all.return_value = [sample_market_data] # ์‹คํ–‰ result = await streaming_tool.execute({ "markets": ["KOSPI"], "stream_types": ["market_data"], "update_interval": 5, "duration": 30 }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ assert len(result) == 1 content = result[0] assert content.type == "text" # JSON ํŒŒ์‹ฑํ•˜์—ฌ ๋‚ด์šฉ ํ™•์ธ import json data = json.loads(content.text) 
assert "stream_info" in data assert "stream_data" in data assert "analytics" in data # ์ŠคํŠธ๋ฆผ ์ •๋ณด ๊ฒ€์ฆ stream_info = data["stream_info"] assert "stream_id" in stream_info assert "status" in stream_info assert stream_info["status"] == "started" assert "markets" in stream_info assert "KOSPI" in stream_info["markets"] @pytest.mark.asyncio async def test_execute_investor_flow_stream(self, streaming_tool): """ํˆฌ์ž์ž ์ž๊ธˆ ํ๋ฆ„ ์ŠคํŠธ๋ฆผ ํ…Œ์ŠคํŠธ""" # ํˆฌ์ž์ž ํ๋ฆ„ ๋ฐ์ดํ„ฐ investor_flow_data = [ { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "individual_net": 100000000, "institution_net": -50000000, "foreign_net": 20000000, "pension_net": 10000000, "bank_net": -5000000, "insurance_net": 15000000 } ] streaming_tool.cache_manager.get.return_value = None streaming_tool.db_manager.fetch_all.return_value = investor_flow_data # ์‹คํ–‰ result = await streaming_tool.execute({ "markets": ["KOSPI"], "stream_types": ["investor_flow"], "update_interval": 10, "include_analytics": True }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ content = result[0] import json data = json.loads(content.text) assert "stream_data" in data stream_data = data["stream_data"] assert "investor_flow" in stream_data investor_data = stream_data["investor_flow"] assert "individual_net" in investor_data[0] assert "institution_net" in investor_data[0] assert "foreign_net" in investor_data[0] @pytest.mark.asyncio async def test_execute_market_breadth_stream(self, streaming_tool): """์‹œ์žฅ ํญ ์ŠคํŠธ๋ฆผ ํ…Œ์ŠคํŠธ""" breadth_data = [ { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "advancing_issues": 485, "declining_issues": 412, "unchanged_issues": 103, "new_highs": 25, "new_lows": 18, "advance_decline_ratio": 1.177, "advance_volume": 65000000, "decline_volume": 55000000, "total_volume": 125000000 } ] streaming_tool.cache_manager.get.return_value = None streaming_tool.db_manager.fetch_all.return_value = breadth_data # ์‹คํ–‰ result = await streaming_tool.execute({ "markets": ["KOSPI", "KOSDAQ"], "stream_types": ["market_breadth"], "update_interval": 15 }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ content = result[0] import json data = json.loads(content.text) stream_data = data["stream_data"] assert "market_breadth" in stream_data breadth = stream_data["market_breadth"][0] assert "advancing_issues" in breadth assert "declining_issues" in breadth assert "advance_decline_ratio" in breadth @pytest.mark.asyncio async def test_comprehensive_stream(self, streaming_tool, sample_market_data): """์ข…ํ•ฉ ์ŠคํŠธ๋ฆผ ํ…Œ์ŠคํŠธ (๋ชจ๋“  ๋ฐ์ดํ„ฐ ํƒ€์ž…)""" streaming_tool.cache_manager.get.return_value = None streaming_tool.db_manager.fetch_all.return_value = [sample_market_data] # ์‹คํ–‰ result = await streaming_tool.execute({ "markets": ["KOSPI", "KOSDAQ"], "stream_types": ["market_data", "investor_flow", "market_breadth"], "update_interval": 5, "include_analytics": True, "include_alerts": True, "buffer_size": 100 }) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ content = result[0] import json data = json.loads(content.text) assert "stream_info" in data assert "stream_data" in data assert "analytics" in data assert "alerts" in data # ๋ชจ๋“  ์ŠคํŠธ๋ฆผ ํƒ€์ž… ํ™•์ธ stream_data = data["stream_data"] assert "market_data" in stream_data assert "investor_flow" in stream_data assert "market_breadth" in stream_data def test_kafka_producer_setup(self, streaming_tool): """Kafka ํ”„๋กœ๋“€์„œ ์„ค์ • ํ…Œ์ŠคํŠธ""" config = { "bootstrap_servers": ["localhost:9092"], "key_serializer": "json", "value_serializer": "json", "acks": "all", "retries": 3 } producer = 
streaming_tool._setup_kafka_producer(config) assert producer is not None assert hasattr(producer, 'config') assert producer.config["bootstrap_servers"] == ["localhost:9092"] def test_kafka_consumer_setup(self, streaming_tool): """Kafka ์ปจ์Šˆ๋จธ ์„ค์ • ํ…Œ์ŠคํŠธ""" config = { "bootstrap_servers": ["localhost:9092"], "group_id": "market_analytics", "topics": ["market_data", "investor_flow"], "auto_offset_reset": "latest" } consumer = streaming_tool._setup_kafka_consumer(config) assert consumer is not None assert hasattr(consumer, 'config') assert consumer.config["group_id"] == "market_analytics" assert consumer.config["topics"] == ["market_data", "investor_flow"] @pytest.mark.asyncio async def test_data_buffering_system(self, streaming_tool): """๋ฐ์ดํ„ฐ ๋ฒ„ํผ๋ง ์‹œ์Šคํ…œ ํ…Œ์ŠคํŠธ""" buffer_config = { "buffer_size": 50, "flush_interval": 10, "compression": "gzip" } buffer = streaming_tool._create_data_buffer(buffer_config) # ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€ test_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "value": 2650.5 } await buffer.add(test_data) await buffer.add(test_data) # ๋ฒ„ํผ ์ƒํƒœ ํ™•์ธ assert buffer.size() == 2 assert not buffer.is_full() # ๋ฒ„ํผ ๋ฐ์ดํ„ฐ ์กฐํšŒ buffered_data = await buffer.get_all() assert len(buffered_data) == 2 assert buffered_data[0]["market"] == "KOSPI" @pytest.mark.asyncio async def test_backpressure_handling(self, streaming_tool): """๋ฐฑํ”„๋ ˆ์…” ์ฒ˜๋ฆฌ ํ…Œ์ŠคํŠธ""" # ๋†’์€ ๋ถ€ํ•˜ ์ƒํ™ฉ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ high_load_config = { "max_queue_size": 10, "drop_strategy": "oldest", "backpressure_threshold": 0.8 } queue = streaming_tool._create_backpressure_queue(high_load_config) # ํ์— ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€ (์šฉ๋Ÿ‰ ์ดˆ๊ณผ) for i in range(15): await queue.put({"id": i, "data": f"test_data_{i}"}) # ๋ฐฑํ”„๋ ˆ์…” ์ƒํƒœ ํ™•์ธ assert queue.is_backpressure_active() assert queue.qsize() <= 10 # ์ตœ๋Œ€ ํฌ๊ธฐ ์ดˆ๊ณผํ•˜์ง€ ์•Š์Œ # ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ์‚ญ์ œ๋˜์—ˆ๋Š”์ง€ ํ™•์ธ oldest_item = await queue.get() assert oldest_item["id"] >= 5 # ์ฒ˜์Œ 5๊ฐœ๋Š” ์‚ญ์ œ๋จ @pytest.mark.asyncio async def test_websocket_connection_manager(self, streaming_tool): """WebSocket ์—ฐ๊ฒฐ ๊ด€๋ฆฌ ํ…Œ์ŠคํŠธ""" connection_manager = streaming_tool._create_websocket_manager() # Mock WebSocket ์—ฐ๊ฒฐ mock_ws1 = MagicMock() mock_ws1.id = "client_1" mock_ws2 = MagicMock() mock_ws2.id = "client_2" # ํด๋ผ์ด์–ธํŠธ ์—ฐ๊ฒฐ ์ถ”๊ฐ€ await connection_manager.add_connection(mock_ws1) await connection_manager.add_connection(mock_ws2) # ์—ฐ๊ฒฐ ์ƒํƒœ ํ™•์ธ assert connection_manager.get_connection_count() == 2 assert connection_manager.has_connection("client_1") assert connection_manager.has_connection("client_2") # ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ ํ…Œ์ŠคํŠธ test_message = {"type": "market_update", "data": {"KOSPI": 2650.5}} sent_count = await connection_manager.broadcast(test_message) assert sent_count == 2 # ์—ฐ๊ฒฐ ์ œ๊ฑฐ await connection_manager.remove_connection("client_1") assert connection_manager.get_connection_count() == 1 assert not connection_manager.has_connection("client_1") @pytest.mark.asyncio async def test_stream_analytics_calculation(self, streaming_tool): """์ŠคํŠธ๋ฆผ ๋ถ„์„ ๊ณ„์‚ฐ ํ…Œ์ŠคํŠธ""" # ์‹œ๊ณ„์—ด ๋ฐ์ดํ„ฐ ์ƒ์„ฑ time_series_data = [] base_time = datetime.now() for i in range(60): # 1๋ถ„๊ฐ„ 5์ดˆ ๊ฐ„๊ฒฉ ๋ฐ์ดํ„ฐ timestamp = base_time + timedelta(seconds=i*5) data_point = { "timestamp": timestamp.isoformat(), "value": 2650 + i * 0.5 + (i % 10) * 2, # ์ƒ์Šน ์ถ”์„ธ + ๋…ธ์ด์ฆˆ "volume": 1000000 + i * 50000 } time_series_data.append(data_point) # ์‹ค์‹œ๊ฐ„ 
๋ถ„์„ ์ˆ˜ํ–‰ analytics = await streaming_tool._calculate_stream_analytics(time_series_data) assert "trend" in analytics assert "volatility" in analytics assert "momentum" in analytics assert "volume_profile" in analytics # ํŠธ๋ Œ๋“œ ํ™•์ธ (์ƒ์Šน ์ถ”์„ธ์—ฌ์•ผ ํ•จ) assert analytics["trend"]["direction"] in ["up", "bullish"] assert analytics["trend"]["strength"] > 0 # ๋ณ€๋™์„ฑ ํ™•์ธ assert analytics["volatility"]["current"] >= 0 assert "percentile" in analytics["volatility"] # ๋ชจ๋ฉ˜ํ…€ ํ™•์ธ assert "rsi" in analytics["momentum"] assert "macd" in analytics["momentum"] @pytest.mark.asyncio async def test_alert_generation(self, streaming_tool): """์•Œ๋ฆผ ์ƒ์„ฑ ํ…Œ์ŠคํŠธ""" # ์•Œ๋ฆผ ํŠธ๋ฆฌ๊ฑฐ ๋ฐ์ดํ„ฐ alert_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "index_value": 2700.0, # ์ž„๊ณ„๊ฐ’ ์ดˆ๊ณผ "change_rate": -3.5, # ๊ธ‰๋ฝ "volume": 500000000, # ๊ฑฐ๋ž˜๋Ÿ‰ ๊ธ‰์ฆ "volatility": 0.045 # ๋†’์€ ๋ณ€๋™์„ฑ } alerts = await streaming_tool._generate_alerts(alert_data) assert len(alerts) > 0 assert any(alert["type"] == "price_movement" for alert in alerts) assert any(alert["severity"] in ["medium", "high"] for alert in alerts) # ์•Œ๋ฆผ ๋‚ด์šฉ ๊ฒ€์ฆ price_alert = next(alert for alert in alerts if alert["type"] == "price_movement") assert "๊ธ‰๋ฝ" in price_alert["message"] or "decline" in price_alert["message"].lower() assert price_alert["threshold"] is not None assert price_alert["actual_value"] is not None @pytest.mark.asyncio async def test_stream_performance_monitoring(self, streaming_tool): """์ŠคํŠธ๋ฆผ ์„ฑ๋Šฅ ๋ชจ๋‹ˆํ„ฐ๋ง ํ…Œ์ŠคํŠธ""" monitor = streaming_tool._create_performance_monitor() # ์„ฑ๋Šฅ ๋ฉ”ํŠธ๋ฆญ ๊ธฐ๋ก start_time = time.time() for i in range(100): await monitor.record_message_processed(f"msg_{i}", time.time()) if i % 10 == 0: await asyncio.sleep(0.01) # ์•ฝ๊ฐ„์˜ ์ง€์—ฐ ์ถ”๊ฐ€ end_time = time.time() # ์„ฑ๋Šฅ ๋ฉ”ํŠธ๋ฆญ ํ™•์ธ metrics = await monitor.get_metrics() assert "messages_per_second" in metrics assert "average_latency" in metrics assert "total_messages" in metrics assert "error_rate" in metrics assert metrics["total_messages"] == 100 assert metrics["messages_per_second"] > 0 assert metrics["error_rate"] == 0 # ์—๋Ÿฌ ์—†์ด ์ฒ˜๋ฆฌ๋จ @pytest.mark.asyncio async def test_stream_recovery_mechanism(self, streaming_tool): """์ŠคํŠธ๋ฆผ ๋ณต๊ตฌ ๋ฉ”์ปค๋‹ˆ์ฆ˜ ํ…Œ์ŠคํŠธ""" recovery_config = { "max_retries": 3, "retry_delay": 0.1, "circuit_breaker_threshold": 5, "recovery_timeout": 30 } recovery_manager = streaming_tool._create_recovery_manager(recovery_config) # ์—ฐ๊ฒฐ ์‹คํŒจ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ with patch('asyncio.sleep'): # ์‹ค์ œ ๋Œ€๊ธฐ ์‹œ๊ฐ„ ๊ฑด๋„ˆ๋›ฐ๊ธฐ # ์ฒซ ๋ฒˆ์งธ ์‹œ๋„ ์‹คํŒจ success = await recovery_manager.attempt_recovery("kafka_connection") assert not success # ์ฒซ ์‹œ๋„๋Š” ์‹คํŒจ # ์žฌ์‹œ๋„ ํ›„ ์„ฑ๊ณต (Mock์œผ๋กœ ์„ฑ๊ณต ์‹œ๋ฎฌ๋ ˆ์ด์…˜) with patch.object(recovery_manager, '_test_connection', return_value=True): success = await recovery_manager.attempt_recovery("kafka_connection") assert success # ๋ณต๊ตฌ ๋ฉ”ํŠธ๋ฆญ ํ™•์ธ metrics = recovery_manager.get_recovery_metrics() assert "total_failures" in metrics assert "successful_recoveries" in metrics assert metrics["total_failures"] > 0 @pytest.mark.asyncio async def test_data_validation(self, streaming_tool): """๋ฐ์ดํ„ฐ ๊ฒ€์ฆ ํ…Œ์ŠคํŠธ""" # ์œ ํšจํ•œ ๋ฐ์ดํ„ฐ valid_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "index_value": 2650.5, "change_rate": 0.196 } # ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ is_valid = streaming_tool._validate_stream_data(valid_data, "market_data") assert is_valid # ๋ฌดํšจํ•œ 
๋ฐ์ดํ„ฐ (ํ•„์ˆ˜ ํ•„๋“œ ๋ˆ„๋ฝ) invalid_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI" # index_value ๋ˆ„๋ฝ } is_valid = streaming_tool._validate_stream_data(invalid_data, "market_data") assert not is_valid # ์ž˜๋ชป๋œ ๋ฐ์ดํ„ฐ ํƒ€์ž… wrong_type_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "index_value": "not_a_number", # ๋ฌธ์ž์—ด์ด์ง€๋งŒ ์ˆซ์ž์—ฌ์•ผ ํ•จ "change_rate": 0.196 } is_valid = streaming_tool._validate_stream_data(wrong_type_data, "market_data") assert not is_valid @pytest.mark.asyncio async def test_stream_filtering(self, streaming_tool): """์ŠคํŠธ๋ฆผ ํ•„ํ„ฐ๋ง ํ…Œ์ŠคํŠธ""" # ํ•„ํ„ฐ ์„ค์ • filters = { "markets": ["KOSPI"], "min_volume": 1000000, "max_change_rate": 5.0, "exclude_after_hours": True } # ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ test_data = [ {"market": "KOSPI", "volume": 2000000, "change_rate": 1.5, "trading_session": "regular"}, {"market": "KOSDAQ", "volume": 1500000, "change_rate": 2.0, "trading_session": "regular"}, {"market": "KOSPI", "volume": 500000, "change_rate": 1.0, "trading_session": "regular"}, {"market": "KOSPI", "volume": 2000000, "change_rate": 6.0, "trading_session": "regular"}, {"market": "KOSPI", "volume": 2000000, "change_rate": 1.5, "trading_session": "after_hours"} ] # ํ•„ํ„ฐ๋ง ์ ์šฉ filtered_data = await streaming_tool._apply_stream_filters(test_data, filters) # ๊ฒฐ๊ณผ ๊ฒ€์ฆ (์ฒซ ๋ฒˆ์งธ ๋ฐ์ดํ„ฐ๋งŒ ๋ชจ๋“  ์กฐ๊ฑด ๋งŒ์กฑ) assert len(filtered_data) == 1 assert filtered_data[0]["market"] == "KOSPI" assert filtered_data[0]["volume"] >= 1000000 assert filtered_data[0]["change_rate"] <= 5.0 assert filtered_data[0]["trading_session"] == "regular" @pytest.mark.asyncio async def test_cache_functionality(self, streaming_tool): """์บ์‹œ ๊ธฐ๋Šฅ ํ…Œ์ŠคํŠธ""" # ์บ์‹œ ํžˆํŠธ ์‹œ๋‚˜๋ฆฌ์˜ค cached_stream_info = { "stream_id": "test_stream_123", "status": "active", "start_time": datetime.now().isoformat(), "markets": ["KOSPI"], "stream_types": ["market_data"] } streaming_tool.cache_manager.get.return_value = cached_stream_info # ์‹คํ–‰ result = await streaming_tool.execute({ "markets": ["KOSPI"], "stream_types": ["market_data"], "use_cache": True }) # ์บ์‹œ์—์„œ ๋ฐ์ดํ„ฐ ๋ฐ˜ํ™˜ ํ™•์ธ content = result[0] import json data = json.loads(content.text) # ์บ์‹œ๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ๊ทธ๋Œ€๋กœ ๋ฐ˜ํ™˜๋˜๋Š”์ง€ ํ™•์ธ assert data.get("stream_id") == "test_stream_123" @pytest.mark.asyncio async def test_error_handling(self, streaming_tool): """์—๋Ÿฌ ์ฒ˜๋ฆฌ ํ…Œ์ŠคํŠธ""" streaming_tool.cache_manager.get.return_value = None streaming_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB ์—ฐ๊ฒฐ ์‹คํŒจ") with pytest.raises(DatabaseConnectionError): await streaming_tool.execute({ "markets": ["KOSPI"], "stream_types": ["market_data"] }) @pytest.mark.asyncio async def test_invalid_parameters(self, streaming_tool): """์ž˜๋ชป๋œ ํŒŒ๋ผ๋ฏธํ„ฐ ํ…Œ์ŠคํŠธ""" # ๋นˆ ์‹œ์žฅ ๋ชฉ๋ก with pytest.raises(ValueError, match="At least one market"): await streaming_tool.execute({ "markets": [], "stream_types": ["market_data"] }) # ๋นˆ ์ŠคํŠธ๋ฆผ ํƒ€์ž… ๋ชฉ๋ก with pytest.raises(ValueError, match="At least one stream type"): await streaming_tool.execute({ "markets": ["KOSPI"], "stream_types": [] }) # ์ž˜๋ชป๋œ ์—…๋ฐ์ดํŠธ ๊ฐ„๊ฒฉ with pytest.raises(ValueError, match="Invalid update interval"): await streaming_tool.execute({ "markets": ["KOSPI"], "stream_types": ["market_data"], "update_interval": 0 }) def test_stream_id_generation(self, streaming_tool): """์ŠคํŠธ๋ฆผ ID ์ƒ์„ฑ ํ…Œ์ŠคํŠธ""" markets = ["KOSPI", "KOSDAQ"] stream_types = ["market_data", 
"investor_flow"] stream_id = streaming_tool._generate_stream_id(markets, stream_types) assert stream_id is not None assert len(stream_id) > 10 # ์ถฉ๋ถ„ํžˆ ๊ธด ID assert "_" in stream_id or "-" in stream_id # ๊ตฌ๋ถ„์ž ํฌํ•จ # ๋™์ผํ•œ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋‹ค์‹œ ์ƒ์„ฑํ•˜๋ฉด ๋‹ค๋ฅธ ID (ํƒ€์ž„์Šคํƒฌํ”„ ๋•Œ๋ฌธ) stream_id2 = streaming_tool._generate_stream_id(markets, stream_types) assert stream_id != stream_id2 def test_message_serialization(self, streaming_tool): """๋ฉ”์‹œ์ง€ ์ง๋ ฌํ™” ํ…Œ์ŠคํŠธ""" test_message = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "data": { "index_value": 2650.5, "volume": 125000000, "nested_data": { "field1": "value1", "field2": 123 } } } # JSON ์ง๋ ฌํ™” serialized = streaming_tool._serialize_message(test_message, "json") assert isinstance(serialized, (str, bytes)) # ์—ญ์ง๋ ฌํ™”ํ•˜์—ฌ ์›๋ณธ๊ณผ ๋น„๊ต deserialized = json.loads(serialized) assert deserialized["market"] == "KOSPI" assert deserialized["data"]["index_value"] == 2650.5 # ์••์ถ• ์ง๋ ฌํ™” compressed = streaming_tool._serialize_message(test_message, "json_gzip") assert isinstance(compressed, bytes) assert len(compressed) < len(serialized) # ์••์ถ•์œผ๋กœ ํฌ๊ธฐ ๊ฐ์†Œ

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'
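
The same lookup can be made from code. Below is a minimal Python sketch using only the standard library; it assumes the endpoint returns a JSON document (the response fields are not documented on this page).

import json
import urllib.request

# Hypothetical client-side fetch of this server's directory entry.
URL = "https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics"

with urllib.request.urlopen(URL) as response:
    server_info = json.load(response)  # parse the JSON response body

print(json.dumps(server_info, indent=2, ensure_ascii=False))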

If you have feedback or need assistance with the MCP directory API, please join our Discord server.