Skip to main content
Glama
# benchmark_transport.py (20.1 kB)
"""
Performance benchmarks for Transport Layer.

Each benchmark builds a ``results`` dict keyed by scenario name, asserts a
few throughput/latency thresholds against it, and returns it.
"""

import asyncio
import json
import random
import statistics
import time
from typing import List, Dict, Any
from unittest.mock import AsyncMock, Mock, patch

import pytest

from shannon_mcp.transport.manager import TransportManager
from shannon_mcp.transport.session import TransportSession
from shannon_mcp.transport.protocol import TransportProtocol
from tests.fixtures.transport_fixtures import TransportFixtures
from tests.utils.performance import PerformanceTimer, PerformanceMonitor


class BenchmarkTransportConnection:
    """Benchmark transport connection performance."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_connection_establishment_performance(self, benchmark):
        """Benchmark establishing transport connections.

        ``TransportManager._create_connection`` is patched with a coroutine
        that sleeps for a per-transport simulated delay, then 50
        ``create_session`` / ``close_session`` round trips are timed for each
        transport type. The ``benchmark`` fixture is requested but not used
        in the body.

        Returns:
            Dict mapping transport type to average / p95 connection time in
            milliseconds and the derived connections-per-second rate.
        """
        manager = TransportManager()

        # Test different connection types
        connection_types = [
            ("stdio", {"type": "stdio"}),
            ("websocket", {"type": "websocket", "host": "localhost", "port": 8080}),
            ("tcp", {"type": "tcp", "host": "localhost", "port": 9090}),
            ("pipe", {"type": "pipe", "path": "/tmp/test.pipe"}),
        ]

        # Simulated connection time in seconds per transport type; any type
        # not listed here falls back to ``default_delay``.
        simulated_delays = {"stdio": 0.001, "websocket": 0.01, "tcp": 0.005}
        default_delay = 0.002

        results = {}

        for conn_type, config in connection_types:
            connection_times = []

            # Mock connection establishment
            with patch.object(manager, '_create_connection') as mock_create:

                async def mock_connection(cfg):
                    # Simulate connection time
                    await asyncio.sleep(
                        simulated_delays.get(cfg["type"], default_delay)
                    )
                    return TransportFixtures.create_transport_session(
                        session_id=f"{conn_type}-session",
                        transport_type=cfg["type"]
                    )

                mock_create.side_effect = mock_connection

                for i in range(50):
                    start = time.perf_counter()

                    session = await manager.create_session(
                        session_id=f"{conn_type}-{i}",
                        config=config
                    )

                    duration = time.perf_counter() - start
                    connection_times.append(duration)

                    # Cleanup (not included in the timed span)
                    await manager.close_session(session.id)

            avg_time = statistics.mean(connection_times)
            # n=20 yields 19 cut points; index 18 is the 95th percentile.
            p95_time = statistics.quantiles(connection_times, n=20)[18]

            results[conn_type] = {
                "avg_connection_time_ms": avg_time * 1000,
                "p95_connection_time_ms": p95_time * 1000,
                "connections_per_second": 1 / avg_time
            }

        # Connection establishment should be fast
        assert results["stdio"]["avg_connection_time_ms"] < 10
        assert results["tcp"]["avg_connection_time_ms"] < 20

        return results

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_concurrent_connections_performance(self, benchmark):
        """Benchmark concurrent connection handling.

        For each concurrency level, that many ``create_session`` coroutines
        are awaited together via ``asyncio.gather`` while
        ``_create_connection`` is patched with a coroutine sleeping 10 ms.
        Only session creation is timed; cleanup happens afterwards.

        Returns:
            Dict keyed by ``"<N>_concurrent"`` with total time and derived
            rates for each level.
        """
        manager = TransportManager()

        # Test different concurrency levels
        concurrency_levels = [10, 50, 100, 200]

        results = {}

        with patch.object(manager, '_create_connection') as mock_create:

            async def mock_connection(cfg):
                await asyncio.sleep(0.01)  # Simulate connection time
                return TransportFixtures.create_transport_session()

            mock_create.side_effect = mock_connection

            for concurrent_count in concurrency_levels:
                start = time.perf_counter()

                # Create connections concurrently
                tasks = [
                    manager.create_session(
                        session_id=f"concurrent-{i}",
                        config={"type": "stdio"}
                    )
                    for i in range(concurrent_count)
                ]
                sessions = await asyncio.gather(*tasks)

                duration = time.perf_counter() - start

                # Cleanup
                cleanup_tasks = [manager.close_session(s.id) for s in sessions]
                await asyncio.gather(*cleanup_tasks)

                results[f"{concurrent_count}_concurrent"] = {
                    "connections": concurrent_count,
                    "total_time": duration,
                    "connections_per_second": concurrent_count / duration,
                    "avg_time_per_connection_ms": (duration / concurrent_count) * 1000
                }

        # Should handle concurrency well
        assert results["100_concurrent"]["connections_per_second"] > 500

        return results


class BenchmarkTransportMessaging:
    """Benchmark transport message handling."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_message_throughput_performance(self, benchmark):
        """Benchmark message sending throughput.

        For each message size, 1000 fixture messages are encoded with
        ``TransportProtocol.encode_message`` and handed to a stub ``send``
        coroutine that records the payload and sleeps 0.1 ms.

        Returns:
            Dict keyed by size name with message rate and MB/s throughput.
            Throughput is computed from the nominal ``size_bytes``, not from
            the length of the encoded payloads.
        """
        protocol = TransportProtocol()

        # Test different message sizes
        message_sizes = [
            ("small", 100),      # 100 bytes
            ("medium", 1024),    # 1 KB
            ("large", 10240),    # 10 KB
            ("xlarge", 102400),  # 100 KB
        ]

        results = {}

        for size_name, size_bytes in message_sizes:
            # Create test messages
            messages = [
                TransportFixtures.create_transport_message(
                    size=size_bytes,
                    message_type="data"
                )
                for _ in range(1000)
            ]

            # Mock transport
            mock_transport = AsyncMock()
            sent_data = []

            async def mock_send(data):
                sent_data.append(data)
                await asyncio.sleep(0.0001)  # Minimal delay

            mock_transport.send = mock_send

            # Benchmark sending
            start = time.perf_counter()

            for message in messages:
                encoded = await protocol.encode_message(message)
                await mock_transport.send(encoded)

            duration = time.perf_counter() - start

            total_bytes = len(messages) * size_bytes
            throughput_mb_s = (total_bytes / (1024 * 1024)) / duration

            results[size_name] = {
                "message_size": size_bytes,
                "message_count": len(messages),
                "duration": duration,
                "messages_per_second": len(messages) / duration,
                "throughput_mb_s": throughput_mb_s
            }

        # Should achieve good throughput
        assert results["small"]["messages_per_second"] > 5000
        assert results["large"]["throughput_mb_s"] > 50

        return results

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_message_parsing_performance(self, benchmark):
        """Benchmark message parsing performance.

        For each message shape, 1000 JSON-RPC style payloads are serialized
        with ``json.dumps(...).encode()`` and then decoded with
        ``TransportProtocol.decode_message``. The decode pass is repeated 10
        times and the mean pass duration is reported.

        Returns:
            Dict keyed by message type with mean pass time and derived rates.
        """
        protocol = TransportProtocol()

        # Payload fragments merged into each JSON-RPC envelope
        message_types = [
            ("request", {"method": "test", "params": {"data": "x" * 100}}),
            ("response", {"result": {"data": "x" * 1000}}),
            ("notification", {"event": "update", "data": {"values": list(range(100))}}),
            ("error", {"code": -32000, "message": "Error", "data": {"details": "x" * 500}}),
        ]

        results = {}

        for msg_type, content in message_types:
            # Create encoded messages
            encoded_messages = [
                json.dumps({"jsonrpc": "2.0", "id": i, **content}).encode()
                for i in range(1000)
            ]

            # Benchmark parsing
            parse_times = []

            for _ in range(10):
                start = time.perf_counter()

                for encoded in encoded_messages:
                    # The decoded value is discarded; only the time is measured.
                    await protocol.decode_message(encoded)

                duration = time.perf_counter() - start
                parse_times.append(duration)

            avg_time = statistics.mean(parse_times)

            results[msg_type] = {
                "message_count": len(encoded_messages),
                "avg_parse_time": avg_time,
                "messages_per_second": len(encoded_messages) / avg_time,
                "avg_time_per_message_us": (avg_time / len(encoded_messages)) * 1_000_000
            }

        # Parsing should be fast
        assert all(r["messages_per_second"] > 10000 for r in results.values())

        return results


class BenchmarkTransportStreaming:
    """Benchmark transport streaming performance."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_stream_performance(self, benchmark):
        """Benchmark streaming data transfer.

        For each pattern, ``session.stream`` is replaced with an async
        generator that yields ``message_count`` fixture messages, sleeping
        ``interval`` seconds between them when the interval is positive, and
        the time to drain the generator is measured.

        Returns:
            Dict keyed by pattern name with message rate and MB/s throughput.
        """
        manager = TransportManager()

        # Test different streaming patterns
        patterns = [
            ("continuous", 0.001, 1000),  # 1ms interval, 1000 messages
            ("burst", 0, 5000),           # No delay, 5000 messages
            ("intermittent", 0.01, 500),  # 10ms interval, 500 messages
            ("heavy", 0, 10000),          # No delay, 10000 messages
        ]

        results = {}

        for pattern_name, interval, message_count in patterns:
            # Create mock session
            session = TransportFixtures.create_transport_session()

            # Mock streaming
            received_messages = []

            async def mock_stream():
                for i in range(message_count):
                    message = TransportFixtures.create_transport_message(
                        size=random.randint(100, 1000),
                        message_type="stream"
                    )
                    received_messages.append(message)

                    if interval > 0:
                        await asyncio.sleep(interval)

                    yield message

            session.stream = mock_stream

            # Benchmark streaming
            start = time.perf_counter()

            message_count_received = 0
            async for message in session.stream():
                message_count_received += 1

            duration = time.perf_counter() - start

            # Calculate throughput
            # NOTE(review): assumes fixture messages are dict-like with a
            # "data" entry - confirm against TransportFixtures.
            total_bytes = sum(len(m.get("data", "")) for m in received_messages)

            results[pattern_name] = {
                "interval": interval,
                "messages": message_count_received,
                "duration": duration,
                "messages_per_second": message_count_received / duration,
                "throughput_mb_s": (total_bytes / (1024 * 1024)) / duration
            }

        # Streaming should be efficient
        assert results["burst"]["messages_per_second"] > 10000
        assert results["continuous"]["messages_per_second"] > 500

        return results


class BenchmarkTransportMultiplexing:
    """Benchmark transport multiplexing."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_multiplexing_performance(self, benchmark):
        """Benchmark handling multiple channels.

        For each configuration, one sender coroutine per fixture channel
        sends ``messages_per_channel`` messages, and all senders are awaited
        together with ``asyncio.gather``.

        Returns:
            Dict keyed by configuration name with total message rate and
            per-channel throughput.
        """
        manager = TransportManager()

        # Test different channel configurations
        configurations = [
            ("few_channels", 5, 100),   # 5 channels, 100 msg each
            ("many_channels", 50, 20),  # 50 channels, 20 msg each
            ("balanced", 20, 50),       # 20 channels, 50 msg each
            ("extreme", 100, 10),       # 100 channels, 10 msg each
        ]

        results = {}

        for config_name, channel_count, messages_per_channel in configurations:
            # Create channels
            channels = [
                TransportFixtures.create_transport_channel(
                    channel_id=f"{config_name}_channel_{i}"
                )
                for i in range(channel_count)
            ]

            # Benchmark multiplexed communication
            start = time.perf_counter()

            # Send messages across channels
            tasks = []
            for channel in channels:

                # ``ch=channel`` binds the current channel at definition time
                # so each coroutine keeps its own channel.
                async def send_channel_messages(ch=channel):
                    for j in range(messages_per_channel):
                        message = TransportFixtures.create_transport_message(
                            channel_id=ch.id,
                            sequence=j
                        )
                        await ch.send(message)

                tasks.append(send_channel_messages())

            await asyncio.gather(*tasks)

            duration = time.perf_counter() - start

            total_messages = channel_count * messages_per_channel

            results[config_name] = {
                "channels": channel_count,
                "messages_per_channel": messages_per_channel,
                "total_messages": total_messages,
                "duration": duration,
                "messages_per_second": total_messages / duration,
                "avg_channel_throughput": (total_messages / channel_count) / duration
            }

        # Multiplexing should scale well
        assert results["many_channels"]["messages_per_second"] > 1000
        assert results["extreme"]["messages_per_second"] > 500

        return results


class BenchmarkTransportReconnection:
    """Benchmark transport reconnection handling."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_reconnection_performance(self, benchmark):
        """Benchmark reconnection speed and recovery.

        For each scenario, a fixture session is marked disconnected, the
        scenario's outage delay is slept, and then the time taken by
        ``session.reconnect()`` plus one verification ``session.send()`` is
        measured. The outage sleep itself is outside the timed span. Each
        scenario runs 20 cycles.

        Returns:
            Dict keyed by scenario name with average / p95 reconnect time in
            milliseconds and the derived reconnects-per-second rate.
        """
        manager = TransportManager()

        # Test different failure scenarios
        scenarios = [
            ("clean_disconnect", 0.01),  # 10ms reconnect
            ("network_failure", 0.05),   # 50ms reconnect
            ("server_restart", 0.1),     # 100ms reconnect
            ("intermittent", None),      # Random delays
        ]

        results = {}

        for scenario_name, reconnect_delay in scenarios:
            reconnection_times = []

            # Create session
            session = TransportFixtures.create_transport_session()

            # Simulate disconnections and reconnections
            for i in range(20):
                # Disconnect
                session.connected = False

                # Simulate reconnection delay
                if reconnect_delay is None:
                    await asyncio.sleep(random.uniform(0.01, 0.1))
                else:
                    await asyncio.sleep(reconnect_delay)

                # Reconnect
                start = time.perf_counter()

                # Mock reconnection process
                await session.reconnect()
                session.connected = True

                # Send test message to verify connection
                test_message = TransportFixtures.create_transport_message()
                await session.send(test_message)

                reconnect_duration = time.perf_counter() - start
                reconnection_times.append(reconnect_duration)

            avg_time = statistics.mean(reconnection_times)
            # n=20 yields 19 cut points; index 18 is the 95th percentile.
            p95_time = statistics.quantiles(reconnection_times, n=20)[18]

            results[scenario_name] = {
                "avg_reconnect_time_ms": avg_time * 1000,
                "p95_reconnect_time_ms": p95_time * 1000,
                "reconnects_per_second": 1 / avg_time
            }

        # Reconnection should be fast
        assert results["clean_disconnect"]["avg_reconnect_time_ms"] < 50
        assert results["network_failure"]["avg_reconnect_time_ms"] < 100

        return results


class BenchmarkTransportBuffering:
    """Benchmark transport buffering and flow control."""

    @pytest.mark.benchmark
    @pytest.mark.asyncio
    async def test_buffer_performance(self, benchmark):
        """Benchmark message buffering under load.

        A producer and a consumer coroutine share an ``asyncio.Queue``
        bounded at 1000 entries. The producer gives up on a message when
        ``put`` does not complete within 100 ms (counted as a drop); the
        consumer stops once ``get`` has waited 100 ms without a message.

        Returns:
            Dict keyed by scenario name with consumed count, throughput and
            drop rate in percent.
        """
        protocol = TransportProtocol()

        # Test different buffer scenarios
        scenarios = [
            ("normal_load", 1000, 0.001),   # 1000 msg, 1ms processing
            ("high_load", 5000, 0.0001),    # 5000 msg, 0.1ms processing
            ("burst_load", 10000, 0),       # 10000 msg, instant
            ("slow_consumer", 1000, 0.01),  # 1000 msg, 10ms processing
        ]

        results = {}

        for scenario_name, message_count, processing_delay in scenarios:
            # Create buffer
            buffer = asyncio.Queue(maxsize=1000)

            # Producer task
            async def producer():
                for i in range(message_count):
                    message = TransportFixtures.create_transport_message(
                        sequence=i,
                        size=random.randint(100, 1000)
                    )

                    try:
                        await asyncio.wait_for(
                            buffer.put(message),
                            timeout=0.1
                        )
                    except asyncio.TimeoutError:
                        # Buffer full: the message is deliberately dropped
                        # and shows up in "drop_rate" below.
                        pass

            # Consumer task
            consumed = []

            async def consumer():
                while True:
                    try:
                        message = await asyncio.wait_for(
                            buffer.get(),
                            timeout=0.1
                        )
                    except asyncio.TimeoutError:
                        # No more messages
                        break

                    consumed.append(message)

                    if processing_delay > 0:
                        await asyncio.sleep(processing_delay)

            # Run producer and consumer
            start = time.perf_counter()

            await asyncio.gather(
                producer(),
                consumer()
            )

            # NOTE: duration includes the consumer's final 100 ms idle
            # timeout before it exits.
            duration = time.perf_counter() - start

            results[scenario_name] = {
                "messages_sent": message_count,
                "messages_consumed": len(consumed),
                "duration": duration,
                "throughput": len(consumed) / duration,
                "drop_rate": (message_count - len(consumed)) / message_count * 100
            }

        # Buffering should handle load well
        assert results["normal_load"]["drop_rate"] < 1
        assert results["high_load"]["throughput"] > 1000

        return results

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krzemienski/shannon-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.