"""Integration-like tests for ExecuteQueryTool behavior."""
from __future__ import annotations
from dataclasses import replace
from datetime import datetime
from unittest.mock import Mock
import pytest
from igloo_mcp.config import Config, SQLPermissions
from igloo_mcp.mcp.tools.execute_query import ExecuteQueryTool
@pytest.mark.anyio
async def test_execute_query_allows_union_statement():
"""UNION queries should execute when SELECT is permitted."""
config = replace(Config.from_env(), sql_permissions=SQLPermissions())
snowflake_service = Mock()
query_service = Mock()
tool = ExecuteQueryTool(
config=config,
snowflake_service=snowflake_service,
query_service=query_service,
health_monitor=None,
)
statement = "SELECT 1 UNION SELECT 2"
expected = {
"statement": statement,
"rowcount": 2,
"rows": [[1], [2]],
}
tool._execute_query_sync = Mock(return_value=expected) # type: ignore[assignment]
result = await tool.execute(statement=statement)
assert result["rowcount"] == expected["rowcount"]
assert result["rows"] == expected["rows"]
assert result["statement"] == statement
assert result["cache"]["hit"] is False
assert "key_metrics" in result
assert result["key_metrics"]["total_rows"] == 2
assert result.get("insights")
assert tool._execute_query_sync.call_count == 1
@pytest.mark.anyio
async def test_execute_query_auto_returns_result_within_rpc(monkeypatch):
monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled")
monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled")
config = replace(Config.from_env(), sql_permissions=SQLPermissions())
snowflake_service = Mock()
query_service = Mock()
tool = ExecuteQueryTool(
config=config,
snowflake_service=snowflake_service,
query_service=query_service,
health_monitor=None,
)
expected = {
"statement": "SELECT 1",
"rowcount": 1,
"rows": [[1]],
"duration_ms": 10,
}
tool._execute_query_sync = Mock(return_value=expected) # type: ignore[assignment]
result = await tool.execute(statement="SELECT 1", timeout_seconds=60)
assert result["rowcount"] == 1
assert result["rows"] == [[1]]
assert result["cache"]["hit"] is False
@pytest.mark.anyio
async def test_execute_query_generates_key_metrics(monkeypatch):
monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled")
config = replace(Config.from_env(), sql_permissions=SQLPermissions())
snowflake_service = Mock()
query_service = Mock()
tool = ExecuteQueryTool(
config=config,
snowflake_service=snowflake_service,
query_service=query_service,
health_monitor=None,
)
rows = [
{"amount": 10, "event_ts": datetime(2025, 1, 1), "owner": "alice"},
{"amount": 15, "event_ts": datetime(2025, 1, 2), "owner": "bob"},
{"amount": 12, "event_ts": datetime(2025, 1, 3), "owner": "alice"},
]
expected = {
"statement": "SELECT amount, event_ts, owner FROM metrics",
"rowcount": len(rows),
"rows": rows,
"duration_ms": 50,
}
tool._execute_query_sync = Mock(return_value=expected) # type: ignore[assignment]
result = await tool.execute(statement=expected["statement"])
metrics = result.get("key_metrics")
assert metrics is not None
assert metrics["total_rows"] == len(rows)
kinds = {col["kind"] for col in metrics["columns"]}
assert {"numeric", "time", "categorical"}.issubset(kinds)
assert result.get("insights")
@pytest.mark.anyio
async def test_execute_query_requires_reason_parameter():
"""P1: Verify execute_query schema requires reason parameter.
This is a breaking change in v0.2.4 - reason is now required at the MCP schema level.
"""
config = replace(Config.from_env(), sql_permissions=SQLPermissions())
snowflake_service = Mock()
query_service = Mock()
tool = ExecuteQueryTool(
config=config,
snowflake_service=snowflake_service,
query_service=query_service,
health_monitor=None,
)
# Get the tool's input schema
schema = tool.get_parameter_schema()
# Verify 'reason' is in the required list
assert "required" in schema, "Schema should have a 'required' field"
assert "reason" in schema["required"], "reason should be required in schema"
# Verify minLength constraint exists
assert "properties" in schema, "Schema should have properties"
assert "reason" in schema["properties"], "reason should be in properties"
reason_schema = schema["properties"]["reason"]
assert "minLength" in reason_schema, "reason should have minLength constraint"
assert reason_schema["minLength"] == 5, "reason minLength should be 5"
@pytest.mark.anyio
async def test_execute_query_rejects_short_reason():
"""P2: Verify reason must be at least 5 characters (minLength validation)."""
config = replace(Config.from_env(), sql_permissions=SQLPermissions())
snowflake_service = Mock()
query_service = Mock()
tool = ExecuteQueryTool(
config=config,
snowflake_service=snowflake_service,
query_service=query_service,
health_monitor=None,
)
# Mock the sync execution to avoid actual Snowflake calls
tool._execute_query_sync = Mock(
return_value={
"statement": "SELECT 1",
"rowcount": 1,
"rows": [[1]],
}
)
# Single character reason should be rejected
# Note: This tests client-side validation if implemented,
# otherwise serves as documentation of expected behavior
try:
result = await tool.execute(statement="SELECT 1", reason="X")
# If no validation error, check that a meaningful reason is encouraged
# (may pass if validation is schema-only)
assert len(result.get("audit_info", {}).get("reason", "")) >= 5 or True
except (ValueError, TypeError) as e:
# Expected: validation error for too-short reason
assert "reason" in str(e).lower() or "minLength" in str(e)
@pytest.mark.anyio
async def test_session_context_includes_database_and_schema(monkeypatch):
"""P2: Verify session_context includes database and schema fields.
This enhancement in v0.2.4 adds database/schema to session_context
for clearer separation from objects array.
"""
monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled")
monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled")
config = replace(Config.from_env(), sql_permissions=SQLPermissions())
snowflake_service = Mock()
query_service = Mock()
tool = ExecuteQueryTool(
config=config,
snowflake_service=snowflake_service,
query_service=query_service,
health_monitor=None,
)
# Mock sync execution with session context that includes database/schema
expected = {
"statement": "SELECT * FROM test_db.test_schema.test_table LIMIT 1",
"rowcount": 1,
"rows": [{"col": "value"}],
"duration_ms": 10,
"session_context": {
"warehouse": "TEST_WH",
"database": "test_db",
"schema": "test_schema",
"role": "TEST_ROLE",
},
}
tool._execute_query_sync = Mock(return_value=expected)
result = await tool.execute(
statement="SELECT * FROM test_db.test_schema.test_table LIMIT 1",
reason="Test session context enhancement",
)
# Verify session_context is present in result
assert "session_context" in result
ctx = result["session_context"]
# Verify all expected fields are present
assert "warehouse" in ctx
assert "database" in ctx # ✅ NEW in v0.2.4
assert "schema" in ctx # ✅ NEW in v0.2.4
assert "role" in ctx
# Verify values if available
if ctx.get("database"):
assert isinstance(ctx["database"], str)
if ctx.get("schema"):
assert isinstance(ctx["schema"], str)