Skip to main content
Glama
test_execute_query_tool.py (8.06 kB)
"""Integration-like tests for ExecuteQueryTool behavior.""" from __future__ import annotations from dataclasses import replace from datetime import datetime from unittest.mock import Mock import pytest from igloo_mcp.config import Config, SQLPermissions from igloo_mcp.mcp.tools.execute_query import ExecuteQueryTool @pytest.mark.anyio async def test_execute_query_allows_union_statement(): """UNION queries should execute when SELECT is permitted.""" config = replace(Config.from_env(), sql_permissions=SQLPermissions()) snowflake_service = Mock() query_service = Mock() tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) statement = "SELECT 1 UNION SELECT 2" expected = { "statement": statement, "rowcount": 2, "rows": [[1], [2]], } tool._execute_query_sync = Mock(return_value=expected) # type: ignore[assignment] result = await tool.execute(statement=statement) assert result["rowcount"] == expected["rowcount"] assert result["rows"] == expected["rows"] assert result["statement"] == statement assert result["cache"]["hit"] is False assert "key_metrics" in result assert result["key_metrics"]["total_rows"] == 2 assert result.get("insights") assert tool._execute_query_sync.call_count == 1 @pytest.mark.anyio async def test_execute_query_auto_returns_result_within_rpc(monkeypatch): monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled") config = replace(Config.from_env(), sql_permissions=SQLPermissions()) snowflake_service = Mock() query_service = Mock() tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) expected = { "statement": "SELECT 1", "rowcount": 1, "rows": [[1]], "duration_ms": 10, } tool._execute_query_sync = Mock(return_value=expected) # type: ignore[assignment] result = await tool.execute(statement="SELECT 1", timeout_seconds=60) assert result["rowcount"] == 1 
assert result["rows"] == [[1]] assert result["cache"]["hit"] is False @pytest.mark.anyio async def test_execute_query_generates_key_metrics(monkeypatch): monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") config = replace(Config.from_env(), sql_permissions=SQLPermissions()) snowflake_service = Mock() query_service = Mock() tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) rows = [ {"amount": 10, "event_ts": datetime(2025, 1, 1), "owner": "alice"}, {"amount": 15, "event_ts": datetime(2025, 1, 2), "owner": "bob"}, {"amount": 12, "event_ts": datetime(2025, 1, 3), "owner": "alice"}, ] expected = { "statement": "SELECT amount, event_ts, owner FROM metrics", "rowcount": len(rows), "rows": rows, "duration_ms": 50, } tool._execute_query_sync = Mock(return_value=expected) # type: ignore[assignment] result = await tool.execute(statement=expected["statement"]) metrics = result.get("key_metrics") assert metrics is not None assert metrics["total_rows"] == len(rows) kinds = {col["kind"] for col in metrics["columns"]} assert {"numeric", "time", "categorical"}.issubset(kinds) assert result.get("insights") @pytest.mark.anyio async def test_execute_query_requires_reason_parameter(): """P1: Verify execute_query schema requires reason parameter. This is a breaking change in v0.2.4 - reason is now required at the MCP schema level. 
""" config = replace(Config.from_env(), sql_permissions=SQLPermissions()) snowflake_service = Mock() query_service = Mock() tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) # Get the tool's input schema schema = tool.get_parameter_schema() # Verify 'reason' is in the required list assert "required" in schema, "Schema should have a 'required' field" assert "reason" in schema["required"], "reason should be required in schema" # Verify minLength constraint exists assert "properties" in schema, "Schema should have properties" assert "reason" in schema["properties"], "reason should be in properties" reason_schema = schema["properties"]["reason"] assert "minLength" in reason_schema, "reason should have minLength constraint" assert reason_schema["minLength"] == 5, "reason minLength should be 5" @pytest.mark.anyio async def test_execute_query_rejects_short_reason(): """P2: Verify reason must be at least 5 characters (minLength validation).""" config = replace(Config.from_env(), sql_permissions=SQLPermissions()) snowflake_service = Mock() query_service = Mock() tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) # Mock the sync execution to avoid actual Snowflake calls tool._execute_query_sync = Mock( return_value={ "statement": "SELECT 1", "rowcount": 1, "rows": [[1]], } ) # Single character reason should be rejected # Note: This tests client-side validation if implemented, # otherwise serves as documentation of expected behavior try: result = await tool.execute(statement="SELECT 1", reason="X") # If no validation error, check that a meaningful reason is encouraged # (may pass if validation is schema-only) assert len(result.get("audit_info", {}).get("reason", "")) >= 5 or True except (ValueError, TypeError) as e: # Expected: validation error for too-short reason assert "reason" in str(e).lower() or "minLength" in 
str(e) @pytest.mark.anyio async def test_session_context_includes_database_and_schema(monkeypatch): """P2: Verify session_context includes database and schema fields. This enhancement in v0.2.4 adds database/schema to session_context for clearer separation from objects array. """ monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled") config = replace(Config.from_env(), sql_permissions=SQLPermissions()) snowflake_service = Mock() query_service = Mock() tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) # Mock sync execution with session context that includes database/schema expected = { "statement": "SELECT * FROM test_db.test_schema.test_table LIMIT 1", "rowcount": 1, "rows": [{"col": "value"}], "duration_ms": 10, "session_context": { "warehouse": "TEST_WH", "database": "test_db", "schema": "test_schema", "role": "TEST_ROLE", }, } tool._execute_query_sync = Mock(return_value=expected) result = await tool.execute( statement="SELECT * FROM test_db.test_schema.test_table LIMIT 1", reason="Test session context enhancement", ) # Verify session_context is present in result assert "session_context" in result ctx = result["session_context"] # Verify all expected fields are present assert "warehouse" in ctx assert "database" in ctx # ✅ NEW in v0.2.4 assert "schema" in ctx # ✅ NEW in v0.2.4 assert "role" in ctx # Verify values if available if ctx.get("database"): assert isinstance(ctx["database"], str) if ctx.get("schema"): assert isinstance(ctx["schema"], str)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.