Skip to main content
Glama

MCP Git Server

by MementoRC
test_heartbeat.py3.62 kB
import asyncio import pytest from mcp_server_git.session import HeartbeatManager, SessionManager @pytest.mark.asyncio class TestHeartbeatManager: @pytest.fixture async def session_manager(self): mgr = SessionManager(idle_timeout=100, heartbeat_timeout=10) yield mgr await mgr.shutdown() @pytest.fixture async def heartbeat_manager(self, session_manager): hb = HeartbeatManager( session_manager, heartbeat_interval=0.05, missed_threshold=2 ) await hb.start() yield hb await hb.stop() @pytest.mark.asyncio async def test_heartbeat_record_and_metrics( self, session_manager, heartbeat_manager ): session = await session_manager.create_session("hb-test") await heartbeat_manager.record_heartbeat("hb-test") last = heartbeat_manager.get_last_heartbeat("hb-test") assert last is not None await session.handle_heartbeat() metrics = session.get_metrics() assert metrics["heartbeat_count"] == 1 @pytest.mark.asyncio async def test_missed_heartbeat_triggers_cleanup(self, session_manager): hb = HeartbeatManager( session_manager, heartbeat_interval=0.05, missed_threshold=1 ) await hb.start() await session_manager.create_session("missed-hb") await hb.record_heartbeat("missed-hb") # Wait for heartbeat loop to detect missed heartbeat await asyncio.sleep(0.2) # Session should be closed and removed s = await session_manager.get_session("missed-hb") assert s is None await hb.stop() @pytest.mark.asyncio async def test_concurrent_heartbeats(self, session_manager, heartbeat_manager): sessions = [] for i in range(5): s = await session_manager.create_session(f"concurrent-{i}") sessions.append(s) await heartbeat_manager.record_heartbeat(f"concurrent-{i}") # Sleep for less than the cleanup threshold (missed_threshold=2 * heartbeat_interval=0.05 = 0.1s) await asyncio.sleep(0.05) for i in range(5): assert heartbeat_manager.get_last_heartbeat(f"concurrent-{i}") is not None @pytest.mark.asyncio async def test_heartbeat_metrics_increment( self, session_manager, heartbeat_manager ): session = await session_manager.create_session("metrics-hb") for _ in range(3): await heartbeat_manager.record_heartbeat("metrics-hb") await session.handle_heartbeat() metrics = session.get_metrics() assert metrics["heartbeat_count"] == 3 @pytest.mark.asyncio async def test_heartbeat_manager_stop(self, session_manager): hb = HeartbeatManager( session_manager, heartbeat_interval=0.05, missed_threshold=1 ) await hb.start() await hb.stop() assert not hb._running @pytest.mark.asyncio async def test_heartbeat_cleanup_on_shutdown(self, session_manager): hb = HeartbeatManager( session_manager, heartbeat_interval=0.05, missed_threshold=1 ) session_manager.heartbeat_manager = hb # Associate the heartbeat manager await hb.start() await session_manager.create_session("shutdown-hb") await hb.record_heartbeat("shutdown-hb") await session_manager.shutdown() # After shutdown, session should be closed and heartbeat manager stopped s = await session_manager.get_session("shutdown-hb") assert s is None assert not hb._running

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MementoRC/mcp-git'

If you have feedback or need assistance with the MCP directory API, please join our Discord server