test_realtime_server.py•37.2 kB
"""실시간 데이터 버퍼링 및 WebSocket 서버 테스트"""
import pytest
import asyncio
import json
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Any
from src.realtime.data_buffer import RealTimeDataBuffer
from src.realtime.websocket_server import WebSocketServer
from src.realtime.message_broker import MessageBroker
from src.realtime.connection_manager import ConnectionManager
from src.exceptions import BufferOverflowError, ConnectionError
class TestRealTimeDataBuffer:
"""실시간 데이터 버퍼 테스트"""
@pytest.fixture
def buffer_config(self):
"""버퍼 설정"""
return {
"max_size": 1000,
"flush_interval": 5.0,
"compression": True,
"persistence": True,
"ttl": 3600, # 1시간
"partition_count": 4
}
@pytest.fixture
def data_buffer(self, buffer_config):
"""데이터 버퍼 인스턴스"""
return RealTimeDataBuffer(buffer_config)
@pytest.fixture
def sample_market_data(self):
"""샘플 시장 데이터"""
return {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"symbol": "005930", # 삼성전자
"price": 75000,
"volume": 1000000,
"change": 500,
"change_rate": 0.67,
"bid": 74900,
"ask": 75100,
"bid_size": 5000,
"ask_size": 3000
}
def test_buffer_initialization(self, data_buffer, buffer_config):
"""버퍼 초기화 테스트"""
assert data_buffer.max_size == buffer_config["max_size"]
assert data_buffer.flush_interval == buffer_config["flush_interval"]
assert data_buffer.compression_enabled == buffer_config["compression"]
assert data_buffer.persistence_enabled == buffer_config["persistence"]
assert data_buffer.ttl == buffer_config["ttl"]
assert data_buffer.partition_count == buffer_config["partition_count"]
assert data_buffer.size() == 0
assert data_buffer.is_empty() == True
@pytest.mark.asyncio
async def test_add_single_data(self, data_buffer, sample_market_data):
"""단일 데이터 추가 테스트"""
# 데이터 추가
await data_buffer.add(sample_market_data)
# 크기 확인
assert data_buffer.size() == 1
assert not data_buffer.is_empty()
assert not data_buffer.is_full()
# 데이터 조회
data = await data_buffer.get_latest(1)
assert len(data) == 1
assert data[0]["symbol"] == "005930"
assert data[0]["price"] == 75000
@pytest.mark.asyncio
async def test_add_multiple_data(self, data_buffer):
"""다중 데이터 추가 테스트"""
# 100개 데이터 추가
for i in range(100):
market_data = {
"timestamp": (datetime.now() + timedelta(seconds=i)).isoformat(),
"market": "KOSPI",
"symbol": f"00593{i % 10}",
"price": 75000 + i,
"volume": 1000 * (i + 1)
}
await data_buffer.add(market_data)
# 크기 확인
assert data_buffer.size() == 100
# 최신 10개 데이터 조회
latest_data = await data_buffer.get_latest(10)
assert len(latest_data) == 10
# 최신 데이터가 먼저 오는지 확인 (가장 높은 price)
assert latest_data[0]["price"] == 75099
assert latest_data[9]["price"] == 75090
@pytest.mark.asyncio
async def test_buffer_partitioning(self, data_buffer):
"""버퍼 파티셔닝 테스트"""
# 다양한 심볼의 데이터 추가
symbols = ["005930", "000660", "035420", "207940"]
for i, symbol in enumerate(symbols * 10): # 각 심볼당 10개씩
market_data = {
"timestamp": datetime.now().isoformat(),
"market": "KOSPI",
"symbol": symbol,
"price": 50000 + i,
"volume": 1000
}
await data_buffer.add(market_data)
# 파티션별 데이터 확인
partition_stats = data_buffer.get_partition_stats()
assert len(partition_stats) == 4 # 4개 파티션
# 각 파티션에 데이터가 있는지 확인
total_items = sum(stats["item_count"] for stats in partition_stats.values())
assert total_items == 40 # 4 symbols * 10 items
@pytest.mark.asyncio
async def test_buffer_overflow_handling(self, buffer_config):
"""버퍼 오버플로우 처리 테스트"""
# 작은 버퍼 사이즈로 설정
buffer_config["max_size"] = 10
data_buffer = RealTimeDataBuffer(buffer_config)
# 버퍼 크기보다 많은 데이터 추가
for i in range(15):
market_data = {
"timestamp": datetime.now().isoformat(),
"symbol": f"TEST{i}",
"price": 1000 + i
}
await data_buffer.add(market_data)
# 최대 크기를 초과하지 않는지 확인 (오래된 데이터 삭제)
assert data_buffer.size() <= 10
# 가장 최신 데이터들만 남아있는지 확인
latest_data = await data_buffer.get_latest(5)
assert latest_data[0]["price"] >= 1010 # 최신 데이터
@pytest.mark.asyncio
async def test_time_based_filtering(self, data_buffer):
"""시간 기반 필터링 테스트"""
base_time = datetime.now()
# 다양한 시간의 데이터 추가
for i in range(10):
market_data = {
"timestamp": (base_time - timedelta(minutes=i)).isoformat(),
"symbol": "005930",
"price": 75000 + i
}
await data_buffer.add(market_data)
# 최근 5분 이내 데이터만 조회
since_time = base_time - timedelta(minutes=5)
recent_data = await data_buffer.get_since(since_time)
# 5분 이내 데이터만 있는지 확인 (0~5분, 총 6개)
assert len(recent_data) == 6
# 시간 순서 확인
timestamps = [datetime.fromisoformat(item["timestamp"]) for item in recent_data]
assert all(ts >= since_time for ts in timestamps)
@pytest.mark.asyncio
async def test_symbol_based_filtering(self, data_buffer):
"""심볼 기반 필터링 테스트"""
symbols = ["005930", "000660", "035420"]
# 다양한 심볼 데이터 추가
for symbol in symbols:
for i in range(5):
market_data = {
"timestamp": datetime.now().isoformat(),
"symbol": symbol,
"price": 50000 + i
}
await data_buffer.add(market_data)
# 특정 심볼 데이터만 조회
samsung_data = await data_buffer.get_by_symbol("005930")
assert len(samsung_data) == 5
assert all(item["symbol"] == "005930" for item in samsung_data)
# 다중 심볼 조회
multi_symbol_data = await data_buffer.get_by_symbols(["005930", "000660"])
assert len(multi_symbol_data) == 10
symbols_in_result = set(item["symbol"] for item in multi_symbol_data)
assert symbols_in_result == {"005930", "000660"}
@pytest.mark.asyncio
async def test_data_compression(self, buffer_config):
"""데이터 압축 테스트"""
# 압축 활성화
buffer_config["compression"] = True
compressed_buffer = RealTimeDataBuffer(buffer_config)
# 압축 비활성화
buffer_config["compression"] = False
uncompressed_buffer = RealTimeDataBuffer(buffer_config)
# 동일한 대용량 데이터 추가
large_data = {
"timestamp": datetime.now().isoformat(),
"symbol": "005930",
"price": 75000,
"volume": 1000000,
"order_book": [{"price": 75000 + i, "size": 1000} for i in range(100)]
}
await compressed_buffer.add(large_data)
await uncompressed_buffer.add(large_data)
# 압축된 버퍼의 메모리 사용량이 더 적은지 확인
compressed_stats = compressed_buffer.get_memory_stats()
uncompressed_stats = uncompressed_buffer.get_memory_stats()
assert compressed_stats["compressed_size"] < uncompressed_stats["raw_size"]
assert compressed_stats["compression_ratio"] > 0
@pytest.mark.asyncio
async def test_buffer_persistence(self, data_buffer):
"""버퍼 지속성 테스트"""
# 데이터 추가
test_data = [
{"symbol": "005930", "price": 75000, "timestamp": datetime.now().isoformat()},
{"symbol": "000660", "price": 45000, "timestamp": datetime.now().isoformat()}
]
for data in test_data:
await data_buffer.add(data)
# 버퍼 저장
checkpoint_id = await data_buffer.create_checkpoint()
assert checkpoint_id is not None
# 버퍼 클리어
await data_buffer.clear()
assert data_buffer.size() == 0
# 체크포인트에서 복원
restored = await data_buffer.restore_from_checkpoint(checkpoint_id)
assert restored == True
assert data_buffer.size() == 2
# 복원된 데이터 확인
restored_data = await data_buffer.get_all()
symbols = [item["symbol"] for item in restored_data]
assert "005930" in symbols
assert "000660" in symbols
@pytest.mark.asyncio
async def test_ttl_expiration(self, buffer_config):
"""TTL 만료 테스트"""
# 짧은 TTL 설정 (1초)
buffer_config["ttl"] = 1
data_buffer = RealTimeDataBuffer(buffer_config)
# 데이터 추가
old_data = {
"timestamp": (datetime.now() - timedelta(seconds=2)).isoformat(),
"symbol": "005930",
"price": 75000
}
new_data = {
"timestamp": datetime.now().isoformat(),
"symbol": "000660",
"price": 45000
}
await data_buffer.add(old_data)
await data_buffer.add(new_data)
# TTL 정리 실행
await data_buffer.cleanup_expired()
# 만료된 데이터가 제거되었는지 확인
remaining_data = await data_buffer.get_all()
symbols = [item["symbol"] for item in remaining_data]
# new_data만 남아있어야 함
assert "000660" in symbols
# old_data는 TTL로 인해 제거되었을 수 있음
@pytest.mark.asyncio
async def test_buffer_stats(self, data_buffer):
"""버퍼 통계 테스트"""
# 다양한 데이터 추가
for i in range(50):
await data_buffer.add({
"timestamp": datetime.now().isoformat(),
"symbol": f"TEST{i % 5}",
"price": 1000 + i
})
# 통계 확인
stats = data_buffer.get_stats()
assert "total_items" in stats
assert "memory_usage" in stats
assert "partition_distribution" in stats
assert "compression_ratio" in stats
assert "oldest_timestamp" in stats
assert "newest_timestamp" in stats
assert stats["total_items"] == 50
assert stats["memory_usage"] > 0
@pytest.mark.asyncio
async def test_concurrent_access(self, data_buffer):
"""동시 접근 테스트"""
# 동시에 데이터 추가하는 태스크들
async def add_data_task(start_idx, count):
for i in range(count):
await data_buffer.add({
"timestamp": datetime.now().isoformat(),
"symbol": f"CONCURRENT{start_idx + i}",
"price": 1000 + start_idx + i
})
# 5개 태스크가 각각 20개씩 데이터 추가
tasks = [add_data_task(i * 20, 20) for i in range(5)]
await asyncio.gather(*tasks)
# 총 100개 데이터가 추가되었는지 확인
assert data_buffer.size() == 100
# 데이터 무결성 확인
all_data = await data_buffer.get_all()
symbols = set(item["symbol"] for item in all_data)
assert len(symbols) == 100 # 모든 심볼이 유니크
class TestWebSocketServer:
"""WebSocket 서버 테스트"""
@pytest.fixture
def server_config(self):
"""서버 설정"""
return {
"host": "localhost",
"port": 8765,
"max_connections": 100,
"heartbeat_interval": 30,
"message_queue_size": 1000,
"compression": True,
"authentication": True
}
@pytest.fixture
def mock_websocket(self):
"""Mock WebSocket 연결"""
ws = MagicMock()
ws.id = "test_client_123"
ws.remote_address = ("127.0.0.1", 12345)
ws.send = AsyncMock()
ws.recv = AsyncMock()
ws.close = AsyncMock()
ws.closed = False
return ws
@pytest.fixture
def websocket_server(self, server_config):
"""WebSocket 서버 인스턴스"""
return WebSocketServer(server_config)
def test_server_initialization(self, websocket_server, server_config):
"""서버 초기화 테스트"""
assert websocket_server.host == server_config["host"]
assert websocket_server.port == server_config["port"]
assert websocket_server.max_connections == server_config["max_connections"]
assert websocket_server.heartbeat_interval == server_config["heartbeat_interval"]
assert websocket_server.compression_enabled == server_config["compression"]
assert websocket_server.authentication_enabled == server_config["authentication"]
assert websocket_server.is_running == False
assert websocket_server.connection_count == 0
@pytest.mark.asyncio
async def test_server_start_stop(self, websocket_server):
"""서버 시작/중지 테스트"""
# 서버 시작
start_task = asyncio.create_task(websocket_server.start())
await asyncio.sleep(0.1) # 서버가 시작될 시간
assert websocket_server.is_running == True
# 서버 중지
await websocket_server.stop()
assert websocket_server.is_running == False
# 시작 태스크 정리
start_task.cancel()
try:
await start_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_client_connection(self, websocket_server, mock_websocket):
"""클라이언트 연결 테스트"""
# 클라이언트 연결
await websocket_server.handle_connection(mock_websocket)
# 연결 상태 확인
assert websocket_server.connection_count == 1
assert websocket_server.has_connection(mock_websocket.id)
# 연결 정보 확인
connection_info = websocket_server.get_connection_info(mock_websocket.id)
assert connection_info is not None
assert connection_info["id"] == mock_websocket.id
assert connection_info["remote_address"] == mock_websocket.remote_address
@pytest.mark.asyncio
async def test_client_disconnection(self, websocket_server, mock_websocket):
"""클라이언트 연결 해제 테스트"""
# 연결 후 해제
await websocket_server.handle_connection(mock_websocket)
await websocket_server.disconnect_client(mock_websocket.id)
# 연결 해제 확인
assert websocket_server.connection_count == 0
assert not websocket_server.has_connection(mock_websocket.id)
# WebSocket close 호출 확인
mock_websocket.close.assert_called_once()
@pytest.mark.asyncio
async def test_broadcast_message(self, websocket_server):
"""브로드캐스트 메시지 테스트"""
# 여러 클라이언트 연결
clients = []
for i in range(3):
mock_ws = MagicMock()
mock_ws.id = f"client_{i}"
mock_ws.send = AsyncMock()
mock_ws.closed = False
clients.append(mock_ws)
await websocket_server.handle_connection(mock_ws)
# 브로드캐스트 메시지
test_message = {
"type": "market_data",
"data": {"symbol": "005930", "price": 75000}
}
sent_count = await websocket_server.broadcast(test_message)
# 모든 클라이언트에게 전송되었는지 확인
assert sent_count == 3
for client in clients:
client.send.assert_called_once()
sent_message = client.send.call_args[0][0]
assert "market_data" in sent_message
@pytest.mark.asyncio
async def test_unicast_message(self, websocket_server, mock_websocket):
"""유니캐스트 메시지 테스트"""
# 클라이언트 연결
await websocket_server.handle_connection(mock_websocket)
# 특정 클라이언트에게 메시지 전송
test_message = {
"type": "personal_data",
"data": {"user_id": "test_user", "balance": 1000000}
}
success = await websocket_server.send_to_client(mock_websocket.id, test_message)
# 전송 성공 확인
assert success == True
mock_websocket.send.assert_called_once()
sent_message = mock_websocket.send.call_args[0][0]
assert "personal_data" in sent_message
@pytest.mark.asyncio
async def test_subscription_management(self, websocket_server, mock_websocket):
"""구독 관리 테스트"""
await websocket_server.handle_connection(mock_websocket)
# 토픽 구독
topics = ["KOSPI", "005930", "000660"]
for topic in topics:
await websocket_server.subscribe_client(mock_websocket.id, topic)
# 구독 상태 확인
subscriptions = websocket_server.get_client_subscriptions(mock_websocket.id)
assert set(subscriptions) == set(topics)
# 토픽 구독 해제
await websocket_server.unsubscribe_client(mock_websocket.id, "000660")
updated_subscriptions = websocket_server.get_client_subscriptions(mock_websocket.id)
assert "000660" not in updated_subscriptions
assert len(updated_subscriptions) == 2
@pytest.mark.asyncio
async def test_topic_based_broadcasting(self, websocket_server):
"""토픽 기반 브로드캐스팅 테스트"""
# 다른 토픽을 구독하는 클라이언트들
client1 = MagicMock()
client1.id = "client_kospi"
client1.send = AsyncMock()
client1.closed = False
client2 = MagicMock()
client2.id = "client_samsung"
client2.send = AsyncMock()
client2.closed = False
await websocket_server.handle_connection(client1)
await websocket_server.handle_connection(client2)
# 구독 설정
await websocket_server.subscribe_client("client_kospi", "KOSPI")
await websocket_server.subscribe_client("client_samsung", "005930")
# KOSPI 토픽 브로드캐스트
kospi_message = {
"type": "index_data",
"topic": "KOSPI",
"data": {"index": 2650, "change": 5.2}
}
sent_count = await websocket_server.broadcast_to_topic("KOSPI", kospi_message)
# KOSPI 구독자에게만 전송되었는지 확인
assert sent_count == 1
client1.send.assert_called_once()
client2.send.assert_not_called()
@pytest.mark.asyncio
async def test_heartbeat_mechanism(self, websocket_server, mock_websocket):
"""하트비트 메커니즘 테스트"""
await websocket_server.handle_connection(mock_websocket)
# 하트비트 시작
heartbeat_task = asyncio.create_task(
websocket_server.start_heartbeat(mock_websocket.id)
)
# 잠시 대기 후 하트비트 메시지 확인
await asyncio.sleep(0.1)
# 하트비트 메시지가 전송되었는지 확인 (실제로는 간격에 따라)
connection_info = websocket_server.get_connection_info(mock_websocket.id)
assert "last_heartbeat" in connection_info
# 하트비트 정리
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
@pytest.mark.asyncio
async def test_connection_limit(self, websocket_server):
"""연결 제한 테스트"""
# 최대 연결 수 설정
websocket_server.max_connections = 2
# 최대 연결 수만큼 연결
clients = []
for i in range(2):
mock_ws = MagicMock()
mock_ws.id = f"client_{i}"
mock_ws.send = AsyncMock()
mock_ws.close = AsyncMock()
mock_ws.closed = False
clients.append(mock_ws)
await websocket_server.handle_connection(mock_ws)
assert websocket_server.connection_count == 2
# 추가 연결 시도 (제한 초과)
extra_client = MagicMock()
extra_client.id = "extra_client"
extra_client.close = AsyncMock()
extra_client.closed = False
# 연결 거부되어야 함
with pytest.raises(ConnectionError):
await websocket_server.handle_connection(extra_client)
@pytest.mark.asyncio
async def test_message_queuing(self, websocket_server, mock_websocket):
"""메시지 큐잉 테스트"""
await websocket_server.handle_connection(mock_websocket)
# 클라이언트를 일시적으로 사용 불가 상태로 설정
mock_websocket.send.side_effect = Exception("Connection lost")
# 메시지 전송 시도 (큐에 저장되어야 함)
test_message = {"type": "test", "data": "queued_message"}
success = await websocket_server.send_to_client(mock_websocket.id, test_message)
# 메시지가 큐에 저장되었는지 확인
queue_size = websocket_server.get_message_queue_size(mock_websocket.id)
assert queue_size > 0
# 클라이언트 복구 후 큐된 메시지 전송
mock_websocket.send.side_effect = None # 정상 상태로 복구
await websocket_server.flush_message_queue(mock_websocket.id)
# 큐가 비워졌는지 확인
queue_size_after = websocket_server.get_message_queue_size(mock_websocket.id)
assert queue_size_after == 0
@pytest.mark.asyncio
async def test_authentication(self, websocket_server, mock_websocket):
"""인증 테스트"""
# 인증 메시지
auth_message = {
"type": "auth",
"token": "valid_jwt_token_123",
"user_id": "test_user"
}
# 인증 처리
auth_result = await websocket_server.authenticate_client(mock_websocket.id, auth_message)
# 인증 성공 확인
assert auth_result["success"] == True
assert auth_result["user_id"] == "test_user"
# 인증된 클라이언트 정보 확인
connection_info = websocket_server.get_connection_info(mock_websocket.id)
assert connection_info["authenticated"] == True
assert connection_info["user_id"] == "test_user"
def test_server_stats(self, websocket_server):
"""서버 통계 테스트"""
stats = websocket_server.get_server_stats()
assert "total_connections" in stats
assert "active_connections" in stats
assert "messages_sent" in stats
assert "messages_received" in stats
assert "uptime" in stats
assert "memory_usage" in stats
assert stats["total_connections"] >= 0
assert stats["active_connections"] >= 0
class TestMessageBroker:
"""메시지 브로커 테스트"""
@pytest.fixture
def broker_config(self):
"""브로커 설정"""
return {
"queue_size": 10000,
"batch_size": 100,
"flush_interval": 1.0,
"persistence": True,
"priority_levels": 3
}
@pytest.fixture
def message_broker(self, broker_config):
"""메시지 브로커 인스턴스"""
return MessageBroker(broker_config)
def test_broker_initialization(self, message_broker, broker_config):
"""브로커 초기화 테스트"""
assert message_broker.queue_size == broker_config["queue_size"]
assert message_broker.batch_size == broker_config["batch_size"]
assert message_broker.flush_interval == broker_config["flush_interval"]
assert message_broker.persistence_enabled == broker_config["persistence"]
assert message_broker.priority_levels == broker_config["priority_levels"]
@pytest.mark.asyncio
async def test_message_publishing(self, message_broker):
"""메시지 발행 테스트"""
test_message = {
"type": "market_data",
"symbol": "005930",
"price": 75000,
"timestamp": datetime.now().isoformat()
}
# 메시지 발행
message_id = await message_broker.publish("market_updates", test_message)
assert message_id is not None
assert len(message_id) > 0
# 큐 크기 확인
queue_stats = message_broker.get_queue_stats("market_updates")
assert queue_stats["pending_messages"] == 1
@pytest.mark.asyncio
async def test_message_subscription(self, message_broker):
"""메시지 구독 테스트"""
subscriber_id = "test_subscriber"
topic = "price_alerts"
# 구독 등록
await message_broker.subscribe(subscriber_id, topic)
# 구독 상태 확인
subscriptions = message_broker.get_subscriptions(subscriber_id)
assert topic in subscriptions
# 토픽에 메시지 발행
test_message = {"alert": "Price target reached", "symbol": "005930"}
await message_broker.publish(topic, test_message)
# 구독자가 메시지를 받을 수 있는지 확인
messages = await message_broker.consume(subscriber_id, max_messages=1)
assert len(messages) == 1
assert messages[0]["data"]["alert"] == "Price target reached"
@pytest.mark.asyncio
async def test_priority_message_handling(self, message_broker):
"""우선순위 메시지 처리 테스트"""
topic = "system_alerts"
# 다양한 우선순위의 메시지 발행
messages = [
{"content": "Low priority", "priority": 0},
{"content": "High priority", "priority": 2},
{"content": "Medium priority", "priority": 1}
]
for msg in messages:
await message_broker.publish(topic, msg, priority=msg["priority"])
# 구독자 등록
subscriber_id = "priority_subscriber"
await message_broker.subscribe(subscriber_id, topic)
# 메시지 소비 (우선순위 순서로 와야 함)
consumed_messages = await message_broker.consume(subscriber_id, max_messages=3)
# 높은 우선순위 메시지가 먼저 와야 함
priorities = [msg["data"]["priority"] for msg in consumed_messages]
assert priorities == [2, 1, 0] # 높은 우선순위부터
@pytest.mark.asyncio
async def test_batch_message_processing(self, message_broker):
"""배치 메시지 처리 테스트"""
topic = "batch_data"
subscriber_id = "batch_subscriber"
await message_broker.subscribe(subscriber_id, topic)
# 여러 메시지 발행
for i in range(10):
message = {"batch_id": i, "data": f"batch_data_{i}"}
await message_broker.publish(topic, message)
# 배치로 메시지 소비
batch_messages = await message_broker.consume(subscriber_id, max_messages=5)
assert len(batch_messages) == 5
assert batch_messages[0]["data"]["batch_id"] == 0
assert batch_messages[4]["data"]["batch_id"] == 4
# 나머지 메시지 소비
remaining_messages = await message_broker.consume(subscriber_id, max_messages=10)
assert len(remaining_messages) == 5
@pytest.mark.asyncio
async def test_message_persistence(self, message_broker):
"""메시지 지속성 테스트"""
topic = "persistent_data"
# 지속성 메시지 발행
persistent_message = {
"id": "persist_001",
"critical_data": "This must survive restarts"
}
message_id = await message_broker.publish(
topic,
persistent_message,
persistent=True
)
# 브로커 재시작 시뮬레이션
await message_broker.save_state()
# 새 브로커 인스턴스로 상태 복원
new_config = message_broker.config.copy()
new_broker = MessageBroker(new_config)
await new_broker.restore_state()
# 구독자 등록 후 지속성 메시지 확인
subscriber_id = "persistent_subscriber"
await new_broker.subscribe(subscriber_id, topic)
messages = await new_broker.consume(subscriber_id, max_messages=1)
assert len(messages) == 1
assert messages[0]["data"]["id"] == "persist_001"
@pytest.mark.asyncio
async def test_dead_letter_queue(self, message_broker):
"""데드 레터 큐 테스트"""
topic = "problematic_topic"
subscriber_id = "failing_subscriber"
await message_broker.subscribe(subscriber_id, topic)
# 처리 실패를 시뮬레이션할 메시지
failing_message = {"will_fail": True, "data": "problematic_data"}
await message_broker.publish(topic, failing_message)
# 메시지 소비 시도 (실패 시뮬레이션)
with patch.object(message_broker, '_process_message', side_effect=Exception("Processing failed")):
messages = await message_broker.consume(subscriber_id, max_messages=1)
# 데드 레터 큐 확인
dlq_stats = message_broker.get_dead_letter_queue_stats()
assert dlq_stats["message_count"] > 0
# 데드 레터 큐에서 메시지 조회
dlq_messages = await message_broker.get_dead_letter_messages(max_messages=1)
assert len(dlq_messages) == 1
assert dlq_messages[0]["data"]["will_fail"] == True
def test_broker_statistics(self, message_broker):
"""브로커 통계 테스트"""
stats = message_broker.get_broker_stats()
assert "total_published" in stats
assert "total_consumed" in stats
assert "active_subscribers" in stats
assert "queue_sizes" in stats
assert "throughput" in stats
assert "error_rate" in stats
assert stats["total_published"] >= 0
assert stats["total_consumed"] >= 0
class TestConnectionManager:
"""연결 관리자 테스트"""
@pytest.fixture
def connection_manager(self):
"""연결 관리자 인스턴스"""
config = {
"max_connections_per_ip": 10,
"rate_limit_per_minute": 1000,
"cleanup_interval": 60
}
return ConnectionManager(config)
@pytest.fixture
def mock_connection(self):
"""Mock 연결"""
connection = MagicMock()
connection.id = "conn_123"
connection.remote_address = ("192.168.1.100", 12345)
connection.user_agent = "TestClient/1.0"
connection.created_at = datetime.now()
return connection
def test_connection_registration(self, connection_manager, mock_connection):
"""연결 등록 테스트"""
# 연결 등록
connection_manager.register_connection(mock_connection)
# 등록 확인
assert connection_manager.get_connection_count() == 1
assert connection_manager.has_connection(mock_connection.id)
# 연결 정보 확인
conn_info = connection_manager.get_connection_info(mock_connection.id)
assert conn_info["id"] == mock_connection.id
assert conn_info["remote_address"] == mock_connection.remote_address
def test_connection_unregistration(self, connection_manager, mock_connection):
"""연결 해제 테스트"""
# 연결 등록 후 해제
connection_manager.register_connection(mock_connection)
connection_manager.unregister_connection(mock_connection.id)
# 해제 확인
assert connection_manager.get_connection_count() == 0
assert not connection_manager.has_connection(mock_connection.id)
def test_ip_based_connection_limit(self, connection_manager):
"""IP 기반 연결 제한 테스트"""
# 동일 IP에서 여러 연결
same_ip = "192.168.1.100"
for i in range(connection_manager.max_connections_per_ip):
mock_conn = MagicMock()
mock_conn.id = f"conn_{i}"
mock_conn.remote_address = (same_ip, 12345 + i)
connection_manager.register_connection(mock_conn)
# 제한 내에서는 정상 등록
assert connection_manager.get_connection_count() == connection_manager.max_connections_per_ip
# 제한 초과 시 예외 발생
excess_conn = MagicMock()
excess_conn.id = "excess_conn"
excess_conn.remote_address = (same_ip, 99999)
with pytest.raises(ConnectionError):
connection_manager.register_connection(excess_conn)
def test_rate_limiting(self, connection_manager, mock_connection):
"""속도 제한 테스트"""
connection_manager.register_connection(mock_connection)
# 정상 요청 (제한 내)
for i in range(10):
allowed = connection_manager.check_rate_limit(mock_connection.id)
assert allowed == True
# 제한 초과 시뮬레이션 (매우 많은 요청)
# 실제로는 시간 기반으로 동작하지만 테스트에서는 직접 카운터 조작
connection_manager._set_request_count(mock_connection.id, 1001)
rate_limited = connection_manager.check_rate_limit(mock_connection.id)
assert rate_limited == False
def test_connection_statistics(self, connection_manager):
"""연결 통계 테스트"""
# 여러 연결 등록
for i in range(5):
mock_conn = MagicMock()
mock_conn.id = f"stats_conn_{i}"
mock_conn.remote_address = (f"192.168.1.{100 + i}", 12345)
connection_manager.register_connection(mock_conn)
# 통계 확인
stats = connection_manager.get_connection_stats()
assert "total_connections" in stats
assert "connections_by_ip" in stats
assert "average_connection_duration" in stats
assert "request_rates" in stats
assert stats["total_connections"] == 5
assert len(stats["connections_by_ip"]) == 5
def test_connection_cleanup(self, connection_manager):
"""연결 정리 테스트"""
# 오래된 연결 시뮬레이션
old_connection = MagicMock()
old_connection.id = "old_conn"
old_connection.remote_address = ("192.168.1.200", 12345)
old_connection.created_at = datetime.now() - timedelta(hours=2)
old_connection.last_activity = datetime.now() - timedelta(hours=1)
connection_manager.register_connection(old_connection)
# 정리 실행
cleaned_count = connection_manager.cleanup_stale_connections(max_idle_minutes=30)
# 오래된 연결이 정리되었는지 확인
assert cleaned_count >= 0
# 실제 정리 여부는 구현에 따라 다름