Skip to main content
Glama

MCP Git Server

by MementoRC
test_memory_leaks.py (19.3 kB)
"""Memory leak detection tests for MCP Git Server.""" import asyncio import gc import logging import random import time import uuid import pytest from mcp_server_git.session import SessionState logger = logging.getLogger(__name__) @pytest.mark.stress @pytest.mark.ci_skip # Too slow for CI @pytest.mark.asyncio async def test_memory_leak_detection_extended_operations( stress_session_manager, mock_client, memory_monitor, stress_test_config ): """ Test for memory leaks during extended operation cycles. Performs a large number of operations while monitoring memory usage to detect potential leaks in session management, message processing, or resource cleanup. """ config = stress_test_config["memory"] # More reasonable operation count for CI import os is_ci = ( os.getenv("CI", "false").lower() == "true" or os.getenv("GITHUB_ACTIONS", "false").lower() == "true" or os.getenv("PYTEST_CI", "false").lower() == "true" ) operation_count = 1000 if is_ci else 50000 # Reduced for CI sample_interval = config["sample_interval"] max_growth_mb = config["max_growth_mb"] max_slope = config["max_slope"] # Get baseline memory initial_memory = memory_monitor.take_sample("baseline") logger.info(f"Starting memory leak test with {operation_count} operations") logger.info(f"Initial memory: {initial_memory:.2f} MB") await mock_client.connect() session = await stress_session_manager.create_session( mock_client.session_id, user="memory_test_user" ) try: for i in range(operation_count): # Mix of different operations to stress different code paths operation_type = i % 6 if operation_type == 0: # Standard ping await mock_client.ping() await session.handle_heartbeat() elif operation_type == 1: # Start and immediately cancel operation op_id = str(uuid.uuid4()) await mock_client.start_operation(op_id) await mock_client.cancel_operation(op_id) elif operation_type == 2: # Batch operations await mock_client.send_batch_messages(5) elif operation_type == 3: # Session state changes if session.state == 
SessionState.ACTIVE: await session.pause() await session.resume() elif operation_type == 4: # Error scenarios (should not leak memory) try: await mock_client.send_invalid_message() except Exception: pass # Expected else: # Brief idle with cleanup opportunity gc.collect() # Give garbage collector a chance await asyncio.sleep(0.001) # Take memory samples at intervals if i % sample_interval == 0 and i > 0: current_memory = memory_monitor.take_sample(f"operation_{i}") if i % (sample_interval * 10) == 0: memory_growth = memory_monitor.get_memory_growth() logger.info(f"Progress: {i}/{operation_count} operations") logger.info(f"Current memory: {current_memory:.2f} MB") logger.info(f"Memory growth: {memory_growth:.2f} MB") # Early warning if memory growth is excessive if memory_growth > max_growth_mb * 2: logger.warning( f"High memory growth detected: {memory_growth:.2f} MB" ) finally: # Cleanup if mock_client.connected: await mock_client.disconnect() if session and not session.is_closed: await stress_session_manager.close_session(session.session_id) # Final cleanup and measurement gc.collect() await asyncio.sleep(0.5) # Allow async cleanup gc.collect() final_memory = memory_monitor.take_sample("final_after_cleanup") # Analysis memory_growth = memory_monitor.get_memory_growth() memory_slope = memory_monitor.get_memory_slope() logger.info("Memory leak test completed:") logger.info(f"Operations completed: {operation_count}") logger.info(f"Initial memory: {initial_memory:.2f} MB") logger.info(f"Final memory: {final_memory:.2f} MB") logger.info(f"Total growth: {memory_growth:.2f} MB") logger.info(f"Growth slope: {memory_slope:.6f} MB/sample") # Log memory samples for analysis memory_monitor.log_samples() # Assertions assert memory_growth < max_growth_mb, ( f"Memory growth exceeds limit: {memory_growth:.2f} MB > {max_growth_mb} MB" ) assert abs(memory_slope) < max_slope, ( f"Memory leak detected: slope={memory_slope:.6f} > {max_slope}" ) logger.info("✅ No memory leaks 
detected") @pytest.mark.stress @pytest.mark.ci_skip # Too slow for CI @pytest.mark.asyncio async def test_session_creation_destruction_memory( stress_session_manager, memory_monitor, stress_test_config ): """Test memory usage during rapid session creation and destruction.""" session_cycles = 5000 sessions_per_cycle = 20 memory_monitor.take_sample("session_test_start") logger.info(f"Testing {session_cycles} session creation/destruction cycles") logger.info(f"Sessions per cycle: {sessions_per_cycle}") for cycle in range(session_cycles): # Create multiple sessions sessions = [] for i in range(sessions_per_cycle): session_id = f"session_{cycle}_{i}" session = await stress_session_manager.create_session( session_id, user=f"user_{cycle}_{i}" ) sessions.append(session) # Brief activity await session.handle_heartbeat() # Destroy all sessions for session in sessions: await stress_session_manager.close_session(session.session_id) # Clear references sessions.clear() # Periodic memory monitoring if cycle % 100 == 0: gc.collect() current_memory = memory_monitor.take_sample(f"cycle_{cycle}") if cycle % 500 == 0: memory_growth = memory_monitor.get_memory_growth() logger.info( f"Cycle {cycle}/{session_cycles}: {current_memory:.2f} MB " f"(growth: {memory_growth:.2f} MB)" ) # Final cleanup and measurement gc.collect() await asyncio.sleep(0.5) gc.collect() memory_monitor.take_sample("session_test_end") memory_growth = memory_monitor.get_memory_growth() memory_slope = memory_monitor.get_memory_slope() # Check final session manager state final_sessions = await stress_session_manager.get_all_sessions() logger.info("Session memory test completed:") logger.info(f"Total session cycles: {session_cycles}") logger.info(f"Total sessions created: {session_cycles * sessions_per_cycle}") logger.info(f"Memory growth: {memory_growth:.2f} MB") logger.info(f"Memory slope: {memory_slope:.6f} MB/sample") logger.info(f"Remaining sessions: {len(final_sessions)}") # Assertions - more lenient for CI 
import os is_ci = ( os.getenv("CI", "false").lower() == "true" or os.getenv("GITHUB_ACTIONS", "false").lower() == "true" or os.getenv("PYTEST_CI", "false").lower() == "true" ) max_memory_growth = 50 if is_ci else 30 max_slope = 0.5 if is_ci else 0.1 assert memory_growth < max_memory_growth, ( f"Session memory growth too high: {memory_growth:.2f} MB" ) assert abs(memory_slope) < max_slope, ( f"Session memory leak: slope={memory_slope:.6f}" ) assert len(final_sessions) == 0, ( f"Sessions not properly cleaned up: {len(final_sessions)}" ) logger.info("✅ Session memory management verified") @pytest.mark.stress @pytest.mark.asyncio async def test_resource_cleanup_after_errors( stress_session_manager, mock_client, memory_monitor ): """Test that resources are properly cleaned up after various error scenarios.""" error_cycles = 1000 memory_monitor.take_sample("error_cleanup_start") logger.info(f"Testing resource cleanup through {error_cycles} error scenarios") await mock_client.connect() for cycle in range(error_cycles): # Create session session_id = f"error_session_{cycle}" session = await stress_session_manager.create_session( session_id, user=f"error_user_{cycle}" ) # Generate various error scenarios error_type = cycle % 5 try: if error_type == 0: # Invalid message await mock_client.send_invalid_message() elif error_type == 1: # Malformed operation await mock_client.send_raw_message({"invalid": "message"}) elif error_type == 2: # Force session error state # Simulate by directly changing session state session.state = SessionState.ERROR await session.handle_heartbeat() elif error_type == 3: # Client disconnect during operation op_id = str(uuid.uuid4()) await mock_client.start_operation(op_id) await mock_client.disconnect() await mock_client.connect() else: # Timeout simulation await session.pause() await asyncio.sleep(0.01) await session.resume() except Exception: # Errors are expected pass # Clean up session try: await 
stress_session_manager.close_session(session.session_id) except Exception: pass # Periodic memory check if cycle % 100 == 0: gc.collect() current_memory = memory_monitor.take_sample(f"error_cycle_{cycle}") if cycle % 200 == 0: memory_growth = memory_monitor.get_memory_growth() logger.info( f"Error cycle {cycle}/{error_cycles}: {current_memory:.2f} MB " f"(growth: {memory_growth:.2f} MB)" ) # Final cleanup if mock_client.connected: await mock_client.disconnect() gc.collect() await asyncio.sleep(0.5) gc.collect() memory_monitor.take_sample("error_cleanup_end") memory_growth = memory_monitor.get_memory_growth() # Check system state final_sessions = await stress_session_manager.get_all_sessions() logger.info("Error cleanup test completed:") logger.info(f"Error cycles completed: {error_cycles}") logger.info(f"Memory growth: {memory_growth:.2f} MB") logger.info(f"Remaining sessions: {len(final_sessions)}") # Assertions - errors should not cause memory leaks assert memory_growth < 20, ( f"Error scenarios caused memory leak: {memory_growth:.2f} MB" ) assert len(final_sessions) <= 1, ( f"Error scenarios left sessions: {len(final_sessions)}" ) logger.info("✅ Resource cleanup after errors verified") @pytest.mark.stress @pytest.mark.ci_skip # Too slow for CI @pytest.mark.asyncio async def test_garbage_collection_effectiveness( stress_session_manager, mock_client, memory_monitor ): """Test that garbage collection effectively reclaims memory.""" # Allocate significant resources initial_memory = memory_monitor.take_sample("gc_test_start") logger.info("Testing garbage collection effectiveness") await mock_client.connect() # Create many objects that should be garbage collected large_objects = [] sessions = [] for i in range(5000): # Create session with data session = await stress_session_manager.create_session( f"gc_session_{i}", user=f"gc_user_{i}" ) sessions.append(session) # Create some large temporary objects large_data = { "data": "x" * 10000, # 10KB per object "id": i, 
"metadata": {"created": time.time(), "purpose": "gc_test"}, "extra_data": ["x" * 1000 for _ in range(10)], # Additional 10KB } large_objects.append(large_data) # Some activity await session.handle_heartbeat() await mock_client.ping() # Memory should have grown peak_memory = memory_monitor.take_sample("gc_test_peak") # Clear references to enable garbage collection for session in sessions: await session.close() sessions.clear() large_objects.clear() if mock_client.connected: await mock_client.disconnect() # Force garbage collection for _ in range(3): gc.collect() await asyncio.sleep(0.1) # Memory should have decreased significantly post_gc_memory = memory_monitor.take_sample("gc_test_post_gc") initial_to_peak = peak_memory - initial_memory peak_to_post_gc = peak_memory - post_gc_memory gc_efficiency = peak_to_post_gc / initial_to_peak if initial_to_peak > 0 else 0 logger.info("Garbage collection test completed:") logger.info(f"Initial memory: {initial_memory:.2f} MB") logger.info(f"Peak memory: {peak_memory:.2f} MB") logger.info(f"Post-GC memory: {post_gc_memory:.2f} MB") logger.info(f"Memory growth: {initial_to_peak:.2f} MB") logger.info(f"Memory reclaimed: {peak_to_post_gc:.2f} MB") logger.info(f"GC efficiency: {gc_efficiency:.2%}") # Assertions - more lenient for CI import os is_ci = ( os.getenv("CI", "false").lower() == "true" or os.getenv("GITHUB_ACTIONS", "false").lower() == "true" or os.getenv("PYTEST_CI", "false").lower() == "true" ) min_allocation = 2 if is_ci else 5 min_gc_efficiency = 0.3 if is_ci else 0.7 assert initial_to_peak > min_allocation, ( "Test did not allocate enough memory to be meaningful" ) assert gc_efficiency > min_gc_efficiency, ( f"Garbage collection not effective: {gc_efficiency:.2%}" ) assert post_gc_memory < peak_memory, "Garbage collection did not reclaim any memory" logger.info("✅ Garbage collection effectiveness verified") @pytest.mark.stress @pytest.mark.ci_skip # Too slow for CI @pytest.mark.asyncio async def 
test_long_term_memory_stability( stress_session_manager, multiple_mock_clients, memory_monitor, stress_test_config ): """Test memory stability over extended periods with realistic usage patterns.""" config = stress_test_config["memory"] # More reasonable duration for CI import os is_ci = ( os.getenv("CI", "false").lower() == "true" or os.getenv("GITHUB_ACTIONS", "false").lower() == "true" or os.getenv("PYTEST_CI", "false").lower() == "true" ) duration_minutes = ( 1 if is_ci else min(config.get("stability_duration_minutes", 20), 20) ) memory_monitor.take_sample("stability_start") logger.info(f"Testing memory stability for {duration_minutes} minutes") # Connect multiple clients for client in multiple_mock_clients[:5]: # Use 5 clients await client.connect() # Create sessions for each client sessions = [] for i, client in enumerate(multiple_mock_clients[:5]): session = await stress_session_manager.create_session( client.session_id, user=f"stability_user_{i}" ) sessions.append(session) start_time = time.time() end_time = start_time + (duration_minutes * 60) try: iteration = 0 while time.time() < end_time: iteration += 1 # Realistic usage pattern client_idx = iteration % len(multiple_mock_clients[:5]) client = multiple_mock_clients[client_idx] session = sessions[client_idx] activity_type = iteration % 10 if activity_type < 6: # Normal operations (60%) await client.ping() await session.handle_heartbeat() elif activity_type < 8: # Batch processing (20%) await client.send_batch_messages(random.randint(3, 8)) elif activity_type < 9: # Operation lifecycle (10%) op_id = str(uuid.uuid4()) await client.start_operation(op_id) if random.random() < 0.5: await asyncio.sleep(random.uniform(0.01, 0.1)) await client.cancel_operation(op_id) else: # Client reconnection (10%) await client.disconnect() await asyncio.sleep(random.uniform(0.1, 0.5)) await client.connect() # Update session await stress_session_manager.close_session(session.session_id) session = await 
stress_session_manager.create_session( client.session_id, user=f"stability_user_{client_idx}" ) sessions[client_idx] = session # Memory monitoring if iteration % 1000 == 0: current_memory = memory_monitor.take_sample(f"stability_{iteration}") elapsed_minutes = (time.time() - start_time) / 60 logger.info( f"Stability test: {elapsed_minutes:.1f}/{duration_minutes} minutes" ) logger.info(f"Memory: {current_memory:.2f} MB, Iteration: {iteration}") # Check for gradual leaks memory_growth = memory_monitor.get_memory_growth() if memory_growth > 100: # More than 100MB growth logger.warning(f"High memory growth: {memory_growth:.2f} MB") await asyncio.sleep(0.001) # Small delay finally: # Cleanup all clients and sessions for client in multiple_mock_clients[:5]: if client.connected: await client.disconnect() for session in sessions: if session and not session.is_closed: await stress_session_manager.close_session(session.session_id) # Final memory measurement gc.collect() memory_monitor.take_sample("stability_end") memory_growth = memory_monitor.get_memory_growth() memory_slope = memory_monitor.get_memory_slope() actual_duration = (time.time() - start_time) / 60 logger.info("Long-term stability test completed:") logger.info(f"Duration: {actual_duration:.2f} minutes") logger.info(f"Iterations: {iteration}") logger.info(f"Memory growth: {memory_growth:.2f} MB") logger.info(f"Memory slope: {memory_slope:.6f} MB/sample") # Stability assertions assert memory_growth < 75, ( f"Long-term memory growth too high: {memory_growth:.2f} MB" ) assert abs(memory_slope) < 0.2, ( f"Memory instability detected: slope={memory_slope:.6f}" ) logger.info("✅ Long-term memory stability verified")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.