Skip to main content
Glama
test_connection_pool.py8.72 kB
"""Connection pool performance benchmarks.""" import asyncio import statistics import time from typing import Any import pytest from scout_mcp.models import SSHHost from scout_mcp.services.pool import ConnectionPool class MockSSHConnection: """Mock SSH connection for testing.""" def __init__(self, delay: float = 0.001) -> None: """Initialize mock connection.""" self._delay = delay self._closed = False @property def is_closed(self) -> bool: """Check if connection is closed.""" return self._closed def close(self) -> None: """Close connection.""" self._closed = True async def run(self, command: str, check: bool = True) -> Any: """Mock command execution.""" await asyncio.sleep(self._delay) return type("Result", (), {"returncode": 0, "stdout": "", "stderr": ""})() class MockAsyncSSH: """Mock asyncssh module.""" def __init__(self, delay: float = 0.001) -> None: """Initialize mock.""" self._delay = delay self._connection_count = 0 async def connect(self, *args: Any, **kwargs: Any) -> MockSSHConnection: """Mock SSH connection.""" self._connection_count += 1 await asyncio.sleep(self._delay) return MockSSHConnection(self._delay) @pytest.fixture def mock_host() -> SSHHost: """Create test SSH host.""" return SSHHost( name="test-host", hostname="localhost", user="test", port=22, ) @pytest.mark.asyncio async def test_cold_start_latency( mock_host: SSHHost, monkeypatch: pytest.MonkeyPatch, ) -> None: """Benchmark cold start connection time.""" mock_ssh = MockAsyncSSH(delay=0.01) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool() start = time.perf_counter() await pool.get_connection(mock_host) elapsed = time.perf_counter() - start print(f"\n[PERF] Cold start latency: {elapsed * 1000:.2f}ms") assert elapsed < 0.05, "Cold start should complete in <50ms" @pytest.mark.asyncio async def test_warm_connection_latency( mock_host: SSHHost, monkeypatch: pytest.MonkeyPatch, ) -> None: """Benchmark warm connection retrieval time.""" mock_ssh = MockAsyncSSH(delay=0.01) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool() # Prime the pool await pool.get_connection(mock_host) # Measure warm retrieval start = time.perf_counter() await pool.get_connection(mock_host) elapsed = time.perf_counter() - start print(f"\n[PERF] Warm connection latency: {elapsed * 1000:.2f}ms") assert elapsed < 0.001, "Warm retrieval should be <1ms (lock + dict lookup)" @pytest.mark.asyncio async def test_concurrent_single_host_lock_contention( mock_host: SSHHost, monkeypatch: pytest.MonkeyPatch, ) -> None: """Measure lock contention with concurrent requests to same host.""" mock_ssh = MockAsyncSSH(delay=0.001) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool() num_requests = 100 async def get_conn() -> float: start = time.perf_counter() await pool.get_connection(mock_host) return time.perf_counter() - start # Run concurrent requests start_total = time.perf_counter() results = await asyncio.gather(*[get_conn() for _ in range(num_requests)]) elapsed_total = time.perf_counter() - start_total avg_latency = statistics.mean(results) * 1000 p95_latency = statistics.quantiles(results, n=20)[18] * 1000 p99_latency = statistics.quantiles(results, n=100)[98] * 1000 throughput = num_requests / elapsed_total print(f"\n[PERF] Concurrent single-host (n={num_requests}):") print(f" Avg latency: {avg_latency:.2f}ms") print(f" P95 latency: {p95_latency:.2f}ms") print(f" P99 latency: {p99_latency:.2f}ms") print(f" Throughput: {throughput:.0f} req/s") print(f" Connections created: {mock_ssh._connection_count}") # Should reuse single connection assert mock_ssh._connection_count == 1, "Should create only 1 connection" @pytest.mark.asyncio async def test_concurrent_multi_host_parallelism( monkeypatch: pytest.MonkeyPatch, ) -> None: """Measure parallelism with concurrent requests to different hosts.""" mock_ssh = MockAsyncSSH(delay=0.01) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool() num_hosts = 10 hosts = [ SSHHost( name=f"host-{i}", hostname="localhost", user="test", port=22 + i, ) for i in range(num_hosts) ] async def get_conn(host: SSHHost) -> float: start = time.perf_counter() await pool.get_connection(host) return time.perf_counter() - start # Run concurrent requests to different hosts start_total = time.perf_counter() results = await asyncio.gather(*[get_conn(host) for host in hosts]) elapsed_total = time.perf_counter() - start_total avg_latency = statistics.mean(results) * 1000 print(f"\n[PERF] Concurrent multi-host (n={num_hosts}):") print(f" Total time: {elapsed_total * 1000:.2f}ms") print(f" Avg latency: {avg_latency:.2f}ms") print(f" Connections created: {mock_ssh._connection_count}") # Should create connections in parallel assert mock_ssh._connection_count == num_hosts # With 10ms connection time, serial would be ~100ms # Parallel should be faster, but system load can cause variance assert elapsed_total < 0.2, "Should create connections in parallel (<200ms)" @pytest.mark.asyncio async def test_pool_memory_footprint( monkeypatch: pytest.MonkeyPatch, ) -> None: """Measure connection pool memory usage.""" import sys mock_ssh = MockAsyncSSH(delay=0.001) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool() # Create connections to 100 hosts hosts = [ SSHHost( name=f"host-{i}", hostname="localhost", user="test", port=22, ) for i in range(100) ] for host in hosts: await pool.get_connection(host) # Measure pool size pool_size = sys.getsizeof(pool._connections) pooled_conn_size = sum(sys.getsizeof(pc) for pc in pool._connections.values()) total_size = pool_size + pooled_conn_size print("\n[PERF] Memory footprint (100 connections):") print(f" Pool dict: {pool_size} bytes") print(f" Pooled connections: {pooled_conn_size} bytes") print(f" Total: {total_size} bytes ({total_size / 1024:.1f} KB)") @pytest.mark.asyncio async def test_cleanup_task_overhead( mock_host: SSHHost, monkeypatch: pytest.MonkeyPatch, ) -> None: """Measure cleanup task impact on active connections.""" mock_ssh = MockAsyncSSH(delay=0.001) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool(idle_timeout=1) # Create connection to start cleanup task await pool.get_connection(mock_host) # Measure latency during cleanup cycles latencies = [] for _ in range(10): await asyncio.sleep(0.6) # Wait for cleanup cycle start = time.perf_counter() await pool.get_connection(mock_host) latencies.append(time.perf_counter() - start) avg_latency = statistics.mean(latencies) * 1000 max_latency = max(latencies) * 1000 print("\n[PERF] Cleanup task overhead:") print(f" Avg latency: {avg_latency:.2f}ms") print(f" Max latency: {max_latency:.2f}ms") # Cleanup shouldn't block operations assert max_latency < 5.0, "Cleanup should not block pool access >5ms" @pytest.mark.asyncio async def test_stale_connection_detection( mock_host: SSHHost, monkeypatch: pytest.MonkeyPatch, ) -> None: """Benchmark stale connection detection performance.""" mock_ssh = MockAsyncSSH(delay=0.01) monkeypatch.setattr("scout_mcp.services.pool.asyncssh", mock_ssh) pool = ConnectionPool() # Create connection conn = await pool.get_connection(mock_host) # Close it to make stale conn.close() # Measure stale detection + reconnect start = time.perf_counter() await pool.get_connection(mock_host) elapsed = time.perf_counter() - start print(f"\n[PERF] Stale connection detection + reconnect: {elapsed * 1000:.2f}ms") # Should detect stale and reconnect assert mock_ssh._connection_count == 2, "Should create new connection" assert elapsed < 0.05, "Stale detection + reconnect should be <50ms"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jmagar/scout_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server