benchmark_streaming.py (12.9 kB)
""" Performance benchmarks for JSONL streaming components. """ import pytest import asyncio import time import json from typing import List, AsyncIterator import statistics from shannon_mcp.streaming.reader import JSONLStreamReader from shannon_mcp.streaming.parser import JSONLParser from shannon_mcp.streaming.processor import StreamProcessor from tests.fixtures.streaming_fixtures import StreamingFixtures from tests.utils.performance import PerformanceTimer, PerformanceMonitor, async_benchmark class BenchmarkStreamingPerformance: """Benchmark JSONL streaming performance.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_stream_reader_throughput(self, benchmark, temp_dir): """Benchmark stream reader throughput.""" # Create test data message_counts = [100, 1000, 10000] results = {} for count in message_counts: messages = StreamingFixtures.create_session_stream("bench-session", count) async def stream_generator() -> AsyncIterator[bytes]: for msg in messages: yield msg.encode() # Benchmark reading async def read_stream(): reader = JSONLStreamReader() messages_read = 0 async for _ in reader.read_stream(stream_generator()): messages_read += 1 return messages_read # Run benchmark start_time = time.perf_counter() messages_processed = await read_stream() duration = time.perf_counter() - start_time throughput = messages_processed / duration results[count] = { "messages": messages_processed, "duration_seconds": duration, "messages_per_second": throughput } # Assert performance targets assert results[100]["messages_per_second"] > 10000 # >10k msg/s for small batches assert results[1000]["messages_per_second"] > 5000 # >5k msg/s for medium batches assert results[10000]["messages_per_second"] > 1000 # >1k msg/s for large batches return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_parser_performance(self, benchmark): """Benchmark JSONL parser performance.""" parser = JSONLParser() monitor = PerformanceMonitor() # Test different message sizes sizes = [100, 1000, 10000] # bytes for size in sizes: # Create message large_data = "x" * size message = json.dumps({"type": "test", "data": large_data}) + '\n' # Benchmark parsing with PerformanceTimer(f"parse_{size}b") as timer: iterations = 10000 for _ in range(iterations): parser.add_data(message) list(parser.parse()) monitor.add_measurement(timer.metrics) # Check performance summary = monitor.get_summary() assert summary["duration"]["avg"] < 0.1 # Average under 100ms @pytest.mark.benchmark @pytest.mark.asyncio async def test_backpressure_handling(self, benchmark): """Benchmark backpressure handling performance.""" # Create high-volume stream messages = StreamingFixtures.create_backpressure_stream( message_count=5000, message_size=1024 ) async def fast_producer() -> AsyncIterator[bytes]: """Produce messages as fast as possible.""" for msg in messages: yield msg.encode() # Test with slow consumer reader = JSONLStreamReader(buffer_size=100) start_time = time.perf_counter() processed = 0 async for message in reader.read_stream(fast_producer()): processed += 1 # Simulate slow processing await asyncio.sleep(0.0001) duration = time.perf_counter() - start_time # Should handle backpressure without memory explosion assert processed == len(messages) assert duration < 10.0 # Should complete within 10 seconds @pytest.mark.benchmark @pytest.mark.asyncio async def test_concurrent_streams(self, benchmark): """Benchmark concurrent stream processing.""" # Create multiple streams stream_count = 10 messages_per_stream = 100 streams = [] for i in 
range(stream_count): messages = StreamingFixtures.create_session_stream( f"concurrent-{i}", messages_per_stream ) streams.append(messages) async def process_stream(messages: List[str]) -> int: """Process a single stream.""" reader = JSONLStreamReader() count = 0 async def generator(): for msg in messages: yield msg.encode() async for _ in reader.read_stream(generator()): count += 1 return count # Process all streams concurrently start_time = time.perf_counter() results = await asyncio.gather(*[ process_stream(stream) for stream in streams ]) duration = time.perf_counter() - start_time total_messages = sum(results) throughput = total_messages / duration assert total_messages == stream_count * messages_per_stream assert throughput > 1000 # >1k messages/second total @pytest.mark.benchmark @pytest.mark.asyncio async def test_large_message_performance(self, benchmark): """Benchmark large message handling.""" sizes_kb = [10, 100, 1000] # 10KB, 100KB, 1MB results = {} for size_kb in sizes_kb: message = StreamingFixtures.create_large_message(size_kb) reader = JSONLStreamReader() parser = JSONLParser() # Benchmark processing start_time = time.perf_counter() async def generator(): yield message.encode() processed = False async for raw_msg in reader.read_stream(generator()): # Parse the message parser.add_data(raw_msg) for parsed in parser.parse(): processed = True break duration = time.perf_counter() - start_time results[f"{size_kb}KB"] = { "duration_seconds": duration, "throughput_MB_per_sec": (size_kb / 1024) / duration } assert processed # Large messages should still process efficiently assert results["10KB"]["duration_seconds"] < 0.01 assert results["100KB"]["duration_seconds"] < 0.1 assert results["1000KB"]["duration_seconds"] < 1.0 return results class BenchmarkStreamProcessing: """Benchmark end-to-end stream processing.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_full_pipeline_performance(self, benchmark, temp_dir): """Benchmark full streaming pipeline.""" # Create processor processor = StreamProcessor( metrics_dir=temp_dir / "metrics", checkpoint_dir=temp_dir / "checkpoints" ) await processor.initialize() # Test different session sizes session_sizes = [50, 500, 5000] results = {} for size in session_sizes: messages = StreamingFixtures.create_session_stream( f"pipeline-{size}", size ) async def mock_stream() -> AsyncIterator[bytes]: for msg in messages: yield msg.encode() # Benchmark full processing start_time = time.perf_counter() processed_count = 0 async for result in processor.process_stream(mock_stream()): processed_count += 1 duration = time.perf_counter() - start_time results[size] = { "messages": processed_count, "duration_seconds": duration, "messages_per_second": processed_count / duration } await processor.close() # Check performance targets assert results[50]["messages_per_second"] > 500 assert results[500]["messages_per_second"] > 200 assert results[5000]["messages_per_second"] > 100 return results @pytest.mark.benchmark @pytest.mark.asyncio async def test_metric_extraction_overhead(self, benchmark, temp_dir): """Benchmark metric extraction overhead.""" processor = StreamProcessor( metrics_dir=temp_dir / "metrics", checkpoint_dir=temp_dir / "checkpoints" ) await processor.initialize() # Create stream with various metric types messages = [] for i in range(1000): msg_type = ["tool_use", "token_usage", "checkpoint_create"][i % 3] msg = { "type": msg_type, "timestamp": datetime.now(timezone.utc).isoformat(), "data": {"index": i} } messages.append(json.dumps(msg) + 
'\n') # Benchmark with metrics enabled async def with_metrics(): async def generator(): for msg in messages: yield msg.encode() count = 0 async for _ in processor.process_stream(generator()): count += 1 return count # Benchmark without metrics (mock) async def without_metrics(): reader = JSONLStreamReader() async def generator(): for msg in messages: yield msg.encode() count = 0 async for _ in reader.read_stream(generator()): count += 1 return count # Compare performance with_metrics_time = await self._time_async(with_metrics) without_metrics_time = await self._time_async(without_metrics) overhead_percent = ((with_metrics_time - without_metrics_time) / without_metrics_time) * 100 await processor.close() # Metric extraction should add minimal overhead assert overhead_percent < 20 # Less than 20% overhead async def _time_async(self, coro): """Time an async coroutine.""" start = time.perf_counter() await coro() return time.perf_counter() - start class BenchmarkMemoryUsage: """Benchmark memory usage during streaming.""" @pytest.mark.benchmark @pytest.mark.asyncio async def test_memory_efficiency(self, benchmark): """Test memory usage remains bounded.""" import psutil import gc process = psutil.Process() # Get baseline memory gc.collect() baseline_memory = process.memory_info().rss / (1024 * 1024) # MB # Process large stream message_count = 100000 messages = StreamingFixtures.create_backpressure_stream( message_count=message_count, message_size=100 ) reader = JSONLStreamReader(buffer_size=1000) async def generator(): for msg in messages: yield msg.encode() # Track peak memory peak_memory = baseline_memory messages_processed = 0 async for _ in reader.read_stream(generator()): messages_processed += 1 if messages_processed % 10000 == 0: current_memory = process.memory_info().rss / (1024 * 1024) peak_memory = max(peak_memory, current_memory) # Final memory gc.collect() final_memory = process.memory_info().rss / (1024 * 1024) memory_increase = peak_memory - baseline_memory memory_leaked = final_memory - baseline_memory # Memory should remain bounded assert memory_increase < 100 # Less than 100MB increase assert memory_leaked < 10 # Less than 10MB leaked after GC
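The benchmarks above rely on two timing helpers imported from tests/utils/performance, whose implementations are not shown on this page. The snippet below is a minimal sketch of what they are assumed to look like, inferred only from how the parser benchmark uses them: PerformanceTimer exposes a metrics dict after the with-block, and PerformanceMonitor aggregates those dicts so get_summary()["duration"]["avg"] is available. The internals here are illustrative, not the project's actual code.

# Hypothetical sketch of the helpers assumed by these benchmarks;
# the real implementations live in tests/utils/performance.
import time
import statistics
from typing import Any, Dict, List


class PerformanceTimer:
    """Context manager that records wall-clock duration for a named block."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.metrics: Dict[str, Any] = {}

    def __enter__(self) -> "PerformanceTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc) -> None:
        # Expose the measurement after the with-block completes.
        self.metrics = {
            "name": self.name,
            "duration": time.perf_counter() - self._start,
        }


class PerformanceMonitor:
    """Aggregates timer metrics so tests can assert on average duration."""

    def __init__(self) -> None:
        self._measurements: List[Dict[str, Any]] = []

    def add_measurement(self, metrics: Dict[str, Any]) -> None:
        self._measurements.append(metrics)

    def get_summary(self) -> Dict[str, Dict[str, float]]:
        durations = [m["duration"] for m in self._measurements]
        return {
            "duration": {
                "avg": statistics.mean(durations),
                "min": min(durations),
                "max": max(durations),
            }
        }

Because every test in this file carries the benchmark marker, the suite can be selected with `pytest -m benchmark` once that marker is registered in the project's pytest configuration.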
