Skip to main content
Glama
test_tool_response_snapshots.py9.54 kB
"""Snapshot tests for MCP tool response schemas using inline-snapshot. These tests ensure that tool response structures remain stable and don't change unexpectedly, which would break client integrations. """ from __future__ import annotations from unittest.mock import Mock import pytest from inline_snapshot import snapshot from igloo_mcp.config import Config, SnowflakeConfig from igloo_mcp.living_reports.service import ReportService from igloo_mcp.mcp.tools.create_report import CreateReportTool from igloo_mcp.mcp.tools.execute_query import ExecuteQueryTool from igloo_mcp.service_layer.query_service import QueryService class TestExecuteQueryResponseSchema: """Snapshot tests for execute_query tool response structure.""" @pytest.mark.asyncio async def test_success_response_keys(self, tmp_path, monkeypatch): """Verify execute_query success response has stable schema.""" monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled") config = Config.from_env() snowflake_service = Mock() query_service = Mock(spec=QueryService) tool = ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=query_service, health_monitor=None, ) # Mock the sync execution tool._execute_query_sync = Mock( return_value={ "statement": "SELECT 1", "rowcount": 1, "rows": [[1]], "duration_ms": 10, } ) result = await tool.execute( statement="SELECT 1", reason="Snapshot test", response_mode="full", ) # Verify response schema stability assert set(result.keys()) == snapshot( { "audit_info", "cache", "duration_ms", "insights", "key_metrics", "rowcount", "rows", "session_context", "statement", } ) # Verify nested structures assert set(result["cache"].keys()) == snapshot({"cache_key", "hit"}) assert set(result["audit_info"].keys()) == snapshot( { "artifact_root", "artifacts", "cache", "cache_hit", "execution_id", "history_enabled", "history_path", "session_context", "sql_sha256", } ) @pytest.mark.asyncio async def test_cache_metadata_structure(self, tmp_path, monkeypatch): """Verify cache-related metadata has stable structure.""" monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled") config = Config.from_env() tool = ExecuteQueryTool( config=config, snowflake_service=Mock(), query_service=Mock(spec=QueryService), health_monitor=None, ) tool._execute_query_sync = Mock( return_value={ "statement": "SELECT 1", "rowcount": 1, "rows": [[1]], "duration_ms": 10, } ) result = await tool.execute( statement="SELECT 1", reason="Cache test", ) # Cache info should always have these fields cache_info = result["cache"] assert "hit" in cache_info assert isinstance(cache_info["hit"], bool) class TestCreateReportResponseSchema: """Snapshot tests for create_report tool response structure.""" @pytest.mark.asyncio async def test_create_report_success_schema(self, tmp_path): """Verify create_report success response has stable schema.""" config = Config(snowflake=SnowflakeConfig(profile="TEST_PROFILE")) report_service = ReportService(reports_root=tmp_path / "reports") tool = CreateReportTool(config, report_service) result = await tool.execute(title="Test Report") # Verify response schema assert set(result.keys()) == snapshot( { "insight_ids_added", "message", "report_id", "request_id", "section_ids_added", "status", "tags", "template", "timing", "title", } ) assert result["status"] == "success" assert isinstance(result["report_id"], str) assert isinstance(result["title"], str) @pytest.mark.asyncio async def test_create_report_with_template_schema(self, tmp_path): """Verify response schema when creating report with template.""" config = Config(snowflake=SnowflakeConfig(profile="TEST_PROFILE")) report_service = ReportService(reports_root=tmp_path / "reports") tool = CreateReportTool(config, report_service) result = await tool.execute( title="Analyst Report", template="analyst_v1", ) # Schema should be same regardless of template assert set(result.keys()) == snapshot( { "insight_ids_added", "message", "report_id", "request_id", "section_ids_added", "status", "tags", "template", "timing", "title", } ) class TestErrorResponseSchema: """Snapshot tests for error response structures.""" def test_validation_error_structure(self): """Verify validation error response has stable structure.""" from igloo_mcp.mcp.exceptions import MCPValidationError error = MCPValidationError( "Test validation error", validation_errors=["field1: error1", "field2: error2"], context={"request_id": "test-123"}, ) error_dict = error.to_dict() # Verify error response structure assert set(error_dict.keys()) == snapshot( {"context", "error_code", "error_type", "message", "validation_errors"} ) assert error_dict["error_type"] == "MCPValidationError" assert isinstance(error_dict["validation_errors"], list) def test_execution_error_structure(self): """Verify execution error response has stable structure.""" from igloo_mcp.mcp.exceptions import MCPExecutionError error = MCPExecutionError( "Test execution error", operation="test_op", context={"request_id": "test-456", "detail": "error detail"}, ) error_dict = error.to_dict() assert set(error_dict.keys()) == snapshot({"context", "error_code", "error_type", "message", "operation"}) assert error_dict["error_type"] == "MCPExecutionError" assert error_dict["operation"] == "test_op" class TestKeyMetricsSchema: """Snapshot tests for key_metrics structure in query results.""" @pytest.mark.asyncio async def test_key_metrics_structure(self, monkeypatch): """Verify key_metrics has stable structure.""" monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") monkeypatch.setenv("IGLOO_MCP_CACHE_MODE", "disabled") config = Config.from_env() tool = ExecuteQueryTool( config=config, snowflake_service=Mock(), query_service=Mock(spec=QueryService), health_monitor=None, ) # Mock with structured data tool._execute_query_sync = Mock( return_value={ "statement": "SELECT amount FROM sales", "rowcount": 3, "rows": [ {"amount": 100}, {"amount": 200}, {"amount": 150}, ], "duration_ms": 50, } ) result = await tool.execute( statement="SELECT amount FROM sales", reason="Metrics test", ) # Verify key_metrics structure metrics = result.get("key_metrics") assert metrics is not None assert set(metrics.keys()) == snapshot( {"columns", "num_columns", "sampled_rows", "total_rows", "truncated_output"} ) # Verify column metadata structure if metrics["columns"]: first_col = metrics["columns"][0] assert set(first_col.keys()) == snapshot({"avg", "kind", "max", "min", "name", "non_null_ratio"}) class TestPostQueryInsightSchema: """Snapshot tests for post_query_insight structure.""" def test_structured_insight_format(self): """Verify structured post_query_insight has stable schema.""" # PostQueryInsight class was removed - skip this test pytest.skip("PostQueryInsight class no longer exists") insight_dict = {} # Verify structure assert set(insight_dict.keys()) == snapshot( { "summary", "key_metrics", "business_impact", "follow_up_needed", } ) assert isinstance(insight_dict["key_metrics"], list) assert isinstance(insight_dict["follow_up_needed"], bool)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server