Skip to main content
Glama

MCP Market Statistics Server

by whdghk1907
test_realtime_server.py37.2 kB
"""실시간 데이터 버퍼링 및 WebSocket 서버 테스트""" import pytest import asyncio import json import time from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch from typing import Dict, List, Any from src.realtime.data_buffer import RealTimeDataBuffer from src.realtime.websocket_server import WebSocketServer from src.realtime.message_broker import MessageBroker from src.realtime.connection_manager import ConnectionManager from src.exceptions import BufferOverflowError, ConnectionError class TestRealTimeDataBuffer: """실시간 데이터 버퍼 테스트""" @pytest.fixture def buffer_config(self): """버퍼 설정""" return { "max_size": 1000, "flush_interval": 5.0, "compression": True, "persistence": True, "ttl": 3600, # 1시간 "partition_count": 4 } @pytest.fixture def data_buffer(self, buffer_config): """데이터 버퍼 인스턴스""" return RealTimeDataBuffer(buffer_config) @pytest.fixture def sample_market_data(self): """샘플 시장 데이터""" return { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "symbol": "005930", # 삼성전자 "price": 75000, "volume": 1000000, "change": 500, "change_rate": 0.67, "bid": 74900, "ask": 75100, "bid_size": 5000, "ask_size": 3000 } def test_buffer_initialization(self, data_buffer, buffer_config): """버퍼 초기화 테스트""" assert data_buffer.max_size == buffer_config["max_size"] assert data_buffer.flush_interval == buffer_config["flush_interval"] assert data_buffer.compression_enabled == buffer_config["compression"] assert data_buffer.persistence_enabled == buffer_config["persistence"] assert data_buffer.ttl == buffer_config["ttl"] assert data_buffer.partition_count == buffer_config["partition_count"] assert data_buffer.size() == 0 assert data_buffer.is_empty() == True @pytest.mark.asyncio async def test_add_single_data(self, data_buffer, sample_market_data): """단일 데이터 추가 테스트""" # 데이터 추가 await data_buffer.add(sample_market_data) # 크기 확인 assert data_buffer.size() == 1 assert not data_buffer.is_empty() assert not data_buffer.is_full() # 데이터 조회 data = await data_buffer.get_latest(1) assert len(data) == 1 assert data[0]["symbol"] == "005930" assert data[0]["price"] == 75000 @pytest.mark.asyncio async def test_add_multiple_data(self, data_buffer): """다중 데이터 추가 테스트""" # 100개 데이터 추가 for i in range(100): market_data = { "timestamp": (datetime.now() + timedelta(seconds=i)).isoformat(), "market": "KOSPI", "symbol": f"00593{i % 10}", "price": 75000 + i, "volume": 1000 * (i + 1) } await data_buffer.add(market_data) # 크기 확인 assert data_buffer.size() == 100 # 최신 10개 데이터 조회 latest_data = await data_buffer.get_latest(10) assert len(latest_data) == 10 # 최신 데이터가 먼저 오는지 확인 (가장 높은 price) assert latest_data[0]["price"] == 75099 assert latest_data[9]["price"] == 75090 @pytest.mark.asyncio async def test_buffer_partitioning(self, data_buffer): """버퍼 파티셔닝 테스트""" # 다양한 심볼의 데이터 추가 symbols = ["005930", "000660", "035420", "207940"] for i, symbol in enumerate(symbols * 10): # 각 심볼당 10개씩 market_data = { "timestamp": datetime.now().isoformat(), "market": "KOSPI", "symbol": symbol, "price": 50000 + i, "volume": 1000 } await data_buffer.add(market_data) # 파티션별 데이터 확인 partition_stats = data_buffer.get_partition_stats() assert len(partition_stats) == 4 # 4개 파티션 # 각 파티션에 데이터가 있는지 확인 total_items = sum(stats["item_count"] for stats in partition_stats.values()) assert total_items == 40 # 4 symbols * 10 items @pytest.mark.asyncio async def test_buffer_overflow_handling(self, buffer_config): """버퍼 오버플로우 처리 테스트""" # 작은 버퍼 사이즈로 설정 buffer_config["max_size"] = 10 data_buffer = RealTimeDataBuffer(buffer_config) # 버퍼 크기보다 많은 데이터 추가 for i in range(15): market_data = { "timestamp": datetime.now().isoformat(), "symbol": f"TEST{i}", "price": 1000 + i } await data_buffer.add(market_data) # 최대 크기를 초과하지 않는지 확인 (오래된 데이터 삭제) assert data_buffer.size() <= 10 # 가장 최신 데이터들만 남아있는지 확인 latest_data = await data_buffer.get_latest(5) assert latest_data[0]["price"] >= 1010 # 최신 데이터 @pytest.mark.asyncio async def test_time_based_filtering(self, data_buffer): """시간 기반 필터링 테스트""" base_time = datetime.now() # 다양한 시간의 데이터 추가 for i in range(10): market_data = { "timestamp": (base_time - timedelta(minutes=i)).isoformat(), "symbol": "005930", "price": 75000 + i } await data_buffer.add(market_data) # 최근 5분 이내 데이터만 조회 since_time = base_time - timedelta(minutes=5) recent_data = await data_buffer.get_since(since_time) # 5분 이내 데이터만 있는지 확인 (0~5분, 총 6개) assert len(recent_data) == 6 # 시간 순서 확인 timestamps = [datetime.fromisoformat(item["timestamp"]) for item in recent_data] assert all(ts >= since_time for ts in timestamps) @pytest.mark.asyncio async def test_symbol_based_filtering(self, data_buffer): """심볼 기반 필터링 테스트""" symbols = ["005930", "000660", "035420"] # 다양한 심볼 데이터 추가 for symbol in symbols: for i in range(5): market_data = { "timestamp": datetime.now().isoformat(), "symbol": symbol, "price": 50000 + i } await data_buffer.add(market_data) # 특정 심볼 데이터만 조회 samsung_data = await data_buffer.get_by_symbol("005930") assert len(samsung_data) == 5 assert all(item["symbol"] == "005930" for item in samsung_data) # 다중 심볼 조회 multi_symbol_data = await data_buffer.get_by_symbols(["005930", "000660"]) assert len(multi_symbol_data) == 10 symbols_in_result = set(item["symbol"] for item in multi_symbol_data) assert symbols_in_result == {"005930", "000660"} @pytest.mark.asyncio async def test_data_compression(self, buffer_config): """데이터 압축 테스트""" # 압축 활성화 buffer_config["compression"] = True compressed_buffer = RealTimeDataBuffer(buffer_config) # 압축 비활성화 buffer_config["compression"] = False uncompressed_buffer = RealTimeDataBuffer(buffer_config) # 동일한 대용량 데이터 추가 large_data = { "timestamp": datetime.now().isoformat(), "symbol": "005930", "price": 75000, "volume": 1000000, "order_book": [{"price": 75000 + i, "size": 1000} for i in range(100)] } await compressed_buffer.add(large_data) await uncompressed_buffer.add(large_data) # 압축된 버퍼의 메모리 사용량이 더 적은지 확인 compressed_stats = compressed_buffer.get_memory_stats() uncompressed_stats = uncompressed_buffer.get_memory_stats() assert compressed_stats["compressed_size"] < uncompressed_stats["raw_size"] assert compressed_stats["compression_ratio"] > 0 @pytest.mark.asyncio async def test_buffer_persistence(self, data_buffer): """버퍼 지속성 테스트""" # 데이터 추가 test_data = [ {"symbol": "005930", "price": 75000, "timestamp": datetime.now().isoformat()}, {"symbol": "000660", "price": 45000, "timestamp": datetime.now().isoformat()} ] for data in test_data: await data_buffer.add(data) # 버퍼 저장 checkpoint_id = await data_buffer.create_checkpoint() assert checkpoint_id is not None # 버퍼 클리어 await data_buffer.clear() assert data_buffer.size() == 0 # 체크포인트에서 복원 restored = await data_buffer.restore_from_checkpoint(checkpoint_id) assert restored == True assert data_buffer.size() == 2 # 복원된 데이터 확인 restored_data = await data_buffer.get_all() symbols = [item["symbol"] for item in restored_data] assert "005930" in symbols assert "000660" in symbols @pytest.mark.asyncio async def test_ttl_expiration(self, buffer_config): """TTL 만료 테스트""" # 짧은 TTL 설정 (1초) buffer_config["ttl"] = 1 data_buffer = RealTimeDataBuffer(buffer_config) # 데이터 추가 old_data = { "timestamp": (datetime.now() - timedelta(seconds=2)).isoformat(), "symbol": "005930", "price": 75000 } new_data = { "timestamp": datetime.now().isoformat(), "symbol": "000660", "price": 45000 } await data_buffer.add(old_data) await data_buffer.add(new_data) # TTL 정리 실행 await data_buffer.cleanup_expired() # 만료된 데이터가 제거되었는지 확인 remaining_data = await data_buffer.get_all() symbols = [item["symbol"] for item in remaining_data] # new_data만 남아있어야 함 assert "000660" in symbols # old_data는 TTL로 인해 제거되었을 수 있음 @pytest.mark.asyncio async def test_buffer_stats(self, data_buffer): """버퍼 통계 테스트""" # 다양한 데이터 추가 for i in range(50): await data_buffer.add({ "timestamp": datetime.now().isoformat(), "symbol": f"TEST{i % 5}", "price": 1000 + i }) # 통계 확인 stats = data_buffer.get_stats() assert "total_items" in stats assert "memory_usage" in stats assert "partition_distribution" in stats assert "compression_ratio" in stats assert "oldest_timestamp" in stats assert "newest_timestamp" in stats assert stats["total_items"] == 50 assert stats["memory_usage"] > 0 @pytest.mark.asyncio async def test_concurrent_access(self, data_buffer): """동시 접근 테스트""" # 동시에 데이터 추가하는 태스크들 async def add_data_task(start_idx, count): for i in range(count): await data_buffer.add({ "timestamp": datetime.now().isoformat(), "symbol": f"CONCURRENT{start_idx + i}", "price": 1000 + start_idx + i }) # 5개 태스크가 각각 20개씩 데이터 추가 tasks = [add_data_task(i * 20, 20) for i in range(5)] await asyncio.gather(*tasks) # 총 100개 데이터가 추가되었는지 확인 assert data_buffer.size() == 100 # 데이터 무결성 확인 all_data = await data_buffer.get_all() symbols = set(item["symbol"] for item in all_data) assert len(symbols) == 100 # 모든 심볼이 유니크 class TestWebSocketServer: """WebSocket 서버 테스트""" @pytest.fixture def server_config(self): """서버 설정""" return { "host": "localhost", "port": 8765, "max_connections": 100, "heartbeat_interval": 30, "message_queue_size": 1000, "compression": True, "authentication": True } @pytest.fixture def mock_websocket(self): """Mock WebSocket 연결""" ws = MagicMock() ws.id = "test_client_123" ws.remote_address = ("127.0.0.1", 12345) ws.send = AsyncMock() ws.recv = AsyncMock() ws.close = AsyncMock() ws.closed = False return ws @pytest.fixture def websocket_server(self, server_config): """WebSocket 서버 인스턴스""" return WebSocketServer(server_config) def test_server_initialization(self, websocket_server, server_config): """서버 초기화 테스트""" assert websocket_server.host == server_config["host"] assert websocket_server.port == server_config["port"] assert websocket_server.max_connections == server_config["max_connections"] assert websocket_server.heartbeat_interval == server_config["heartbeat_interval"] assert websocket_server.compression_enabled == server_config["compression"] assert websocket_server.authentication_enabled == server_config["authentication"] assert websocket_server.is_running == False assert websocket_server.connection_count == 0 @pytest.mark.asyncio async def test_server_start_stop(self, websocket_server): """서버 시작/중지 테스트""" # 서버 시작 start_task = asyncio.create_task(websocket_server.start()) await asyncio.sleep(0.1) # 서버가 시작될 시간 assert websocket_server.is_running == True # 서버 중지 await websocket_server.stop() assert websocket_server.is_running == False # 시작 태스크 정리 start_task.cancel() try: await start_task except asyncio.CancelledError: pass @pytest.mark.asyncio async def test_client_connection(self, websocket_server, mock_websocket): """클라이언트 연결 테스트""" # 클라이언트 연결 await websocket_server.handle_connection(mock_websocket) # 연결 상태 확인 assert websocket_server.connection_count == 1 assert websocket_server.has_connection(mock_websocket.id) # 연결 정보 확인 connection_info = websocket_server.get_connection_info(mock_websocket.id) assert connection_info is not None assert connection_info["id"] == mock_websocket.id assert connection_info["remote_address"] == mock_websocket.remote_address @pytest.mark.asyncio async def test_client_disconnection(self, websocket_server, mock_websocket): """클라이언트 연결 해제 테스트""" # 연결 후 해제 await websocket_server.handle_connection(mock_websocket) await websocket_server.disconnect_client(mock_websocket.id) # 연결 해제 확인 assert websocket_server.connection_count == 0 assert not websocket_server.has_connection(mock_websocket.id) # WebSocket close 호출 확인 mock_websocket.close.assert_called_once() @pytest.mark.asyncio async def test_broadcast_message(self, websocket_server): """브로드캐스트 메시지 테스트""" # 여러 클라이언트 연결 clients = [] for i in range(3): mock_ws = MagicMock() mock_ws.id = f"client_{i}" mock_ws.send = AsyncMock() mock_ws.closed = False clients.append(mock_ws) await websocket_server.handle_connection(mock_ws) # 브로드캐스트 메시지 test_message = { "type": "market_data", "data": {"symbol": "005930", "price": 75000} } sent_count = await websocket_server.broadcast(test_message) # 모든 클라이언트에게 전송되었는지 확인 assert sent_count == 3 for client in clients: client.send.assert_called_once() sent_message = client.send.call_args[0][0] assert "market_data" in sent_message @pytest.mark.asyncio async def test_unicast_message(self, websocket_server, mock_websocket): """유니캐스트 메시지 테스트""" # 클라이언트 연결 await websocket_server.handle_connection(mock_websocket) # 특정 클라이언트에게 메시지 전송 test_message = { "type": "personal_data", "data": {"user_id": "test_user", "balance": 1000000} } success = await websocket_server.send_to_client(mock_websocket.id, test_message) # 전송 성공 확인 assert success == True mock_websocket.send.assert_called_once() sent_message = mock_websocket.send.call_args[0][0] assert "personal_data" in sent_message @pytest.mark.asyncio async def test_subscription_management(self, websocket_server, mock_websocket): """구독 관리 테스트""" await websocket_server.handle_connection(mock_websocket) # 토픽 구독 topics = ["KOSPI", "005930", "000660"] for topic in topics: await websocket_server.subscribe_client(mock_websocket.id, topic) # 구독 상태 확인 subscriptions = websocket_server.get_client_subscriptions(mock_websocket.id) assert set(subscriptions) == set(topics) # 토픽 구독 해제 await websocket_server.unsubscribe_client(mock_websocket.id, "000660") updated_subscriptions = websocket_server.get_client_subscriptions(mock_websocket.id) assert "000660" not in updated_subscriptions assert len(updated_subscriptions) == 2 @pytest.mark.asyncio async def test_topic_based_broadcasting(self, websocket_server): """토픽 기반 브로드캐스팅 테스트""" # 다른 토픽을 구독하는 클라이언트들 client1 = MagicMock() client1.id = "client_kospi" client1.send = AsyncMock() client1.closed = False client2 = MagicMock() client2.id = "client_samsung" client2.send = AsyncMock() client2.closed = False await websocket_server.handle_connection(client1) await websocket_server.handle_connection(client2) # 구독 설정 await websocket_server.subscribe_client("client_kospi", "KOSPI") await websocket_server.subscribe_client("client_samsung", "005930") # KOSPI 토픽 브로드캐스트 kospi_message = { "type": "index_data", "topic": "KOSPI", "data": {"index": 2650, "change": 5.2} } sent_count = await websocket_server.broadcast_to_topic("KOSPI", kospi_message) # KOSPI 구독자에게만 전송되었는지 확인 assert sent_count == 1 client1.send.assert_called_once() client2.send.assert_not_called() @pytest.mark.asyncio async def test_heartbeat_mechanism(self, websocket_server, mock_websocket): """하트비트 메커니즘 테스트""" await websocket_server.handle_connection(mock_websocket) # 하트비트 시작 heartbeat_task = asyncio.create_task( websocket_server.start_heartbeat(mock_websocket.id) ) # 잠시 대기 후 하트비트 메시지 확인 await asyncio.sleep(0.1) # 하트비트 메시지가 전송되었는지 확인 (실제로는 간격에 따라) connection_info = websocket_server.get_connection_info(mock_websocket.id) assert "last_heartbeat" in connection_info # 하트비트 정리 heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass @pytest.mark.asyncio async def test_connection_limit(self, websocket_server): """연결 제한 테스트""" # 최대 연결 수 설정 websocket_server.max_connections = 2 # 최대 연결 수만큼 연결 clients = [] for i in range(2): mock_ws = MagicMock() mock_ws.id = f"client_{i}" mock_ws.send = AsyncMock() mock_ws.close = AsyncMock() mock_ws.closed = False clients.append(mock_ws) await websocket_server.handle_connection(mock_ws) assert websocket_server.connection_count == 2 # 추가 연결 시도 (제한 초과) extra_client = MagicMock() extra_client.id = "extra_client" extra_client.close = AsyncMock() extra_client.closed = False # 연결 거부되어야 함 with pytest.raises(ConnectionError): await websocket_server.handle_connection(extra_client) @pytest.mark.asyncio async def test_message_queuing(self, websocket_server, mock_websocket): """메시지 큐잉 테스트""" await websocket_server.handle_connection(mock_websocket) # 클라이언트를 일시적으로 사용 불가 상태로 설정 mock_websocket.send.side_effect = Exception("Connection lost") # 메시지 전송 시도 (큐에 저장되어야 함) test_message = {"type": "test", "data": "queued_message"} success = await websocket_server.send_to_client(mock_websocket.id, test_message) # 메시지가 큐에 저장되었는지 확인 queue_size = websocket_server.get_message_queue_size(mock_websocket.id) assert queue_size > 0 # 클라이언트 복구 후 큐된 메시지 전송 mock_websocket.send.side_effect = None # 정상 상태로 복구 await websocket_server.flush_message_queue(mock_websocket.id) # 큐가 비워졌는지 확인 queue_size_after = websocket_server.get_message_queue_size(mock_websocket.id) assert queue_size_after == 0 @pytest.mark.asyncio async def test_authentication(self, websocket_server, mock_websocket): """인증 테스트""" # 인증 메시지 auth_message = { "type": "auth", "token": "valid_jwt_token_123", "user_id": "test_user" } # 인증 처리 auth_result = await websocket_server.authenticate_client(mock_websocket.id, auth_message) # 인증 성공 확인 assert auth_result["success"] == True assert auth_result["user_id"] == "test_user" # 인증된 클라이언트 정보 확인 connection_info = websocket_server.get_connection_info(mock_websocket.id) assert connection_info["authenticated"] == True assert connection_info["user_id"] == "test_user" def test_server_stats(self, websocket_server): """서버 통계 테스트""" stats = websocket_server.get_server_stats() assert "total_connections" in stats assert "active_connections" in stats assert "messages_sent" in stats assert "messages_received" in stats assert "uptime" in stats assert "memory_usage" in stats assert stats["total_connections"] >= 0 assert stats["active_connections"] >= 0 class TestMessageBroker: """메시지 브로커 테스트""" @pytest.fixture def broker_config(self): """브로커 설정""" return { "queue_size": 10000, "batch_size": 100, "flush_interval": 1.0, "persistence": True, "priority_levels": 3 } @pytest.fixture def message_broker(self, broker_config): """메시지 브로커 인스턴스""" return MessageBroker(broker_config) def test_broker_initialization(self, message_broker, broker_config): """브로커 초기화 테스트""" assert message_broker.queue_size == broker_config["queue_size"] assert message_broker.batch_size == broker_config["batch_size"] assert message_broker.flush_interval == broker_config["flush_interval"] assert message_broker.persistence_enabled == broker_config["persistence"] assert message_broker.priority_levels == broker_config["priority_levels"] @pytest.mark.asyncio async def test_message_publishing(self, message_broker): """메시지 발행 테스트""" test_message = { "type": "market_data", "symbol": "005930", "price": 75000, "timestamp": datetime.now().isoformat() } # 메시지 발행 message_id = await message_broker.publish("market_updates", test_message) assert message_id is not None assert len(message_id) > 0 # 큐 크기 확인 queue_stats = message_broker.get_queue_stats("market_updates") assert queue_stats["pending_messages"] == 1 @pytest.mark.asyncio async def test_message_subscription(self, message_broker): """메시지 구독 테스트""" subscriber_id = "test_subscriber" topic = "price_alerts" # 구독 등록 await message_broker.subscribe(subscriber_id, topic) # 구독 상태 확인 subscriptions = message_broker.get_subscriptions(subscriber_id) assert topic in subscriptions # 토픽에 메시지 발행 test_message = {"alert": "Price target reached", "symbol": "005930"} await message_broker.publish(topic, test_message) # 구독자가 메시지를 받을 수 있는지 확인 messages = await message_broker.consume(subscriber_id, max_messages=1) assert len(messages) == 1 assert messages[0]["data"]["alert"] == "Price target reached" @pytest.mark.asyncio async def test_priority_message_handling(self, message_broker): """우선순위 메시지 처리 테스트""" topic = "system_alerts" # 다양한 우선순위의 메시지 발행 messages = [ {"content": "Low priority", "priority": 0}, {"content": "High priority", "priority": 2}, {"content": "Medium priority", "priority": 1} ] for msg in messages: await message_broker.publish(topic, msg, priority=msg["priority"]) # 구독자 등록 subscriber_id = "priority_subscriber" await message_broker.subscribe(subscriber_id, topic) # 메시지 소비 (우선순위 순서로 와야 함) consumed_messages = await message_broker.consume(subscriber_id, max_messages=3) # 높은 우선순위 메시지가 먼저 와야 함 priorities = [msg["data"]["priority"] for msg in consumed_messages] assert priorities == [2, 1, 0] # 높은 우선순위부터 @pytest.mark.asyncio async def test_batch_message_processing(self, message_broker): """배치 메시지 처리 테스트""" topic = "batch_data" subscriber_id = "batch_subscriber" await message_broker.subscribe(subscriber_id, topic) # 여러 메시지 발행 for i in range(10): message = {"batch_id": i, "data": f"batch_data_{i}"} await message_broker.publish(topic, message) # 배치로 메시지 소비 batch_messages = await message_broker.consume(subscriber_id, max_messages=5) assert len(batch_messages) == 5 assert batch_messages[0]["data"]["batch_id"] == 0 assert batch_messages[4]["data"]["batch_id"] == 4 # 나머지 메시지 소비 remaining_messages = await message_broker.consume(subscriber_id, max_messages=10) assert len(remaining_messages) == 5 @pytest.mark.asyncio async def test_message_persistence(self, message_broker): """메시지 지속성 테스트""" topic = "persistent_data" # 지속성 메시지 발행 persistent_message = { "id": "persist_001", "critical_data": "This must survive restarts" } message_id = await message_broker.publish( topic, persistent_message, persistent=True ) # 브로커 재시작 시뮬레이션 await message_broker.save_state() # 새 브로커 인스턴스로 상태 복원 new_config = message_broker.config.copy() new_broker = MessageBroker(new_config) await new_broker.restore_state() # 구독자 등록 후 지속성 메시지 확인 subscriber_id = "persistent_subscriber" await new_broker.subscribe(subscriber_id, topic) messages = await new_broker.consume(subscriber_id, max_messages=1) assert len(messages) == 1 assert messages[0]["data"]["id"] == "persist_001" @pytest.mark.asyncio async def test_dead_letter_queue(self, message_broker): """데드 레터 큐 테스트""" topic = "problematic_topic" subscriber_id = "failing_subscriber" await message_broker.subscribe(subscriber_id, topic) # 처리 실패를 시뮬레이션할 메시지 failing_message = {"will_fail": True, "data": "problematic_data"} await message_broker.publish(topic, failing_message) # 메시지 소비 시도 (실패 시뮬레이션) with patch.object(message_broker, '_process_message', side_effect=Exception("Processing failed")): messages = await message_broker.consume(subscriber_id, max_messages=1) # 데드 레터 큐 확인 dlq_stats = message_broker.get_dead_letter_queue_stats() assert dlq_stats["message_count"] > 0 # 데드 레터 큐에서 메시지 조회 dlq_messages = await message_broker.get_dead_letter_messages(max_messages=1) assert len(dlq_messages) == 1 assert dlq_messages[0]["data"]["will_fail"] == True def test_broker_statistics(self, message_broker): """브로커 통계 테스트""" stats = message_broker.get_broker_stats() assert "total_published" in stats assert "total_consumed" in stats assert "active_subscribers" in stats assert "queue_sizes" in stats assert "throughput" in stats assert "error_rate" in stats assert stats["total_published"] >= 0 assert stats["total_consumed"] >= 0 class TestConnectionManager: """연결 관리자 테스트""" @pytest.fixture def connection_manager(self): """연결 관리자 인스턴스""" config = { "max_connections_per_ip": 10, "rate_limit_per_minute": 1000, "cleanup_interval": 60 } return ConnectionManager(config) @pytest.fixture def mock_connection(self): """Mock 연결""" connection = MagicMock() connection.id = "conn_123" connection.remote_address = ("192.168.1.100", 12345) connection.user_agent = "TestClient/1.0" connection.created_at = datetime.now() return connection def test_connection_registration(self, connection_manager, mock_connection): """연결 등록 테스트""" # 연결 등록 connection_manager.register_connection(mock_connection) # 등록 확인 assert connection_manager.get_connection_count() == 1 assert connection_manager.has_connection(mock_connection.id) # 연결 정보 확인 conn_info = connection_manager.get_connection_info(mock_connection.id) assert conn_info["id"] == mock_connection.id assert conn_info["remote_address"] == mock_connection.remote_address def test_connection_unregistration(self, connection_manager, mock_connection): """연결 해제 테스트""" # 연결 등록 후 해제 connection_manager.register_connection(mock_connection) connection_manager.unregister_connection(mock_connection.id) # 해제 확인 assert connection_manager.get_connection_count() == 0 assert not connection_manager.has_connection(mock_connection.id) def test_ip_based_connection_limit(self, connection_manager): """IP 기반 연결 제한 테스트""" # 동일 IP에서 여러 연결 same_ip = "192.168.1.100" for i in range(connection_manager.max_connections_per_ip): mock_conn = MagicMock() mock_conn.id = f"conn_{i}" mock_conn.remote_address = (same_ip, 12345 + i) connection_manager.register_connection(mock_conn) # 제한 내에서는 정상 등록 assert connection_manager.get_connection_count() == connection_manager.max_connections_per_ip # 제한 초과 시 예외 발생 excess_conn = MagicMock() excess_conn.id = "excess_conn" excess_conn.remote_address = (same_ip, 99999) with pytest.raises(ConnectionError): connection_manager.register_connection(excess_conn) def test_rate_limiting(self, connection_manager, mock_connection): """속도 제한 테스트""" connection_manager.register_connection(mock_connection) # 정상 요청 (제한 내) for i in range(10): allowed = connection_manager.check_rate_limit(mock_connection.id) assert allowed == True # 제한 초과 시뮬레이션 (매우 많은 요청) # 실제로는 시간 기반으로 동작하지만 테스트에서는 직접 카운터 조작 connection_manager._set_request_count(mock_connection.id, 1001) rate_limited = connection_manager.check_rate_limit(mock_connection.id) assert rate_limited == False def test_connection_statistics(self, connection_manager): """연결 통계 테스트""" # 여러 연결 등록 for i in range(5): mock_conn = MagicMock() mock_conn.id = f"stats_conn_{i}" mock_conn.remote_address = (f"192.168.1.{100 + i}", 12345) connection_manager.register_connection(mock_conn) # 통계 확인 stats = connection_manager.get_connection_stats() assert "total_connections" in stats assert "connections_by_ip" in stats assert "average_connection_duration" in stats assert "request_rates" in stats assert stats["total_connections"] == 5 assert len(stats["connections_by_ip"]) == 5 def test_connection_cleanup(self, connection_manager): """연결 정리 테스트""" # 오래된 연결 시뮬레이션 old_connection = MagicMock() old_connection.id = "old_conn" old_connection.remote_address = ("192.168.1.200", 12345) old_connection.created_at = datetime.now() - timedelta(hours=2) old_connection.last_activity = datetime.now() - timedelta(hours=1) connection_manager.register_connection(old_connection) # 정리 실행 cleaned_count = connection_manager.cleanup_stale_connections(max_idle_minutes=30) # 오래된 연결이 정리되었는지 확인 assert cleaned_count >= 0 # 실제 정리 여부는 구현에 따라 다름

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-market-statistics'

If you have feedback or need assistance with the MCP directory API, please join our Discord server