Skip to main content
Glama
test_connection_pool.py5.58 kB
"""Tests for connection pool management.""" import asyncio import ssl import weakref from unittest.mock import patch import httpx import pytest from biomcp.connection_pool import ( EventLoopConnectionPools, close_all_pools, get_connection_pool, ) @pytest.fixture def pool_manager(): """Create a fresh pool manager for testing.""" return EventLoopConnectionPools() @pytest.mark.asyncio async def test_get_pool_creates_new_pool(pool_manager): """Test that get_pool creates a new pool when none exists.""" timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool is not None assert isinstance(pool, httpx.AsyncClient) assert not pool.is_closed @pytest.mark.asyncio async def test_get_pool_reuses_existing_pool(pool_manager): """Test that get_pool reuses existing pools.""" timeout = httpx.Timeout(30) pool1 = await pool_manager.get_pool(verify=True, timeout=timeout) pool2 = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool1 is pool2 @pytest.mark.asyncio async def test_get_pool_different_verify_settings(pool_manager): """Test that different verify settings create different pools.""" timeout = httpx.Timeout(30) pool1 = await pool_manager.get_pool(verify=True, timeout=timeout) pool2 = await pool_manager.get_pool(verify=False, timeout=timeout) assert pool1 is not pool2 @pytest.mark.asyncio async def test_get_pool_ssl_context(pool_manager): """Test pool creation with SSL context.""" ssl_context = ssl.create_default_context() timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=ssl_context, timeout=timeout) assert pool is not None assert isinstance(pool, httpx.AsyncClient) @pytest.mark.asyncio async def test_pool_cleanup_on_close_all(pool_manager): """Test that close_all properly closes all pools.""" timeout = httpx.Timeout(30) await pool_manager.get_pool(verify=True, timeout=timeout) await pool_manager.get_pool(verify=False, timeout=timeout) await pool_manager.close_all() # After close_all, pools should be cleared assert len(pool_manager._loop_pools) == 0 @pytest.mark.asyncio async def test_no_event_loop_returns_single_use_client(pool_manager): """Test behavior when no event loop is running.""" with patch("asyncio.get_running_loop", side_effect=RuntimeError): timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool is not None # Single-use client should have no keepalive # Note: httpx client internal structure may vary @pytest.mark.asyncio async def test_pool_recreation_after_close(pool_manager): """Test that a new pool is created after the old one is closed.""" timeout = httpx.Timeout(30) pool1 = await pool_manager.get_pool(verify=True, timeout=timeout) await pool1.aclose() pool2 = await pool_manager.get_pool(verify=True, timeout=timeout) assert pool1 is not pool2 assert pool1.is_closed assert not pool2.is_closed @pytest.mark.asyncio async def test_weak_reference_cleanup(): """Test that weak references are used for event loops.""" pool_manager = EventLoopConnectionPools() # Verify that the pool manager uses weak references assert isinstance(pool_manager._loop_pools, weakref.WeakKeyDictionary) # Create a pool timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) # Verify pool was created assert pool is not None # The current event loop should be in the weak key dict current_loop = asyncio.get_running_loop() assert current_loop in pool_manager._loop_pools @pytest.mark.asyncio async def test_global_get_connection_pool(): """Test the global get_connection_pool function.""" with patch.dict("os.environ", {"BIOMCP_USE_CONNECTION_POOL": "true"}): timeout = httpx.Timeout(30) pool = await get_connection_pool(verify=True, timeout=timeout) assert pool is not None assert isinstance(pool, httpx.AsyncClient) @pytest.mark.asyncio async def test_global_close_all_pools(): """Test the global close_all_pools function.""" # Create some pools timeout = httpx.Timeout(30) await get_connection_pool(verify=True, timeout=timeout) await get_connection_pool(verify=False, timeout=timeout) # Close all pools await close_all_pools() # Verify cleanup (this is implementation-specific) from biomcp.connection_pool import _pool_manager assert len(_pool_manager._loop_pools) == 0 @pytest.mark.asyncio async def test_concurrent_pool_creation(pool_manager): """Test thread-safe pool creation under concurrent access.""" timeout = httpx.Timeout(30) async def get_pool(): return await pool_manager.get_pool(verify=True, timeout=timeout) # Create 10 concurrent requests for the same pool pools = await asyncio.gather(*[get_pool() for _ in range(10)]) # All should return the same pool instance assert all(pool is pools[0] for pool in pools) @pytest.mark.asyncio async def test_connection_pool_limits(): """Test that connection pools have proper limits set.""" pool_manager = EventLoopConnectionPools() timeout = httpx.Timeout(30) pool = await pool_manager.get_pool(verify=True, timeout=timeout) # Verify pool was created (actual limits are internal to httpx) assert pool is not None assert isinstance(pool, httpx.AsyncClient)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/genomoncology/biomcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server