mcp-dbutils
by donghao1393
- tests
- integration
"""Integration tests for advanced database tools"""
import asyncio
import tempfile
import anyio
import mcp.types as types
import pytest
import yaml
from mcp import ClientSession
from mcp_dbutils.base import ConnectionServer
from mcp_dbutils.log import create_logger
# 创建测试用的 logger
logger = create_logger("test-tools-advanced", True) # debug=True 以显示所有日志
@pytest.mark.asyncio
async def test_get_stats_tool(postgres_db, sqlite_db, mcp_config):
"""Test the dbutils-get-stats tool with both PostgreSQL and SQLite connections"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml') as tmp:
yaml.dump(mcp_config, tmp)
tmp.flush()
server = ConnectionServer(config_path=tmp.name)
# Create bidirectional streams
client_to_server_send, client_to_server_recv = anyio.create_memory_object_stream[types.JSONRPCMessage | Exception](10)
server_to_client_send, server_to_client_recv = anyio.create_memory_object_stream[types.JSONRPCMessage](10)
# Start server in background
server_task = asyncio.create_task(
server.server.run(
client_to_server_recv,
server_to_client_send,
server.server.create_initialization_options(),
raise_exceptions=True
)
)
try:
# Initialize client session
client = ClientSession(server_to_client_recv, client_to_server_send)
async with client:
await client.initialize()
# Verify tool is available
response = await client.list_tools()
tool_names = [tool.name for tool in response.tools]
assert "dbutils-get-stats" in tool_names
# Test PostgreSQL stats
pg_args = {"connection": "test_pg", "table": "users"}
result = await client.call_tool("dbutils-get-stats", pg_args)
assert len(result.content) == 1
assert result.content[0].type == "text"
assert "[postgres]" in result.content[0].text
assert "Table Statistics for users" in result.content[0].text
# Test SQLite stats
sqlite_args = {"connection": "test_sqlite", "table": "products"}
result = await client.call_tool("dbutils-get-stats", sqlite_args)
assert len(result.content) == 1
assert result.content[0].type == "text"
assert "[sqlite]" in result.content[0].text
assert "Table Statistics for products" in result.content[0].text
finally:
# Cleanup
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass
await client_to_server_send.aclose()
await client_to_server_recv.aclose()
await server_to_client_send.aclose()
await server_to_client_recv.aclose()
@pytest.mark.asyncio
async def test_list_constraints_tool(postgres_db, sqlite_db, mcp_config):
"""Test the dbutils-list-constraints tool with both PostgreSQL and SQLite connections"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml') as tmp:
yaml.dump(mcp_config, tmp)
tmp.flush()
server = ConnectionServer(config_path=tmp.name)
client_to_server_send, client_to_server_recv = anyio.create_memory_object_stream[types.JSONRPCMessage | Exception](10)
server_to_client_send, server_to_client_recv = anyio.create_memory_object_stream[types.JSONRPCMessage](10)
server_task = asyncio.create_task(
server.server.run(
client_to_server_recv,
server_to_client_send,
server.server.create_initialization_options(),
raise_exceptions=True
)
)
try:
client = ClientSession(server_to_client_recv, client_to_server_send)
async with client:
await client.initialize()
response = await client.list_tools()
tool_names = [tool.name for tool in response.tools]
assert "dbutils-list-constraints" in tool_names
# Test PostgreSQL constraints
pg_args = {"connection": "test_pg", "table": "users"}
result = await client.call_tool("dbutils-list-constraints", pg_args)
assert len(result.content) == 1
assert result.content[0].type == "text"
assert "[postgres]" in result.content[0].text
assert "Constraints for users" in result.content[0].text
# Test SQLite constraints
sqlite_args = {"connection": "test_sqlite", "table": "products"}
result = await client.call_tool("dbutils-list-constraints", sqlite_args)
assert len(result.content) == 1
assert result.content[0].type == "text"
assert "[sqlite]" in result.content[0].text
assert "Constraints for products" in result.content[0].text
finally:
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass
await client_to_server_send.aclose()
await client_to_server_recv.aclose()
await server_to_client_send.aclose()
await server_to_client_recv.aclose()
@pytest.mark.asyncio
async def test_explain_query_tool(postgres_db, sqlite_db, mcp_config):
"""Test the dbutils-explain-query tool with both PostgreSQL and SQLite connections"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml') as tmp:
yaml.dump(mcp_config, tmp)
tmp.flush()
server = ConnectionServer(config_path=tmp.name)
client_to_server_send, client_to_server_recv = anyio.create_memory_object_stream[types.JSONRPCMessage | Exception](10)
server_to_client_send, server_to_client_recv = anyio.create_memory_object_stream[types.JSONRPCMessage](10)
server_task = asyncio.create_task(
server.server.run(
client_to_server_recv,
server_to_client_send,
server.server.create_initialization_options(),
raise_exceptions=True
)
)
try:
client = ClientSession(server_to_client_recv, client_to_server_send)
async with client:
await client.initialize()
response = await client.list_tools()
tool_names = [tool.name for tool in response.tools]
assert "dbutils-explain-query" in tool_names
# Test PostgreSQL explain
pg_args = {
"connection": "test_pg",
"sql": "SELECT * FROM users WHERE id > 0"
}
result = await client.call_tool("dbutils-explain-query", pg_args)
assert len(result.content) == 1
assert result.content[0].type == "text"
assert "[postgres]" in result.content[0].text
assert "Query Execution Plan" in result.content[0].text
# Test SQLite explain
sqlite_args = {
"connection": "test_sqlite",
"sql": "SELECT * FROM products WHERE id > 0"
}
result = await client.call_tool("dbutils-explain-query", sqlite_args)
assert len(result.content) == 1
assert result.content[0].type == "text"
assert "[sqlite]" in result.content[0].text
assert "Query Execution Plan" in result.content[0].text
finally:
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass
await client_to_server_send.aclose()
await client_to_server_recv.aclose()
await server_to_client_send.aclose()
await server_to_client_recv.aclose()