"""์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋๊ตฌ ํ
์คํธ"""
import pytest
import asyncio
import json
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any
from src.tools.streaming_tools import RealTimeStreamingTool
from src.exceptions import DataValidationError, DatabaseConnectionError
class TestRealTimeStreamingTool:
"""์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋๊ตฌ ํ
์คํธ"""
@pytest.fixture
def mock_db_manager(self):
"""Mock ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋งค๋์ """
return AsyncMock()
@pytest.fixture
def mock_cache_manager(self):
"""Mock ์บ์ ๋งค๋์ """
return AsyncMock()
@pytest.fixture
def mock_kafka_producer(self):
"""Mock Kafka ํ๋ก๋์"""
producer = MagicMock()
producer.send = AsyncMock()
producer.flush = AsyncMock()
return producer
@pytest.fixture
def mock_kafka_consumer(self):
"""Mock Kafka ์ปจ์๋จธ"""
consumer = AsyncMock()
return consumer
@pytest.fixture
def streaming_tool(self, mock_db_manager, mock_cache_manager):
"""์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋๊ตฌ ์ธ์คํด์ค"""
return RealTimeStreamingTool(mock_db_manager, mock_cache_manager)
@pytest.fixture
def sample_market_data(self):
"""์ํ ์ค์๊ฐ ์์ฅ ๋ฐ์ดํฐ"""
return {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"index_value": 2650.5,
"change": 5.2,
"change_rate": 0.196,
"volume": 125000000,
"transaction_value": 8500000000000,
"market_cap": 1850000000000000,
"individual_buy": 1200000000,
"individual_sell": 1100000000,
"institution_buy": 2500000000,
"institution_sell": 2400000000,
"foreign_buy": 800000000,
"foreign_sell": 750000000,
"advancing_issues": 485,
"declining_issues": 412,
"unchanged_issues": 103
}
def test_tool_initialization(self, streaming_tool, mock_db_manager, mock_cache_manager):
"""๋๊ตฌ ์ด๊ธฐํ ํ
์คํธ"""
assert streaming_tool.name == "start_realtime_stream"
assert streaming_tool.description is not None
assert "์ค์๊ฐ" in streaming_tool.description or "real-time" in streaming_tool.description.lower()
assert streaming_tool.db_manager == mock_db_manager
assert streaming_tool.cache_manager == mock_cache_manager
def test_tool_definition(self, streaming_tool):
"""๋๊ตฌ ์ ์ ํ
์คํธ"""
definition = streaming_tool.get_tool_definition()
assert definition.name == "start_realtime_stream"
assert definition.description is not None
assert definition.inputSchema is not None
        # Validate the input schema
schema = definition.inputSchema
assert schema["type"] == "object"
assert "properties" in schema
properties = schema["properties"]
assert "markets" in properties
assert "stream_types" in properties
assert "update_interval" in properties
assert "include_analytics" in properties
        # Validate the stream_types parameter
stream_prop = properties["stream_types"]
assert stream_prop["type"] == "array"
assert "market_data" in str(stream_prop)
assert "investor_flow" in str(stream_prop)
assert "market_breadth" in str(stream_prop)
@pytest.mark.asyncio
async def test_execute_market_data_stream(self, streaming_tool, sample_market_data):
"""์์ฅ ๋ฐ์ดํฐ ์คํธ๋ฆผ ํ
์คํธ"""
        # Cache miss
streaming_tool.cache_manager.get.return_value = None
        # Set up the database response
streaming_tool.db_manager.fetch_all.return_value = [sample_market_data]
        # Execute
result = await streaming_tool.execute({
"markets": ["KOSPI"],
"stream_types": ["market_data"],
"update_interval": 5,
"duration": 30
})
        # Verify the result
assert len(result) == 1
content = result[0]
assert content.type == "text"
        # Parse the JSON payload and check its contents (json is imported at module level)
        data = json.loads(content.text)
assert "stream_info" in data
assert "stream_data" in data
assert "analytics" in data
        # Verify the stream info
stream_info = data["stream_info"]
assert "stream_id" in stream_info
assert "status" in stream_info
assert stream_info["status"] == "started"
assert "markets" in stream_info
assert "KOSPI" in stream_info["markets"]
@pytest.mark.asyncio
async def test_execute_investor_flow_stream(self, streaming_tool):
"""ํฌ์์ ์๊ธ ํ๋ฆ ์คํธ๋ฆผ ํ
์คํธ"""
        # Investor flow data
investor_flow_data = [
{
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"individual_net": 100000000,
"institution_net": -50000000,
"foreign_net": 20000000,
"pension_net": 10000000,
"bank_net": -5000000,
"insurance_net": 15000000
}
]
streaming_tool.cache_manager.get.return_value = None
streaming_tool.db_manager.fetch_all.return_value = investor_flow_data
        # Execute
result = await streaming_tool.execute({
"markets": ["KOSPI"],
"stream_types": ["investor_flow"],
"update_interval": 10,
"include_analytics": True
})
        # Verify the result
        content = result[0]
        data = json.loads(content.text)
assert "stream_data" in data
stream_data = data["stream_data"]
assert "investor_flow" in stream_data
investor_data = stream_data["investor_flow"]
assert "individual_net" in investor_data[0]
assert "institution_net" in investor_data[0]
assert "foreign_net" in investor_data[0]
@pytest.mark.asyncio
async def test_execute_market_breadth_stream(self, streaming_tool):
"""์์ฅ ํญ ์คํธ๋ฆผ ํ
์คํธ"""
breadth_data = [
{
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"advancing_issues": 485,
"declining_issues": 412,
"unchanged_issues": 103,
"new_highs": 25,
"new_lows": 18,
"advance_decline_ratio": 1.177,
"advance_volume": 65000000,
"decline_volume": 55000000,
"total_volume": 125000000
}
]
streaming_tool.cache_manager.get.return_value = None
streaming_tool.db_manager.fetch_all.return_value = breadth_data
        # Execute
result = await streaming_tool.execute({
"markets": ["KOSPI", "KOSDAQ"],
"stream_types": ["market_breadth"],
"update_interval": 15
})
        # Verify the result
        content = result[0]
        data = json.loads(content.text)
stream_data = data["stream_data"]
assert "market_breadth" in stream_data
breadth = stream_data["market_breadth"][0]
assert "advancing_issues" in breadth
assert "declining_issues" in breadth
assert "advance_decline_ratio" in breadth
@pytest.mark.asyncio
async def test_comprehensive_stream(self, streaming_tool, sample_market_data):
"""์ข
ํฉ ์คํธ๋ฆผ ํ
์คํธ (๋ชจ๋ ๋ฐ์ดํฐ ํ์
)"""
streaming_tool.cache_manager.get.return_value = None
streaming_tool.db_manager.fetch_all.return_value = [sample_market_data]
        # Execute
result = await streaming_tool.execute({
"markets": ["KOSPI", "KOSDAQ"],
"stream_types": ["market_data", "investor_flow", "market_breadth"],
"update_interval": 5,
"include_analytics": True,
"include_alerts": True,
"buffer_size": 100
})
        # Verify the result
        content = result[0]
        data = json.loads(content.text)
assert "stream_info" in data
assert "stream_data" in data
assert "analytics" in data
assert "alerts" in data
        # Check that every stream type is present
stream_data = data["stream_data"]
assert "market_data" in stream_data
assert "investor_flow" in stream_data
assert "market_breadth" in stream_data
def test_kafka_producer_setup(self, streaming_tool):
"""Kafka ํ๋ก๋์ ์ค์ ํ
์คํธ"""
config = {
"bootstrap_servers": ["localhost:9092"],
"key_serializer": "json",
"value_serializer": "json",
"acks": "all",
"retries": 3
}
producer = streaming_tool._setup_kafka_producer(config)
assert producer is not None
assert hasattr(producer, 'config')
assert producer.config["bootstrap_servers"] == ["localhost:9092"]
def test_kafka_consumer_setup(self, streaming_tool):
"""Kafka ์ปจ์๋จธ ์ค์ ํ
์คํธ"""
config = {
"bootstrap_servers": ["localhost:9092"],
"group_id": "market_analytics",
"topics": ["market_data", "investor_flow"],
"auto_offset_reset": "latest"
}
consumer = streaming_tool._setup_kafka_consumer(config)
assert consumer is not None
assert hasattr(consumer, 'config')
assert consumer.config["group_id"] == "market_analytics"
assert consumer.config["topics"] == ["market_data", "investor_flow"]
@pytest.mark.asyncio
async def test_data_buffering_system(self, streaming_tool):
"""๋ฐ์ดํฐ ๋ฒํผ๋ง ์์คํ
ํ
์คํธ"""
buffer_config = {
"buffer_size": 50,
"flush_interval": 10,
"compression": "gzip"
}
buffer = streaming_tool._create_data_buffer(buffer_config)
        # Add test data
test_data = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"value": 2650.5
}
await buffer.add(test_data)
await buffer.add(test_data)
        # Check the buffer state
assert buffer.size() == 2
assert not buffer.is_full()
        # Retrieve the buffered data
buffered_data = await buffer.get_all()
assert len(buffered_data) == 2
assert buffered_data[0]["market"] == "KOSPI"
@pytest.mark.asyncio
async def test_backpressure_handling(self, streaming_tool):
"""๋ฐฑํ๋ ์
์ฒ๋ฆฌ ํ
์คํธ"""
        # Simulate a high-load situation
high_load_config = {
"max_queue_size": 10,
"drop_strategy": "oldest",
"backpressure_threshold": 0.8
}
queue = streaming_tool._create_backpressure_queue(high_load_config)
        # Add data to the queue (exceeding capacity)
for i in range(15):
await queue.put({"id": i, "data": f"test_data_{i}"})
        # Check the backpressure state
assert queue.is_backpressure_active()
        assert queue.qsize() <= 10  # must not exceed the max size
        # Verify that the oldest items were dropped
oldest_item = await queue.get()
assert oldest_item["id"] >= 5 # ์ฒ์ 5๊ฐ๋ ์ญ์ ๋จ
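    # A minimal sketch, assuming a deque-based drop-oldest strategy, of the queue
    # behavior asserted above: deque(maxlen=10) silently evicts the oldest entries,
    # so after 15 puts only ids 5-14 remain. `DropOldestQueue` is hypothetical,
    # not the tool's actual implementation.
    @staticmethod
    def _sketch_drop_oldest_queue(max_size=10, threshold=0.8):
        from collections import deque

        class DropOldestQueue:
            def __init__(self, maxlen, threshold):
                self._items = deque(maxlen=maxlen)
                self._threshold = threshold

            async def put(self, item):
                self._items.append(item)  # overflow drops from the left

            async def get(self):
                return self._items.popleft()

            def qsize(self):
                return len(self._items)

            def is_backpressure_active(self):
                # Active once the queue fills past the threshold of its capacity
                return len(self._items) >= self._threshold * self._items.maxlen

        return DropOldestQueue(max_size, threshold)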
@pytest.mark.asyncio
async def test_websocket_connection_manager(self, streaming_tool):
"""WebSocket ์ฐ๊ฒฐ ๊ด๋ฆฌ ํ
์คํธ"""
connection_manager = streaming_tool._create_websocket_manager()
        # Mock WebSocket connections
mock_ws1 = MagicMock()
mock_ws1.id = "client_1"
mock_ws2 = MagicMock()
mock_ws2.id = "client_2"
        # Add client connections
await connection_manager.add_connection(mock_ws1)
await connection_manager.add_connection(mock_ws2)
        # Check the connection state
assert connection_manager.get_connection_count() == 2
assert connection_manager.has_connection("client_1")
assert connection_manager.has_connection("client_2")
        # Broadcast test
test_message = {"type": "market_update", "data": {"KOSPI": 2650.5}}
sent_count = await connection_manager.broadcast(test_message)
assert sent_count == 2
        # Remove a connection
await connection_manager.remove_connection("client_1")
assert connection_manager.get_connection_count() == 1
assert not connection_manager.has_connection("client_1")
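    # A sketch of the connection-manager contract used above: clients keyed by
    # ws.id, broadcast() returning the delivery count. Assumed shape only; the
    # real manager presumably awaits each websocket send.
    @staticmethod
    def _sketch_connection_manager():
        class ConnectionManager:
            def __init__(self):
                self._clients = {}

            async def add_connection(self, ws):
                self._clients[ws.id] = ws

            async def remove_connection(self, client_id):
                self._clients.pop(client_id, None)

            def has_connection(self, client_id):
                return client_id in self._clients

            def get_connection_count(self):
                return len(self._clients)

            async def broadcast(self, message):
                payload = json.dumps(message)
                for ws in self._clients.values():
                    ws.send(payload)  # MagicMock in tests; real code would await
                return len(self._clients)

        return ConnectionManager()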
@pytest.mark.asyncio
async def test_stream_analytics_calculation(self, streaming_tool):
"""์คํธ๋ฆผ ๋ถ์ ๊ณ์ฐ ํ
์คํธ"""
        # Generate time-series data
time_series_data = []
base_time = datetime.now()
        for i in range(60):  # 5 minutes of data at 5-second intervals
timestamp = base_time + timedelta(seconds=i*5)
data_point = {
"timestamp": timestamp.isoformat(),
"value": 2650 + i * 0.5 + (i % 10) * 2, # ์์น ์ถ์ธ + ๋
ธ์ด์ฆ
"volume": 1000000 + i * 50000
}
time_series_data.append(data_point)
        # Run the real-time analytics
analytics = await streaming_tool._calculate_stream_analytics(time_series_data)
assert "trend" in analytics
assert "volatility" in analytics
assert "momentum" in analytics
assert "volume_profile" in analytics
        # Check the trend (should be an uptrend)
assert analytics["trend"]["direction"] in ["up", "bullish"]
assert analytics["trend"]["strength"] > 0
        # Check the volatility
assert analytics["volatility"]["current"] >= 0
assert "percentile" in analytics["volatility"]
        # Check the momentum
assert "rsi" in analytics["momentum"]
assert "macd" in analytics["momentum"]
@pytest.mark.asyncio
async def test_alert_generation(self, streaming_tool):
"""์๋ฆผ ์์ฑ ํ
์คํธ"""
        # Alert trigger data
alert_data = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"index_value": 2700.0, # ์๊ณ๊ฐ ์ด๊ณผ
"change_rate": -3.5, # ๊ธ๋ฝ
"volume": 500000000, # ๊ฑฐ๋๋ ๊ธ์ฆ
"volatility": 0.045 # ๋์ ๋ณ๋์ฑ
}
alerts = await streaming_tool._generate_alerts(alert_data)
assert len(alerts) > 0
assert any(alert["type"] == "price_movement" for alert in alerts)
assert any(alert["severity"] in ["medium", "high"] for alert in alerts)
        # Verify the alert contents
price_alert = next(alert for alert in alerts if alert["type"] == "price_movement")
assert "๊ธ๋ฝ" in price_alert["message"] or "decline" in price_alert["message"].lower()
assert price_alert["threshold"] is not None
assert price_alert["actual_value"] is not None
@pytest.mark.asyncio
async def test_stream_performance_monitoring(self, streaming_tool):
"""์คํธ๋ฆผ ์ฑ๋ฅ ๋ชจ๋ํฐ๋ง ํ
์คํธ"""
monitor = streaming_tool._create_performance_monitor()
        # Record performance metrics
start_time = time.time()
for i in range(100):
await monitor.record_message_processed(f"msg_{i}", time.time())
if i % 10 == 0:
                await asyncio.sleep(0.01)  # add a small delay
end_time = time.time()
        # Check the performance metrics
metrics = await monitor.get_metrics()
assert "messages_per_second" in metrics
assert "average_latency" in metrics
assert "total_messages" in metrics
assert "error_rate" in metrics
assert metrics["total_messages"] == 100
assert metrics["messages_per_second"] > 0
assert metrics["error_rate"] == 0 # ์๋ฌ ์์ด ์ฒ๋ฆฌ๋จ
@pytest.mark.asyncio
async def test_stream_recovery_mechanism(self, streaming_tool):
"""์คํธ๋ฆผ ๋ณต๊ตฌ ๋ฉ์ปค๋์ฆ ํ
์คํธ"""
recovery_config = {
"max_retries": 3,
"retry_delay": 0.1,
"circuit_breaker_threshold": 5,
"recovery_timeout": 30
}
recovery_manager = streaming_tool._create_recovery_manager(recovery_config)
        # Simulate connection failures
        with patch('asyncio.sleep'):  # skip real wait times
            # First attempt fails
success = await recovery_manager.attempt_recovery("kafka_connection")
            assert not success  # the first attempt fails
            # Success after a retry (simulated via mock)
with patch.object(recovery_manager, '_test_connection', return_value=True):
success = await recovery_manager.attempt_recovery("kafka_connection")
assert success
        # Check the recovery metrics
metrics = recovery_manager.get_recovery_metrics()
assert "total_failures" in metrics
assert "successful_recoveries" in metrics
assert metrics["total_failures"] > 0
@pytest.mark.asyncio
async def test_data_validation(self, streaming_tool):
"""๋ฐ์ดํฐ ๊ฒ์ฆ ํ
์คํธ"""
        # Valid data
valid_data = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"index_value": 2650.5,
"change_rate": 0.196
}
        # Run the validity check
is_valid = streaming_tool._validate_stream_data(valid_data, "market_data")
assert is_valid
        # Invalid data (missing a required field)
invalid_data = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI"
            # index_value is missing
}
is_valid = streaming_tool._validate_stream_data(invalid_data, "market_data")
assert not is_valid
        # Wrong data type
wrong_type_data = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"index_value": "not_a_number", # ๋ฌธ์์ด์ด์ง๋ง ์ซ์์ฌ์ผ ํจ
"change_rate": 0.196
}
is_valid = streaming_tool._validate_stream_data(wrong_type_data, "market_data")
assert not is_valid
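    # A sketch of the field/type checks _validate_stream_data is assumed to
    # apply for "market_data"; the real schema lives inside the tool. Note how
    # this rejects both the missing index_value and the string-typed one above.
    @staticmethod
    def _sketch_validate_market_data(data):
        required = {
            "timestamp": str,
            "market": str,
            "index_value": (int, float),
            "change_rate": (int, float),
        }
        return all(
            field in data and isinstance(data[field], expected)
            for field, expected in required.items()
        )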
@pytest.mark.asyncio
async def test_stream_filtering(self, streaming_tool):
"""์คํธ๋ฆผ ํํฐ๋ง ํ
์คํธ"""
        # Filter settings
filters = {
"markets": ["KOSPI"],
"min_volume": 1000000,
"max_change_rate": 5.0,
"exclude_after_hours": True
}
        # Test data
test_data = [
{"market": "KOSPI", "volume": 2000000, "change_rate": 1.5, "trading_session": "regular"},
{"market": "KOSDAQ", "volume": 1500000, "change_rate": 2.0, "trading_session": "regular"},
{"market": "KOSPI", "volume": 500000, "change_rate": 1.0, "trading_session": "regular"},
{"market": "KOSPI", "volume": 2000000, "change_rate": 6.0, "trading_session": "regular"},
{"market": "KOSPI", "volume": 2000000, "change_rate": 1.5, "trading_session": "after_hours"}
]
        # Apply the filters
filtered_data = await streaming_tool._apply_stream_filters(test_data, filters)
        # Verify the result (only the first record satisfies every condition)
assert len(filtered_data) == 1
assert filtered_data[0]["market"] == "KOSPI"
assert filtered_data[0]["volume"] >= 1000000
assert filtered_data[0]["change_rate"] <= 5.0
assert filtered_data[0]["trading_session"] == "regular"
@pytest.mark.asyncio
async def test_cache_functionality(self, streaming_tool):
"""์บ์ ๊ธฐ๋ฅ ํ
์คํธ"""
        # Cache-hit scenario
cached_stream_info = {
"stream_id": "test_stream_123",
"status": "active",
"start_time": datetime.now().isoformat(),
"markets": ["KOSPI"],
"stream_types": ["market_data"]
}
streaming_tool.cache_manager.get.return_value = cached_stream_info
        # Execute
result = await streaming_tool.execute({
"markets": ["KOSPI"],
"stream_types": ["market_data"],
"use_cache": True
})
        # Confirm the data is served from the cache
        content = result[0]
        data = json.loads(content.text)
        # The cached stream info should be returned unchanged
assert data.get("stream_id") == "test_stream_123"
@pytest.mark.asyncio
async def test_error_handling(self, streaming_tool):
"""์๋ฌ ์ฒ๋ฆฌ ํ
์คํธ"""
streaming_tool.cache_manager.get.return_value = None
        streaming_tool.db_manager.fetch_all.side_effect = DatabaseConnectionError("DB connection failed")
with pytest.raises(DatabaseConnectionError):
await streaming_tool.execute({
"markets": ["KOSPI"],
"stream_types": ["market_data"]
})
@pytest.mark.asyncio
async def test_invalid_parameters(self, streaming_tool):
"""์๋ชป๋ ํ๋ผ๋ฏธํฐ ํ
์คํธ"""
        # Empty markets list
with pytest.raises(ValueError, match="At least one market"):
await streaming_tool.execute({
"markets": [],
"stream_types": ["market_data"]
})
        # Empty stream_types list
with pytest.raises(ValueError, match="At least one stream type"):
await streaming_tool.execute({
"markets": ["KOSPI"],
"stream_types": []
})
        # Invalid update interval
with pytest.raises(ValueError, match="Invalid update interval"):
await streaming_tool.execute({
"markets": ["KOSPI"],
"stream_types": ["market_data"],
"update_interval": 0
})
def test_stream_id_generation(self, streaming_tool):
"""์คํธ๋ฆผ ID ์์ฑ ํ
์คํธ"""
markets = ["KOSPI", "KOSDAQ"]
stream_types = ["market_data", "investor_flow"]
stream_id = streaming_tool._generate_stream_id(markets, stream_types)
assert stream_id is not None
        assert len(stream_id) > 10  # sufficiently long ID
assert "_" in stream_id or "-" in stream_id # ๊ตฌ๋ถ์ ํฌํจ
        # Generating again with the same parameters yields a different ID (timestamp-based)
stream_id2 = streaming_tool._generate_stream_id(markets, stream_types)
assert stream_id != stream_id2
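    # A plausible sketch of an ID scheme satisfying the assertions above
    # (separators present, length > 10, unique across calls thanks to the
    # timestamp and random suffix). Hypothetical; the production scheme may differ.
    @staticmethod
    def _sketch_generate_stream_id(markets, stream_types):
        import uuid

        return "_".join([
            "-".join(markets),
            "-".join(stream_types),
            datetime.now().strftime("%Y%m%d%H%M%S%f"),  # makes repeat calls differ
            uuid.uuid4().hex[:8],
        ])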
def test_message_serialization(self, streaming_tool):
"""๋ฉ์์ง ์ง๋ ฌํ ํ
์คํธ"""
test_message = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"data": {
"index_value": 2650.5,
"volume": 125000000,
"nested_data": {
"field1": "value1",
"field2": 123
}
}
}
        # JSON serialization
serialized = streaming_tool._serialize_message(test_message, "json")
assert isinstance(serialized, (str, bytes))
        # Deserialize and compare with the original
deserialized = json.loads(serialized)
assert deserialized["market"] == "KOSPI"
assert deserialized["data"]["index_value"] == 2650.5
        # Compressed serialization
compressed = streaming_tool._serialize_message(test_message, "json_gzip")
assert isinstance(compressed, bytes)
        assert len(compressed) < len(serialized)  # compression reduces the size
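    # A minimal sketch of a serializer matching the assertions above: "json"
    # yields text, "json_gzip" yields compressed bytes. The format names are
    # taken from the test; note gzip only shrinks payloads with enough redundancy.
    @staticmethod
    def _sketch_serialize_message(message, fmt="json"):
        import gzip

        payload = json.dumps(message, ensure_ascii=False)
        if fmt == "json":
            return payload
        if fmt == "json_gzip":
            return gzip.compress(payload.encode("utf-8"))
        raise ValueError(f"Unsupported serialization format: {fmt}")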