mcp-dbutils
by donghao1393
- tests
- integration
"""Integration tests for enhanced monitoring features"""
import tempfile
from datetime import datetime
import pytest
import yaml
from mcp_dbutils.base import ConnectionHandlerError, ConnectionServer
from mcp_dbutils.stats import ResourceStats
@pytest.mark.asyncio
async def test_resource_stats_performance_tracking(sqlite_db, mcp_config):
"""Test performance tracking in ResourceStats"""
# Create a ResourceStats instance
stats = ResourceStats()
# Record some query durations
stats.record_query_duration("SELECT * FROM products", 0.05) # 50ms
stats.record_query_duration("SELECT name FROM products", 0.02) # 20ms
stats.record_query_duration("SELECT * FROM products WHERE price > 10", 0.15) # 150ms (slow)
# Check query count and types
assert stats.query_count == 0 # query_count is only incremented by record_query()
assert "SELECT" in stats.query_types
assert stats.query_types["SELECT"] == 3
# Check slow queries
assert len(stats.slow_queries) == 1
assert stats.slow_queries[0][1] == 0.15
# Check query time stats
time_stats = stats.get_query_time_stats()
assert time_stats["min"] == 0.02
assert time_stats["max"] == 0.15
assert 0.07 < time_stats["avg"] < 0.08 # (0.05 + 0.02 + 0.15) / 3 = 0.073
# Check performance stats formatting
perf_stats = stats.get_performance_stats()
assert "Database Performance Statistics" in perf_stats
assert "Query Times" in perf_stats
assert "Query Types" in perf_stats
assert "SELECT: 3" in perf_stats
assert "Slow Queries" in perf_stats
@pytest.mark.asyncio
async def test_query_duration_tracking(sqlite_db, mcp_config):
"""Test query duration tracking in database handler"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml') as tmp:
yaml.dump(mcp_config, tmp)
tmp.flush()
server = ConnectionServer(config_path=tmp.name)
async with server.get_handler("test_sqlite") as handler:
# Execute a query and check duration tracking
await handler.execute_query("SELECT * FROM products")
# Verify that query duration was recorded
assert len(handler.stats.query_durations) == 1
assert handler.stats.query_durations[0] > 0
# Verify query type tracking
assert "SELECT" in handler.stats.query_types
assert handler.stats.query_types["SELECT"] == 1
# Execute another query
await handler.execute_query("SELECT name FROM products")
# Verify that query duration was recorded
assert len(handler.stats.query_durations) == 2
# Verify to_dict includes query times
stats_dict = handler.stats.to_dict()
assert "query_times_ms" in stats_dict
assert "min" in stats_dict["query_times_ms"]
assert "max" in stats_dict["query_times_ms"]
assert "avg" in stats_dict["query_times_ms"]
@pytest.mark.asyncio
async def test_get_performance_stats(sqlite_db, mcp_config):
"""Test performance statistics generation"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml') as tmp:
yaml.dump(mcp_config, tmp)
tmp.flush()
server = ConnectionServer(config_path=tmp.name)
# Execute some queries to generate stats
async with server.get_handler("test_sqlite") as handler:
await handler.execute_query("SELECT * FROM products")
await handler.execute_query("SELECT name FROM products")
# Intentionally cause an error
try:
await handler.execute_query("SELECT * FROM nonexistent")
except ConnectionHandlerError:
pass
# Get performance stats directly from handler
perf_stats = handler.stats.get_performance_stats()
# Verify performance stats content
assert "Database Performance Statistics" in perf_stats
assert "Query Count: 3" in perf_stats # 2 successful + 1 failed
assert "Query Types" in perf_stats
assert "SELECT: 2" in perf_stats # Only successful queries are counted in query_types
assert "Error Rate" in perf_stats
# Verify query durations are tracked
assert len(handler.stats.query_durations) == 2 # Only successful queries have durations recorded
assert len(handler.stats.query_types) == 1 # Only SELECT queries
# Verify error tracking
assert handler.stats.error_count == 1
assert "ConnectionHandlerError" in handler.stats.error_types
@pytest.mark.asyncio
async def test_query_analysis(sqlite_db, mcp_config):
"""Test query analysis functionality"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml') as tmp:
yaml.dump(mcp_config, tmp)
tmp.flush()
server = ConnectionServer(config_path=tmp.name)
async with server.get_handler("test_sqlite") as handler:
# Get execution plan
explain_result = await handler.explain_query("SELECT * FROM products")
assert explain_result # Should not be empty
# Execute query and measure time
start_time = datetime.now()
await handler.execute_query("SELECT * FROM products")
duration = (datetime.now() - start_time).total_seconds()
# Verify query duration is reasonable
assert duration > 0
assert duration < 1.0 # Should be fast for a simple query
# Verify query type tracking
assert "SELECT" in handler.stats.query_types
assert handler.stats.query_types["SELECT"] == 1