MCP Git Server

by MementoRC
test_performance.py (15.5 kB)
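These benchmarks are ordinary pytest tests selected by marker; assuming a checkout of the repository with its test fixtures and pytest-asyncio installed, a command along the lines of pytest -m benchmark tests/test_performance.py should run the suite below.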
"""Performance benchmark tests for MCP Git Server.""" import asyncio import logging import random import time import uuid import pytest from mcp_server_git.models.enhanced_validation import process_notification_safely from mcp_server_git.optimizations import ( CPUProfiler, MemoryLeakDetector, PerformanceRegressionMonitor, clear_validation_cache, disable_validation_cache, enable_validation_cache, get_validation_cache_stats, ) logger = logging.getLogger(__name__) # Common test parameters THROUGHPUT_TEST_DURATION = 5 # seconds MEMORY_TEST_OPERATIONS = 10000 CPU_PROFILE_OPERATIONS = 5000 VALIDATION_CACHE_TEST_MESSAGES = 10000 # Global regression monitor for all tests regression_monitor = PerformanceRegressionMonitor() @pytest.fixture(autouse=True) def setup_and_teardown_cache(): """Fixture to ensure cache is cleared and enabled/disabled as needed for each test.""" clear_validation_cache() enable_validation_cache() # Default to enabled for most tests yield clear_validation_cache() enable_validation_cache() # Reset to enabled after test @pytest.mark.benchmark @pytest.mark.asyncio async def test_message_throughput_ping(benchmark_session_manager, mock_client): """Benchmark: Measure ping message throughput.""" await mock_client.connect() await benchmark_session_manager.create_session( mock_client.session_id, user="benchmark_user" ) messages_sent = 0 start_time = time.time() end_time = start_time + THROUGHPUT_TEST_DURATION with CPUProfiler("ping_throughput"): while time.time() < end_time: await mock_client.ping() messages_sent += 1 await asyncio.sleep(0) # Yield control duration = time.time() - start_time throughput = messages_sent / duration logger.info( f"Ping Throughput: {throughput:.2f} messages/sec ({messages_sent} messages in {duration:.2f}s)" ) regression_monitor.check("ping_throughput", 1 / throughput, threshold=1.2) assert throughput > 100, "Ping throughput is too low" @pytest.mark.benchmark @pytest.mark.asyncio async def test_message_throughput_operations(benchmark_session_manager, mock_client): """Benchmark: Measure start/cancel operation throughput.""" await mock_client.connect() await benchmark_session_manager.create_session( mock_client.session_id, user="benchmark_user" ) operations_completed = 0 start_time = time.time() end_time = start_time + THROUGHPUT_TEST_DURATION with CPUProfiler("operation_throughput"): while time.time() < end_time: op_id = str(uuid.uuid4()) await mock_client.start_operation(op_id) await mock_client.cancel_operation(op_id) operations_completed += 1 await asyncio.sleep(0) # Yield control duration = time.time() - start_time throughput = operations_completed / duration logger.info( f"Operation Throughput: {throughput:.2f} operations/sec ({operations_completed} operations in {duration:.2f}s)" ) regression_monitor.check("operation_throughput", 1 / throughput, threshold=1.2) assert throughput > 50, "Operation throughput is too low" @pytest.mark.benchmark @pytest.mark.asyncio async def test_validation_caching_effectiveness(memory_monitor): """Benchmark: Test validation caching performance improvement and regression.""" # Test data: mix of valid and repeated messages test_messages = [] for i in range(VALIDATION_CACHE_TEST_MESSAGES): if i % 10 == 0: # Repeated message that should benefit from caching test_messages.append( { "jsonrpc": "2.0", "method": "notifications/cancelled", "params": { "requestId": "repeated_request", "reason": "Test cancellation", }, } ) else: # Unique message test_messages.append( { "jsonrpc": "2.0", "method": "notifications/cancelled", "params": { 
"requestId": f"request_{i}", "reason": f"Cancellation {i}", }, } ) # Test without cache disable_validation_cache() clear_validation_cache() memory_monitor.take_sample("no_cache_start") start_time = time.time() for msg in test_messages: result = process_notification_safely(msg) assert result.is_valid, "Validation should succeed" no_cache_duration = time.time() - start_time memory_monitor.take_sample("no_cache_end") # Test with cache enable_validation_cache() clear_validation_cache() memory_monitor.take_sample("with_cache_start") start_time = time.time() for msg in test_messages: result = process_notification_safely(msg) assert result.is_valid, "Validation should succeed" with_cache_duration = time.time() - start_time memory_monitor.take_sample("with_cache_end") cache_stats = get_validation_cache_stats() logger.info(f"No Cache Duration: {no_cache_duration:.4f}s") logger.info(f"With Cache Duration: {with_cache_duration:.4f}s") logger.info(f"Cache Stats: {cache_stats}") # Cache should provide some improvement for repeated messages performance_ratio = ( no_cache_duration / with_cache_duration if with_cache_duration > 0 else 1.0 ) logger.info(f"Performance Improvement: {performance_ratio:.2f}x") # Performance regression detection regression_monitor.check( "validation_cache_no_cache", no_cache_duration, threshold=1.5 ) regression_monitor.check( "validation_cache_with_cache", with_cache_duration, threshold=1.5 ) # Assertions - cache should show activity and not break functionality assert cache_stats.get("hits", 0) + cache_stats.get("misses", 0) > 0, ( "Cache should show some activity" ) # Allow cache overhead for small operations, focus on correctness assert performance_ratio >= 0.1, "Cache should not make things 10x slower" assert with_cache_duration < 1.0, ( "Cached operations should complete in reasonable time" ) # Memory leak detection memory_growth = memory_monitor.get_memory_growth() logger.info(f"Memory growth during validation caching: {memory_growth:.2f} MB") assert memory_growth < 5, "Memory growth during validation caching is too high" @pytest.mark.benchmark @pytest.mark.ci_skip # Too intensive for CI @pytest.mark.asyncio async def test_realistic_mixed_workload_performance( benchmark_session_manager, memory_monitor ): """Benchmark: Test realistic mixed workload performance and memory leaks.""" client_count = 5 test_duration_seconds = 10 # Define MockMCPClient inline for this test to avoid import issues class MockMCPClient: def __init__(self, client_id): self.client_id = client_id self.connected = False self.session_id = None self.message_count = 0 async def connect(self): self.connected = True self.session_id = str(uuid.uuid4()) async def disconnect(self): self.connected = False async def ping(self): if not self.connected: raise RuntimeError("Client not connected") self.message_count += 1 return {"type": "pong", "id": str(uuid.uuid4())} async def start_operation(self, operation_id): if not self.connected: raise RuntimeError("Client not connected") self.message_count += 1 return { "type": "operation_started", "id": str(uuid.uuid4()), "operation_id": operation_id, } async def cancel_operation(self, operation_id): if not self.connected: raise RuntimeError("Client not connected") self.message_count += 1 return { "type": "operation_cancelled", "id": str(uuid.uuid4()), "operation_id": operation_id, } async def send_batch_messages(self, count): results = [] for _ in range(count): result = await self.ping() results.append(result) return results # Create multiple clients clients = 
[MockMCPClient(f"perf_client_{i}") for i in range(client_count)] sessions = [] for i, client in enumerate(clients): await client.connect() session = await benchmark_session_manager.create_session( client.session_id, user=f"perf_user_{i}" ) sessions.append(session) total_messages_sent = 0 total_errors = 0 memory_monitor.take_sample("mixed_workload_start") leak_detector = MemoryLeakDetector() leak_detector.take_snapshot("start") async def client_workload(client_idx: int): nonlocal total_messages_sent, total_errors client = clients[client_idx] session = sessions[client_idx] client_messages = 0 client_errors = 0 start_client_time = time.time() while time.time() - start_client_time < test_duration_seconds: try: activity_type = random.randint(1, 10) if activity_type <= 5: # 50% pings await client.ping() elif activity_type <= 7: # 20% start/cancel operations op_id = str(uuid.uuid4()) await client.start_operation(op_id) await client.cancel_operation(op_id) elif activity_type <= 9: # 20% batch messages await client.send_batch_messages(random.randint(1, 3)) else: # 10% session state change / heartbeat await session.handle_heartbeat() if random.random() < 0.1: # Occasionally pause/resume await session.pause() await session.resume() client_messages += 1 await asyncio.sleep(0.001) # Small delay to simulate real-world gaps except Exception as e: client_errors += 1 logger.debug(f"Mixed workload client {client_idx} error: {e}") total_messages_sent += client_messages total_errors += client_errors start_time = time.time() tasks = [client_workload(i) for i in range(client_count)] with CPUProfiler("mixed_workload"): await asyncio.gather(*tasks, return_exceptions=True) duration = time.time() - start_time memory_monitor.take_sample("mixed_workload_end") leak_detector.take_snapshot("end") memory_growth = memory_monitor.get_memory_growth() leak_report = leak_detector.report_growth() leak_detector.stop() throughput = total_messages_sent / duration error_rate = total_errors / total_messages_sent if total_messages_sent > 0 else 0 logger.info( f"Mixed Workload Performance: {throughput:.2f} msg/sec, {error_rate:.2%} errors" ) logger.info(f"Memory Growth: {memory_growth:.2f} MB") logger.info(f"Leak Detector: {leak_report}") memory_monitor.log_samples() regression_monitor.check("mixed_workload_throughput", 1 / throughput, threshold=1.2) regression_monitor.check( "mixed_workload_memory_growth", memory_growth, threshold=2.0 ) regression_monitor.check( "mixed_workload_object_growth", leak_report["object_growth"], threshold=2.0 ) assert throughput > 100, "Mixed workload throughput is too low" assert error_rate < 0.01, "Mixed workload error rate is too high" assert memory_growth < 10, "Mixed workload memory growth is too high" assert leak_report["object_growth"] < 10000, ( "Object growth is too high (possible leak)" ) # Cleanup for client in clients: if client.connected: await client.disconnect() @pytest.mark.benchmark @pytest.mark.asyncio async def test_optimized_message_processing_performance(memory_monitor): """Benchmark: Test optimized message processing under load with CPU profiling and memory leak detection.""" from mcp_server_git.optimizations import optimize_message_validation test_messages = [] for i in range(5000): if i % 5 == 0: test_messages.append( { "jsonrpc": "2.0", "method": "notifications/cancelled", "params": { "requestId": "repeat_optimized", "reason": "Optimized test", }, } ) else: test_messages.append( { "jsonrpc": "2.0", "method": "notifications/cancelled", "params": { "requestId": f"opt_{i}", "reason": 
f"Optimized {i}", }, } ) memory_monitor.take_sample("optimized_start") leak_detector = MemoryLeakDetector() leak_detector.take_snapshot("start") with CPUProfiler("optimized_message_processing"): start_time = time.time() for msg in test_messages: result = optimize_message_validation(msg) assert result.is_valid, "Optimized validation should succeed" duration = time.time() - start_time memory_monitor.take_sample("optimized_end") leak_detector.take_snapshot("end") leak_report = leak_detector.report_growth() leak_detector.stop() logger.info(f"Optimized message processing duration: {duration:.4f}s") logger.info(f"Leak Detector: {leak_report}") regression_monitor.check( "optimized_message_processing_duration", duration, threshold=1.5 ) assert duration < 1.0, "Optimized message processing is too slow" assert leak_report["memory_growth_mb"] < 5, ( "Memory growth is too high in optimized processing" ) assert leak_report["object_growth"] < 5000, ( "Object growth is too high in optimized processing" ) @pytest.mark.benchmark @pytest.mark.asyncio async def test_performance_monitor_production_stats(): """Test the production PerformanceMonitor for message processing.""" from mcp_server_git.optimizations import message_perf_monitor # Simulate recording timings for _ in range(100): message_perf_monitor.record(0.001 + random.random() * 0.002) stats = message_perf_monitor.get_stats() logger.info(f"Production PerformanceMonitor stats: {stats}") assert stats["count"] == 0 or stats["avg"] < 0.01, ( "Average message processing time should be low" ) @pytest.mark.benchmark def test_performance_regression_summary(): """Report all detected regressions at the end of the suite.""" regressions = regression_monitor.get_regressions() if regressions: logger.warning(f"Performance regressions detected: {regressions}") else: logger.info("No performance regressions detected.") assert not regressions, f"Performance regressions detected: {regressions}" # CI formatting resolution trigger
