mcp-dbutils

"""Integration tests for resource monitoring"""

import tempfile

import pytest
import yaml

from mcp_dbutils.base import ConnectionHandlerError, ConnectionServer


@pytest.mark.asyncio
async def test_sqlite_monitoring(sqlite_db, mcp_config):
    """Test resource monitoring with SQLite.

    Dumps ``mcp_config`` to a temporary YAML file, opens the ``test_sqlite``
    handler through ``ConnectionServer`` and checks that the handler's
    ``stats`` object tracks connections, queries and errors.

    Args:
        sqlite_db: Fixture defined outside this file; not referenced in the
            body (presumably requested so the database exists before the
            handler connects -- confirm against conftest).
        mcp_config: Fixture providing the configuration object that is
            serialized to YAML for the server.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as tmp:
        yaml.dump(mcp_config, tmp)
        # Flush so the server can read the full config from tmp.name.
        tmp.flush()
        server = ConnectionServer(config_path=tmp.name)

        async with server.get_handler("test_sqlite") as handler:
            # Connection stats
            stats = handler.stats
            assert stats.active_connections == 1
            assert stats.total_connections == 1

            # Execute queries and check stats
            await handler.execute_query("SELECT * FROM products")
            assert stats.query_count == 1
            await handler.execute_query("SELECT name FROM products")
            assert stats.query_count == 2

            # Test error recording: the query must fail, not merely be
            # allowed to fail, so assert the exception explicitly.
            with pytest.raises(ConnectionHandlerError):
                await handler.execute_query("SELECT * FROM nonexistent")
            assert stats.error_count == 1
            assert "ConnectionHandlerError" in stats.error_types

        # After context exit, connection should be closed
        assert stats.active_connections == 0


@pytest.mark.asyncio
async def test_postgres_monitoring(postgres_db, mcp_config):
    """Test resource monitoring with PostgreSQL.

    Same flow as the SQLite test, run against the ``test_pg`` handler, plus
    a check of the dictionary returned by ``stats.to_dict()``.

    Args:
        postgres_db: Fixture defined outside this file; not referenced in
            the body (presumably requested so the database exists before
            the handler connects -- confirm against conftest).
        mcp_config: Fixture providing the configuration object that is
            serialized to YAML for the server.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as tmp:
        yaml.dump(mcp_config, tmp)
        # Flush so the server can read the full config from tmp.name.
        tmp.flush()
        server = ConnectionServer(config_path=tmp.name)

        async with server.get_handler("test_pg") as handler:
            # Connection stats
            stats = handler.stats
            assert stats.active_connections == 1
            assert stats.total_connections == 1

            # Execute queries and check stats
            await handler.execute_query("SELECT * FROM users")
            assert stats.query_count == 1
            await handler.execute_query("SELECT name FROM users")
            assert stats.query_count == 2

            # Test error recording: the query must fail, not merely be
            # allowed to fail, so assert the exception explicitly.
            with pytest.raises(ConnectionHandlerError):
                await handler.execute_query("SELECT * FROM nonexistent")
            assert stats.error_count == 1
            assert "ConnectionHandlerError" in stats.error_types

            # Test stats serialization
            stats_dict = stats.to_dict()
            # Two successful queries + one failed query
            assert stats_dict["query_count"] == 3
            assert stats_dict["error_count"] == 1
            assert isinstance(stats_dict["connection_duration"], (int, float))

        # After context exit, connection should be closed
        assert stats.active_connections == 0