
MaverickMCP

by wshobson
MIT License
test_database_pool_config.py (35.5 kB)
""" Comprehensive tests for DatabasePoolConfig. This module tests the enhanced database pool configuration that provides validation, monitoring, and optimization capabilities. Tests cover: - Pool validation logic against database limits - Warning conditions for insufficient pool sizing - Environment variable overrides - Factory methods (development, production, high-concurrency) - Monitoring thresholds and SQLAlchemy event listeners - Integration with existing DatabaseConfig - Production validation checks """ import os import warnings from unittest.mock import Mock, patch import pytest from sqlalchemy.pool import QueuePool from maverick_mcp.config.database import ( DatabasePoolConfig, create_engine_with_enhanced_config, create_monitored_engine_kwargs, get_default_pool_config, get_development_pool_config, get_high_concurrency_pool_config, get_pool_config_from_settings, validate_production_config, ) from maverick_mcp.providers.interfaces.persistence import DatabaseConfig class TestDatabasePoolConfig: """Test the main DatabasePoolConfig class.""" def test_default_configuration(self): """Test default configuration values.""" config = DatabasePoolConfig() # Test environment variable defaults assert config.pool_size == int(os.getenv("DB_POOL_SIZE", "20")) assert config.max_overflow == int(os.getenv("DB_MAX_OVERFLOW", "10")) assert config.pool_timeout == int(os.getenv("DB_POOL_TIMEOUT", "30")) assert config.pool_recycle == int(os.getenv("DB_POOL_RECYCLE", "3600")) assert config.max_database_connections == int( os.getenv("DB_MAX_CONNECTIONS", "100") ) assert config.reserved_superuser_connections == int( os.getenv("DB_RESERVED_SUPERUSER_CONNECTIONS", "3") ) assert config.expected_concurrent_users == int( os.getenv("DB_EXPECTED_CONCURRENT_USERS", "20") ) assert config.connections_per_user == float( os.getenv("DB_CONNECTIONS_PER_USER", "1.2") ) assert config.pool_pre_ping == ( os.getenv("DB_POOL_PRE_PING", "true").lower() == "true" ) assert config.echo_pool == ( os.getenv("DB_ECHO_POOL", "false").lower() == "true" ) @patch.dict( os.environ, { "DB_POOL_SIZE": "25", "DB_MAX_OVERFLOW": "10", "DB_POOL_TIMEOUT": "45", "DB_POOL_RECYCLE": "1800", "DB_MAX_CONNECTIONS": "80", "DB_RESERVED_SUPERUSER_CONNECTIONS": "2", "DB_EXPECTED_CONCURRENT_USERS": "25", "DB_CONNECTIONS_PER_USER": "1.2", "DB_POOL_PRE_PING": "false", "DB_ECHO_POOL": "true", }, ) def test_environment_variable_overrides(self): """Test that environment variables override defaults.""" config = DatabasePoolConfig() assert config.pool_size == 25 assert config.max_overflow == 10 assert config.pool_timeout == 45 assert config.pool_recycle == 1800 assert config.max_database_connections == 80 assert config.reserved_superuser_connections == 2 assert config.expected_concurrent_users == 25 assert config.connections_per_user == 1.2 assert config.pool_pre_ping is False assert config.echo_pool is True def test_valid_configuration(self): """Test a valid configuration passes validation.""" config = DatabasePoolConfig( pool_size=10, max_overflow=5, max_database_connections=50, reserved_superuser_connections=3, expected_concurrent_users=10, connections_per_user=1.2, ) # Should not raise any exceptions assert config.pool_size == 10 assert config.max_overflow == 5 # Calculated values total_app_connections = config.pool_size + config.max_overflow available_connections = ( config.max_database_connections - config.reserved_superuser_connections ) assert total_app_connections <= available_connections def test_validation_exceeds_database_capacity(self): """Test 
validation failure when pool exceeds database capacity.""" with pytest.raises( ValueError, match="Pool configuration exceeds database capacity" ): DatabasePoolConfig( pool_size=50, max_overflow=30, # Total = 80 max_database_connections=70, # Available = 67 (70-3) reserved_superuser_connections=3, ) def test_validation_insufficient_for_expected_load(self): """Test validation failure when pool is insufficient for expected load.""" with pytest.raises( ValueError, match="Total connection capacity .* is insufficient" ): DatabasePoolConfig( pool_size=5, max_overflow=0, # Total capacity = 5 expected_concurrent_users=10, connections_per_user=1.0, # Expected demand = 10 max_database_connections=50, ) def test_validation_warning_for_small_pool(self): """Test warning when pool size may be insufficient.""" with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") DatabasePoolConfig( pool_size=5, # Small pool max_overflow=15, # But enough overflow to meet demand expected_concurrent_users=10, connections_per_user=1.5, # Expected demand = 15 max_database_connections=50, ) # Should generate a warning assert len(w) > 0 assert "Pool size (5) may be insufficient" in str(w[0].message) def test_field_validation_ranges(self): """Test field validation for valid ranges.""" from pydantic import ValidationError # Test valid ranges with proper expected demand config = DatabasePoolConfig( pool_size=5, # Minimum safe size max_overflow=0, # Minimum pool_timeout=1, # Minimum pool_recycle=300, # Minimum expected_concurrent_users=3, # Lower expected demand connections_per_user=1.0, ) assert config.pool_size == 5 config = DatabasePoolConfig( pool_size=80, # Large but fits in database capacity max_overflow=15, # Reasonable overflow pool_timeout=300, # Maximum pool_recycle=7200, # Maximum expected_concurrent_users=85, # Fit within total capacity of 95 connections_per_user=1.0, max_database_connections=120, # Higher limit to accommodate pool ) assert config.pool_size == 80 # Test invalid ranges with pytest.raises(ValidationError): DatabasePoolConfig(pool_size=0) # Below minimum with pytest.raises(ValidationError): DatabasePoolConfig(pool_size=101) # Above maximum with pytest.raises(ValidationError): DatabasePoolConfig(max_overflow=-1) # Below minimum with pytest.raises(ValidationError): DatabasePoolConfig(max_overflow=51) # Above maximum def test_get_pool_kwargs(self): """Test SQLAlchemy pool configuration generation.""" config = DatabasePoolConfig( pool_size=15, max_overflow=8, pool_timeout=45, pool_recycle=1800, pool_pre_ping=True, echo_pool=True, expected_concurrent_users=18, # Match capacity connections_per_user=1.0, ) kwargs = config.get_pool_kwargs() expected = { "poolclass": QueuePool, "pool_size": 15, "max_overflow": 8, "pool_timeout": 45, "pool_recycle": 1800, "pool_pre_ping": True, "echo_pool": True, } assert kwargs == expected def test_get_monitoring_thresholds(self): """Test monitoring threshold calculation.""" config = DatabasePoolConfig(pool_size=20, max_overflow=10) thresholds = config.get_monitoring_thresholds() expected = { "warning_threshold": int(20 * 0.8), # 16 "critical_threshold": int(20 * 0.95), # 19 "pool_size": 20, "max_overflow": 10, "total_capacity": 30, } assert thresholds == expected def test_validate_against_database_limits_matching(self): """Test validation when actual limits match configuration.""" config = DatabasePoolConfig(max_database_connections=100) # Should not raise any exceptions when limits match config.validate_against_database_limits(100) assert 
config.max_database_connections == 100 def test_validate_against_database_limits_higher_actual(self): """Test validation when actual limits are higher.""" config = DatabasePoolConfig(max_database_connections=100) with patch("maverick_mcp.config.database.logger") as mock_logger: config.validate_against_database_limits(150) # Should update configuration and log info assert config.max_database_connections == 150 mock_logger.info.assert_called_once() def test_validate_against_database_limits_lower_actual_safe(self): """Test validation when actual limits are lower but pool still fits.""" config = DatabasePoolConfig( pool_size=10, max_overflow=5, # Total = 15 max_database_connections=100, reserved_superuser_connections=3, expected_concurrent_users=12, # Fit within total capacity of 15 connections_per_user=1.0, ) with patch("maverick_mcp.config.database.logger") as mock_logger: # Actual limit is 80, available is 77, pool needs 15 - should be fine config.validate_against_database_limits(80) mock_logger.warning.assert_called_once() warning_call = mock_logger.warning.call_args[0][0] assert "lower than configured" in warning_call def test_validate_against_database_limits_lower_actual_unsafe(self): """Test validation failure when actual limits are too low.""" config = DatabasePoolConfig( pool_size=30, max_overflow=20, # Total = 50 max_database_connections=100, reserved_superuser_connections=3, ) with pytest.raises( ValueError, match="Configuration invalid for actual database limits" ): # Actual limit is 40, available is 37, pool needs 50 - should fail config.validate_against_database_limits(40) def test_to_legacy_config(self): """Test conversion to legacy DatabaseConfig.""" config = DatabasePoolConfig( pool_size=15, max_overflow=8, pool_timeout=45, pool_recycle=1800, echo_pool=True, expected_concurrent_users=18, # Fit within total capacity of 23 connections_per_user=1.0, ) database_url = "postgresql://user:pass@localhost/test" legacy_config = config.to_legacy_config(database_url) assert isinstance(legacy_config, DatabaseConfig) assert legacy_config.database_url == database_url assert legacy_config.pool_size == 15 assert legacy_config.max_overflow == 8 assert legacy_config.pool_timeout == 45 assert legacy_config.pool_recycle == 1800 assert legacy_config.echo is True assert legacy_config.autocommit is False assert legacy_config.autoflush is True assert legacy_config.expire_on_commit is True def test_from_legacy_config(self): """Test creation from legacy DatabaseConfig.""" legacy_config = DatabaseConfig( database_url="postgresql://user:pass@localhost/test", pool_size=12, max_overflow=6, pool_timeout=60, pool_recycle=2400, echo=False, ) enhanced_config = DatabasePoolConfig.from_legacy_config( legacy_config, expected_concurrent_users=15, # Override max_database_connections=80, # Override ) assert enhanced_config.pool_size == 12 assert enhanced_config.max_overflow == 6 assert enhanced_config.pool_timeout == 60 assert enhanced_config.pool_recycle == 2400 assert enhanced_config.echo_pool is False assert enhanced_config.expected_concurrent_users == 15 # Override applied assert enhanced_config.max_database_connections == 80 # Override applied def test_setup_pool_monitoring(self): """Test SQLAlchemy event listener setup.""" config = DatabasePoolConfig( pool_size=10, echo_pool=True, expected_concurrent_users=15, # Fit within capacity connections_per_user=1.0, ) # Create a mock engine with pool mock_engine = Mock() mock_pool = Mock() mock_pool.checkedout.return_value = 5 mock_pool.checkedin.return_value = 3 
mock_engine.pool = mock_pool # Mock the event listener registration with patch("maverick_mcp.config.database.event") as mock_event: config.setup_pool_monitoring(mock_engine) # Verify event listeners were registered assert ( mock_event.listens_for.call_count == 5 ) # connect, checkout, checkin, invalidate, soft_invalidate # Test the event listener functions were called correctly expected_events = [ "connect", "checkout", "checkin", "invalidate", "soft_invalidate", ] for call_args in mock_event.listens_for.call_args_list: assert call_args[0][0] is mock_engine assert call_args[0][1] in expected_events class TestFactoryFunctions: """Test factory functions for different configuration types.""" def test_get_default_pool_config(self): """Test default pool configuration factory.""" config = get_default_pool_config() assert isinstance(config, DatabasePoolConfig) # Should use environment variable defaults assert config.pool_size == int(os.getenv("DB_POOL_SIZE", "20")) def test_get_development_pool_config(self): """Test development pool configuration factory.""" config = get_development_pool_config() assert isinstance(config, DatabasePoolConfig) assert config.pool_size == 5 assert config.max_overflow == 2 assert config.pool_timeout == 30 assert config.pool_recycle == 3600 assert config.expected_concurrent_users == 5 assert config.connections_per_user == 1.0 assert config.max_database_connections == 20 assert config.reserved_superuser_connections == 2 assert config.echo_pool is True # Debug enabled in development def test_get_high_concurrency_pool_config(self): """Test high concurrency pool configuration factory.""" config = get_high_concurrency_pool_config() assert isinstance(config, DatabasePoolConfig) assert config.pool_size == 50 assert config.max_overflow == 30 assert config.pool_timeout == 60 assert config.pool_recycle == 1800 # 30 minutes assert config.expected_concurrent_users == 60 assert config.connections_per_user == 1.3 assert config.max_database_connections == 200 assert config.reserved_superuser_connections == 5 def test_get_pool_config_from_settings_development(self): """Test configuration from settings in development.""" # Create a mock settings module and settings object mock_settings_module = Mock() mock_settings_obj = Mock() mock_settings_obj.environment = "development" # Configure hasattr to return False for 'db' to avoid overrides path mock_settings_obj.configure_mock(**{"db": None}) mock_settings_module.settings = mock_settings_obj # Patch the import to return our mock with patch.dict( "sys.modules", {"maverick_mcp.config.settings": mock_settings_module} ): # Also patch hasattr to return False for the db attribute with patch("builtins.hasattr", side_effect=lambda obj, attr: attr != "db"): config = get_pool_config_from_settings() # Should return development configuration assert config.pool_size == 5 # Development default assert config.echo_pool is True def test_get_pool_config_from_settings_production(self): """Test configuration from settings in production.""" # Create a mock settings module and settings object mock_settings_module = Mock() mock_settings_obj = Mock() mock_settings_obj.environment = "production" mock_settings_module.settings = mock_settings_obj # Patch the import to return our mock with patch.dict( "sys.modules", {"maverick_mcp.config.settings": mock_settings_module} ): # Also patch hasattr to return False for the db attribute with patch("builtins.hasattr", side_effect=lambda obj, attr: attr != "db"): config = get_pool_config_from_settings() # Should return high 
concurrency configuration assert config.pool_size == 50 # Production default assert config.max_overflow == 30 def test_get_pool_config_from_settings_with_overrides(self): """Test configuration from settings with database-specific overrides.""" # Create a mock settings module and settings object mock_settings_module = Mock() mock_settings_obj = Mock() mock_settings_obj.environment = "development" # Create proper mock for db settings with real values, not Mock objects class MockDbSettings: pool_size = 8 pool_max_overflow = 3 pool_timeout = 60 mock_settings_obj.db = MockDbSettings() mock_settings_module.settings = mock_settings_obj # Patch the import to return our mock with patch.dict( "sys.modules", {"maverick_mcp.config.settings": mock_settings_module} ): config = get_pool_config_from_settings() # Should use overrides assert config.pool_size == 8 assert config.max_overflow == 3 assert config.pool_timeout == 60 # Other development defaults should remain assert config.echo_pool is True def test_get_pool_config_from_settings_import_error(self): """Test fallback when settings import fails.""" # Create a mock import function that raises ImportError for settings module def mock_import(name, *args, **kwargs): if name == "maverick_mcp.config.settings": raise ImportError("No module named 'maverick_mcp.config.settings'") return __import__(name, *args, **kwargs) with patch("builtins.__import__", side_effect=mock_import): with patch("maverick_mcp.config.database.logger") as mock_logger: config = get_pool_config_from_settings() # Should fall back to default assert isinstance(config, DatabasePoolConfig) # Should call warning twice: import error + pool size warning assert mock_logger.warning.call_count == 2 import_warning_call = mock_logger.warning.call_args_list[0] assert ( "Could not import settings, using default pool configuration" in str(import_warning_call) ) class TestUtilityFunctions: """Test utility functions.""" def test_create_monitored_engine_kwargs(self): """Test monitored engine kwargs creation.""" config = DatabasePoolConfig( pool_size=15, max_overflow=8, pool_timeout=45, pool_recycle=1800, pool_pre_ping=True, echo_pool=False, expected_concurrent_users=18, # Reduce to fit total capacity of 23 connections_per_user=1.0, ) database_url = "postgresql://user:pass@localhost/test" kwargs = create_monitored_engine_kwargs(database_url, config) expected = { "url": database_url, "poolclass": QueuePool, "pool_size": 15, "max_overflow": 8, "pool_timeout": 45, "pool_recycle": 1800, "pool_pre_ping": True, "echo_pool": False, "connect_args": { "application_name": "maverick_mcp", }, } assert kwargs == expected @patch("sqlalchemy.create_engine") @patch("maverick_mcp.config.database.get_pool_config_from_settings") def test_create_engine_with_enhanced_config( self, mock_get_config, mock_create_engine ): """Test complete engine creation with monitoring.""" mock_config = Mock(spec=DatabasePoolConfig) mock_config.pool_size = 20 mock_config.max_overflow = 10 mock_config.get_pool_kwargs.return_value = {"pool_size": 20} mock_config.setup_pool_monitoring = Mock() mock_get_config.return_value = mock_config mock_engine = Mock() mock_create_engine.return_value = mock_engine database_url = "postgresql://user:pass@localhost/test" result = create_engine_with_enhanced_config(database_url) # Verify engine creation and monitoring setup assert result is mock_engine mock_create_engine.assert_called_once() mock_config.setup_pool_monitoring.assert_called_once_with(mock_engine) def test_validate_production_config_valid(self): 
"""Test production validation for valid configuration.""" config = DatabasePoolConfig( pool_size=25, max_overflow=15, pool_timeout=30, pool_recycle=3600, ) with patch("maverick_mcp.config.database.logger") as mock_logger: result = validate_production_config(config) assert result is True mock_logger.info.assert_called_with( "Production configuration validation passed" ) def test_validate_production_config_warnings(self): """Test production validation with warnings.""" config = DatabasePoolConfig( pool_size=5, # Too small max_overflow=0, # No overflow pool_timeout=30, pool_recycle=7200, # Maximum allowed (was 8000, too high) expected_concurrent_users=4, # Reduce to fit capacity of 5 connections_per_user=1.0, ) with patch("maverick_mcp.config.database.logger") as mock_logger: result = validate_production_config(config) assert result is True # Warnings don't fail validation # Should log multiple warnings warning_calls = list(mock_logger.warning.call_args_list) assert ( len(warning_calls) == 2 ) # Small pool, no overflow (recycle=7200 is max allowed, not "too long") # Check final info message mentions warnings info_call = mock_logger.info.call_args[0][0] assert "warnings" in info_call def test_validate_production_config_errors(self): """Test production validation with errors.""" config = DatabasePoolConfig( pool_size=15, max_overflow=5, pool_timeout=5, # Too aggressive pool_recycle=3600, expected_concurrent_users=18, # Reduce to fit capacity of 20 connections_per_user=1.0, ) with pytest.raises( ValueError, match="Production configuration validation failed" ): validate_production_config(config) class TestEventListenerBehavior: """Test SQLAlchemy event listener behavior with real scenarios.""" def test_connect_event_logging(self): """Test connect event logging behavior.""" config = DatabasePoolConfig( pool_size=10, echo_pool=True, expected_concurrent_users=8, # Reduce expected demand to fit capacity connections_per_user=1.0, ) # Mock engine and pool mock_engine = Mock() mock_pool = Mock() mock_pool.checkedout.return_value = 7 # 70% usage mock_pool.checkedin.return_value = 3 mock_engine.pool = mock_pool # Mock the event registration and capture listener functions captured_listeners = {} def mock_listens_for(target, event_name): def decorator(func): captured_listeners[event_name] = func return func return decorator with patch( "maverick_mcp.config.database.event.listens_for", side_effect=mock_listens_for, ): config.setup_pool_monitoring(mock_engine) # Verify we captured the connect listener assert "connect" in captured_listeners connect_listener = captured_listeners["connect"] # Test the listener function with patch("maverick_mcp.config.database.logger") as mock_logger: connect_listener(None, None) # dbapi_connection, connection_record # Should log warning at 70% usage (above 80% threshold would be warning) # At 70%, should not trigger warning (threshold is 80%) mock_logger.warning.assert_not_called() def test_connect_event_warning_threshold(self): """Test connect event warning threshold.""" config = DatabasePoolConfig( pool_size=10, echo_pool=True, expected_concurrent_users=8, # Reduce expected demand connections_per_user=1.0, ) mock_engine = Mock() mock_pool = Mock() mock_pool.checkedout.return_value = 9 # 90% usage (above 80% warning threshold) mock_pool.checkedin.return_value = 1 mock_engine.pool = mock_pool # Mock the event registration and capture listener functions captured_listeners = {} def mock_listens_for(target, event_name): def decorator(func): captured_listeners[event_name] = func 
return func return decorator with patch( "maverick_mcp.config.database.event.listens_for", side_effect=mock_listens_for, ): config.setup_pool_monitoring(mock_engine) # Verify we captured the connect listener assert "connect" in captured_listeners connect_listener = captured_listeners["connect"] # Test warning threshold with patch("maverick_mcp.config.database.logger") as mock_logger: connect_listener(None, None) # Should log warning mock_logger.warning.assert_called_once() warning_message = mock_logger.warning.call_args[0][0] assert "Pool usage approaching capacity" in warning_message def test_connect_event_critical_threshold(self): """Test connect event critical threshold.""" config = DatabasePoolConfig( pool_size=10, echo_pool=True, expected_concurrent_users=8, # Reduce expected demand connections_per_user=1.0, ) mock_engine = Mock() mock_pool = Mock() mock_pool.checkedout.return_value = ( 10 # 100% usage (above 95% critical threshold) ) mock_pool.checkedin.return_value = 0 mock_engine.pool = mock_pool # Mock the event registration and capture listener functions captured_listeners = {} def mock_listens_for(target, event_name): def decorator(func): captured_listeners[event_name] = func return func return decorator with patch( "maverick_mcp.config.database.event.listens_for", side_effect=mock_listens_for, ): config.setup_pool_monitoring(mock_engine) # Verify we captured the connect listener assert "connect" in captured_listeners connect_listener = captured_listeners["connect"] # Test critical threshold with patch("maverick_mcp.config.database.logger") as mock_logger: connect_listener(None, None) # Should log both warning and error mock_logger.warning.assert_called_once() mock_logger.error.assert_called_once() error_message = mock_logger.error.call_args[0][0] assert "Pool usage critical" in error_message def test_invalidate_event_logging(self): """Test connection invalidation event logging.""" config = DatabasePoolConfig( pool_size=10, echo_pool=True, expected_concurrent_users=8, # Reduce expected demand connections_per_user=1.0, ) mock_engine = Mock() # Mock the event registration and capture listener functions captured_listeners = {} def mock_listens_for(target, event_name): def decorator(func): captured_listeners[event_name] = func return func return decorator with patch( "maverick_mcp.config.database.event.listens_for", side_effect=mock_listens_for, ): config.setup_pool_monitoring(mock_engine) # Verify we captured the invalidate listener assert "invalidate" in captured_listeners invalidate_listener = captured_listeners["invalidate"] # Test the listener function with patch("maverick_mcp.config.database.logger") as mock_logger: test_exception = Exception("Connection lost") invalidate_listener(None, None, test_exception) mock_logger.warning.assert_called_once() warning_message = mock_logger.warning.call_args[0][0] assert "Connection invalidated due to error" in warning_message assert "Connection lost" in warning_message class TestRealWorldScenarios: """Test realistic usage scenarios.""" def test_microservice_configuration(self): """Test configuration suitable for microservice deployment.""" config = DatabasePoolConfig( pool_size=8, max_overflow=4, pool_timeout=30, pool_recycle=1800, expected_concurrent_users=10, connections_per_user=1.0, max_database_connections=50, reserved_superuser_connections=2, ) # Should be valid assert config.pool_size == 8 # Test monitoring setup thresholds = config.get_monitoring_thresholds() assert thresholds["warning_threshold"] == 6 # 80% of 8 assert 
thresholds["critical_threshold"] == 7 # 95% of 8 def test_high_traffic_web_app_configuration(self): """Test configuration for high-traffic web application.""" config = get_high_concurrency_pool_config() # Validate it's production-ready assert validate_production_config(config) is True # Should handle expected load total_capacity = config.pool_size + config.max_overflow expected_demand = config.expected_concurrent_users * config.connections_per_user assert total_capacity >= expected_demand def test_development_to_production_migration(self): """Test migrating from development to production configuration.""" # Start with development config dev_config = get_development_pool_config() assert dev_config.echo_pool is True # Debug enabled assert dev_config.pool_size == 5 # Small pool # Convert to legacy for compatibility testing legacy_config = dev_config.to_legacy_config("postgresql://localhost/test") assert isinstance(legacy_config, DatabaseConfig) # Upgrade to production config prod_config = DatabasePoolConfig.from_legacy_config( legacy_config, pool_size=30, # Production sizing max_overflow=20, expected_concurrent_users=40, max_database_connections=150, echo_pool=False, # Disable debug ) # Should be production-ready assert validate_production_config(prod_config) is True assert prod_config.echo_pool is False assert prod_config.pool_size == 30 def test_database_upgrade_scenario(self): """Test handling database capacity upgrades.""" # Original configuration for smaller database config = DatabasePoolConfig( pool_size=20, max_overflow=10, max_database_connections=100, ) # Database upgraded to higher capacity config.validate_against_database_limits(200) # Configuration should be updated assert config.max_database_connections == 200 # Can now safely increase pool size larger_config = DatabasePoolConfig( pool_size=40, max_overflow=20, max_database_connections=200, expected_concurrent_users=50, connections_per_user=1.2, ) # Should validate successfully assert larger_config.pool_size == 40 def test_connection_exhaustion_prevention(self): """Test that configuration prevents connection exhaustion.""" # Configuration that would exhaust connections with pytest.raises(ValueError, match="exceeds database capacity"): DatabasePoolConfig( pool_size=45, max_overflow=35, # Total = 80 max_database_connections=75, # Available = 72 (75-3) reserved_superuser_connections=3, ) # Safe configuration safe_config = DatabasePoolConfig( pool_size=30, max_overflow=20, # Total = 50 max_database_connections=75, # Available = 72 (75-3) reserved_superuser_connections=3, ) # Should leave room for other applications and admin access total_used = safe_config.pool_size + safe_config.max_overflow available = ( safe_config.max_database_connections - safe_config.reserved_superuser_connections ) assert total_used < available # Should not use ALL available connections
