Skip to main content
Glama
test_parallel.py6.2 kB
"""Tests for parallel Snowflake CLI execution helpers.""" from __future__ import annotations from typing import Dict, List import pytest from igloo_mcp.parallel import ( ParallelQueryConfig, ParallelQueryExecutor, QueryResult, query_multiple_objects, ) from igloo_mcp.snow_cli import QueryOutput class StubCLI: def __init__(self, responses: List[QueryOutput | Exception]): self._responses = responses self.calls: List[str] = [] def run_query(self, query: str, *args, **kwargs) -> QueryOutput: self.calls.append(query) response = self._responses.pop(0) if isinstance(response, Exception): raise response return response def _patch_snow_cli(monkeypatch: pytest.MonkeyPatch, cli: StubCLI) -> None: monkeypatch.setattr("igloo_mcp.parallel.SnowCLI", lambda: cli) def test_execute_single_query_success(monkeypatch: pytest.MonkeyPatch): cli = StubCLI( [ QueryOutput( raw_stdout="", raw_stderr="", returncode=0, rows=[{"col1": "1"}, {"col1": "2"}], columns=["col1"], ) ] ) _patch_snow_cli(monkeypatch, cli) executor = ParallelQueryExecutor( ParallelQueryConfig( max_concurrent_queries=1, retry_attempts=1, timeout_seconds=60 ) ) result = executor.execute_single_query("SELECT 1", object_name="test") assert result.success is True assert result.row_count == 2 assert result.rows[0]["col1"] == "1" def test_execute_single_query_retries(monkeypatch: pytest.MonkeyPatch): cli = StubCLI( [ RuntimeError("first failure"), QueryOutput( raw_stdout="", raw_stderr="", returncode=0, rows=[], columns=[], ), ] ) _patch_snow_cli(monkeypatch, cli) config = ParallelQueryConfig( max_concurrent_queries=1, retry_attempts=2, retry_delay=0, timeout_seconds=10, ) executor = ParallelQueryExecutor(config) result = executor.execute_single_query("SELECT 1", object_name="retry") assert result.success is True assert result.row_count == 0 assert len(cli.calls) == 2 def test_execute_single_query_exhausts_retries(monkeypatch: pytest.MonkeyPatch): cli = StubCLI( [ RuntimeError("fail 1"), RuntimeError("fail 2"), ] ) _patch_snow_cli(monkeypatch, cli) config = ParallelQueryConfig( max_concurrent_queries=1, retry_attempts=2, retry_delay=0, timeout_seconds=5, ) executor = ParallelQueryExecutor(config) result = executor.execute_single_query("SELECT 1", object_name="fail") assert result.success is False assert "fail 2" in result.error @pytest.mark.asyncio async def test_execute_queries_async(monkeypatch: pytest.MonkeyPatch): cli = StubCLI( [ QueryOutput("", "", 0, rows=[{"col": "a"}], columns=["col"]), QueryOutput("", "", 0, rows=[{"col": "b"}], columns=["col"]), ] ) _patch_snow_cli(monkeypatch, cli) executor = ParallelQueryExecutor( ParallelQueryConfig( max_concurrent_queries=2, retry_attempts=1, timeout_seconds=30 ) ) queries: Dict[str, str] = {"q1": "SELECT 'a'", "q2": "SELECT 'b'"} results = await executor.execute_queries_async(queries) assert results["q1"].success and results["q2"].success assert len(cli.calls) == 2 def test_query_multiple_objects(monkeypatch: pytest.MonkeyPatch): results = { "obj": QueryResult( object_name="obj", query="SELECT 1", success=True, rows=[{"col": "1"}], execution_time=0.1, row_count=1, ) } def fake_execute(self, queries): # noqa: D401 return results monkeypatch.setattr( ParallelQueryExecutor, "execute_queries", fake_execute, raising=False, ) summary = query_multiple_objects({"obj": "SELECT 1"}, max_concurrent=1) assert summary["obj"].success is True def test_execute_single_query_parses_json(monkeypatch: pytest.MonkeyPatch): cli = StubCLI( [ QueryOutput( raw_stdout="", raw_stderr="", returncode=0, rows=[ {"object_json": '{"foo": 1}'}, {"object_json": 5}, ], columns=["object_json"], ) ] ) _patch_snow_cli(monkeypatch, cli) executor = ParallelQueryExecutor( ParallelQueryConfig( max_concurrent_queries=1, retry_attempts=1, timeout_seconds=30 ) ) result = executor.execute_single_query( "SELECT object_json", object_name="json_test" ) assert result.success is True assert result.json_data == [{"foo": 1}] def test_execute_single_query_zero_retries(monkeypatch: pytest.MonkeyPatch): executor = ParallelQueryExecutor( ParallelQueryConfig( max_concurrent_queries=1, retry_attempts=0, timeout_seconds=1 ) ) cli = StubCLI([]) _patch_snow_cli(monkeypatch, cli) with pytest.raises(RuntimeError): executor._execute_single_query("SELECT 1", "no_retries", cli) @pytest.mark.asyncio async def test_execute_queries_async_handles_future_exception( monkeypatch: pytest.MonkeyPatch, ): _patch_snow_cli(monkeypatch, StubCLI([])) async_executor = ParallelQueryExecutor( ParallelQueryConfig( max_concurrent_queries=1, retry_attempts=1, timeout_seconds=5 ) ) def boom(self, query, object_name, cli): # noqa: D401 raise RuntimeError("boom") monkeypatch.setattr( ParallelQueryExecutor, "_execute_single_query", boom, raising=False, ) results = await async_executor.execute_queries_async({"obj": "SELECT 1"}) assert results["obj"].success is False assert "Unexpected error" in results["obj"].error

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server