Skip to main content
Glama
test_execute_query_failures.py5.68 kB
"""ExecuteQueryTool failure-path coverage.""" from __future__ import annotations from pathlib import Path from typing import Any import pytest from igloo_mcp.config import Config from igloo_mcp.mcp.tools.execute_query import ExecuteQueryTool from igloo_mcp.snow_cli import QueryOutput from tests.helpers.fake_snowflake_connector import FakeQueryPlan, FakeSnowflakeService class StubQueryService: def session_from_mapping(self, mapping): # noqa: D401 return {k: v for k, v in mapping.items() if v} def execute_with_service(self, *args, **kwargs): # noqa: D401 return QueryOutput("", "", 0, rows=[{"value": 1}], columns=["value"]) class StubHealthMonitor: class ProfileHealth: def __init__(self, is_valid: bool, error: str = "invalid") -> None: self.is_valid = is_valid self.validation_error = error self.available_profiles = ["DEFAULT"] def __init__(self, valid: bool = True): self.valid = valid self.errors: list[str] = [] def get_profile_health(self, profile: str, use_cache: bool): # noqa: ARG002 return self.ProfileHealth(self.valid) def record_error(self, message: str) -> None: self.errors.append(message) def _tool( config: Config, snowflake_service: FakeSnowflakeService, **kwargs: Any ) -> ExecuteQueryTool: return ExecuteQueryTool( config=config, snowflake_service=snowflake_service, query_service=StubQueryService(), **kwargs, ) @pytest.mark.asyncio async def test_sql_validation_failure(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "off") monkeypatch.setenv("IGLOO_MCP_ARTIFACT_ROOT", str(tmp_path / "artifacts")) config = Config.from_env() service = FakeSnowflakeService([FakeQueryPlan(statement="DELETE", rows=None)]) with pytest.raises(ValueError) as exc_info: await _tool(config, service).execute(statement="DELETE FROM t") assert "not permitted" in str(exc_info.value) @pytest.mark.asyncio async def test_profile_validation_failure( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ): monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "off") monkeypatch.setenv("IGLOO_MCP_ARTIFACT_ROOT", str(tmp_path / "artifacts")) config = Config.from_env() service = FakeSnowflakeService( [FakeQueryPlan(statement="SELECT 1", rows=[{"v": 1}])] ) monitor = StubHealthMonitor(valid=False) with pytest.raises(ValueError) as exc_info: await _tool(config, service, health_monitor=monitor).execute( statement="SELECT 1" ) assert "profile validation failed" in str(exc_info.value).lower() assert monitor.errors @pytest.mark.asyncio async def test_cache_lookup_failure_records_warning( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ): monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "off") monkeypatch.setenv("IGLOO_MCP_ARTIFACT_ROOT", str(tmp_path / "artifacts")) config = Config.from_env() plan = FakeQueryPlan( statement="SELECT 1", rows=[{"value": 1}], duration=0.01, ) service = FakeSnowflakeService([plan]) tool = _tool(config, service) class ExplodingCache: enabled = True mode = "enabled" root = None def pop_warnings(self): return [] def compute_cache_key(self, **kwargs): # noqa: D401 return "abc" def lookup(self, cache_key): # noqa: D401 raise RuntimeError("boom") def store(self, *a, **kw): # noqa: D401 return None tool.cache = ExplodingCache() tool._cache_enabled = True result = await tool.execute(statement="SELECT 1") warnings = result["audit_info"].get("warnings", []) assert any("cache" in msg for msg in warnings) @pytest.mark.asyncio async def test_history_write_failure_is_swallowed( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ): monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", str(tmp_path / "history.jsonl")) monkeypatch.setenv("IGLOO_MCP_ARTIFACT_ROOT", str(tmp_path / "artifacts")) config = Config.from_env() plan = FakeQueryPlan(statement="SELECT 1", rows=[{"value": 1}]) service = FakeSnowflakeService([plan]) tool = _tool(config, service) def fail_record(_payload): raise RuntimeError("history write failed") tool.history.record = fail_record await tool.execute(statement="SELECT 1") assert True # no exception raised @pytest.mark.asyncio async def test_cache_store_failure(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv("IGLOO_MCP_ARTIFACT_ROOT", str(tmp_path / "artifacts")) monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "off") config = Config.from_env() plan = FakeQueryPlan(statement="SELECT 1", rows=[{"value": 1}], duration=0.01) service = FakeSnowflakeService([plan]) tool = _tool(config, service) class FailingCache: enabled = True mode = "enabled" def __init__(self, root: Path) -> None: self.root = root def pop_warnings(self): return [] def compute_cache_key(self, **kwargs): return "abc" def lookup(self, cache_key): return None def store(self, *args, **kwargs): # noqa: D401 raise RuntimeError("store failed") tool.cache = FailingCache(tmp_path / "cache") tool._cache_enabled = True result = await tool.execute(statement="SELECT 1") warnings = result["audit_info"].get("warnings", []) assert any("persist query cache" in msg for msg in warnings)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server