cache_fixture_builder.py (7.09 kB)
"""Utility to generate deterministic cache history fixtures using FakeSnowflake.""" from __future__ import annotations import asyncio import json import os import shutil import uuid from contextlib import contextmanager from pathlib import Path from unittest.mock import patch from igloo_mcp.cache.query_result_cache import QueryResultCache from igloo_mcp.config import Config, SnowflakeConfig from igloo_mcp.mcp.tools.execute_query import ExecuteQueryTool from igloo_mcp.service_layer.query_service import QueryService from tests.helpers.fake_snowflake_connector import ( FakeQueryPlan, FakeSessionDefaults, FakeSnowflakeService, ) class _TimeSequence: """Iterator-friendly helper to supply deterministic timestamps.""" def __init__(self, values: list[float]) -> None: self._values = iter(values) self._last = values[-1] if values else 0.0 def __call__(self) -> float: try: self._last = next(self._values) except StopIteration: self._last += 0.5 return self._last @contextmanager def _patched_time(values: list[float]): seq = _TimeSequence(values) def fake_sleep(_seconds: float) -> None: # No-op; time progression is controlled by the sequence. return None with ( patch("time.time", side_effect=seq), patch("time.sleep", side_effect=fake_sleep), ): yield def generate_cache_fixture(output_dir: Path) -> dict[str, Path]: """Produce deterministic cache/history artifacts for testing. Args: output_dir: Directory where artifacts will be written. Returns: Mapping with key file paths (history, manifest, rows, sql, csv). """ output_dir = output_dir.resolve() history_path = output_dir / "history" / "doc.jsonl" artifact_root = output_dir / "artifacts" cache_root = artifact_root / "cache" if history_path.exists(): history_path.unlink() if artifact_root.exists(): shutil.rmtree(artifact_root) history_path.parent.mkdir(parents=True, exist_ok=True) artifact_root.mkdir(parents=True, exist_ok=True) env = { "IGLOO_MCP_QUERY_HISTORY": str(history_path), "IGLOO_MCP_ARTIFACT_ROOT": str(artifact_root), "IGLOO_MCP_CACHE_ROOT": str(cache_root), "IGLOO_MCP_CACHE_MODE": "enabled", "IGLOO_MCP_CACHE_MAX_ROWS": "5000", } config = Config(snowflake=SnowflakeConfig(profile="fixture_profile")) service = FakeSnowflakeService( [ FakeQueryPlan( statement="SELECT month, total_revenue FROM fixture_source", rows=[ {"MONTH": "2024-01", "TOTAL_REVENUE": 125000.25}, {"MONTH": "2024-02", "TOTAL_REVENUE": 132500.75}, ], duration=0.02, sfqid="FIXTURE_QID_001", ) ], session_defaults=FakeSessionDefaults( role="FIXTURE_ROLE", warehouse="FIXTURE_WH", database="FIXTURE_DB", schema="ANALYTICS", ), ) async def _run() -> None: tool = ExecuteQueryTool( config=config, snowflake_service=service, query_service=QueryService(context=None), health_monitor=None, ) _ = await tool.execute( statement="SELECT month, total_revenue FROM fixture_source", warehouse="FIXTURE_WH", timeout_seconds=120, reason="Fixture baseline history", post_query_insight={ "summary": "Fixture revenue sample", "key_metrics": ["jan_revenue:125000.25", "feb_revenue:132500.75"], "business_impact": "Used for unit testing", "follow_up_needed": False, }, ) # Second execution should hit the cache immediately. 
_ = await tool.execute( statement="SELECT month, total_revenue FROM fixture_source", warehouse="FIXTURE_WH", timeout_seconds=120, reason="Fixture baseline history", ) time_sequence = [ 1700000000.0, # initial requested_ts 1700000000.05, # started 1700000000.10, # deadline base 1700000000.12, # loop check (below deadline) 1700000000.18, # loop check (exceeds deadline -> exit) 1700000000.22, # duration calculation 1700000000.30, # completed ts for success 1700000100.0, # cache-hit requested_ts ] uuid_values = [ uuid.UUID("11111111111111111111111111111111"), uuid.UUID("22222222222222222222222222222222"), ] cache_isos = [ "2024-01-01T00:00:00+00:00", "2024-01-01T00:10:00+00:00", ] with ( patch.dict(os.environ, env, clear=False), _patched_time(time_sequence), patch("uuid.uuid4", side_effect=uuid_values), patch.object(QueryResultCache, "_iso_now", side_effect=cache_isos), ): asyncio.run(_run()) manifest_paths = list(cache_root.rglob("manifest.json")) if not manifest_paths: raise RuntimeError("Failed to locate cache manifest for fixture generation") manifest_path = manifest_paths[0] cache_dir = manifest_path.parent sql_files = list((artifact_root / "queries" / "by_sha").rglob("*.sql")) if not sql_files: raise RuntimeError("Failed to locate SQL artifact for fixture generation") _normalize_history(history_path, output_dir) return { "history": history_path, "manifest": manifest_path, "rows_jsonl": cache_dir / "rows.jsonl", "rows_csv": cache_dir / "rows.csv", "sql": sql_files[0], } def load_jsonl(path: Path) -> list[dict[str, object]]: records: list[dict[str, object]] = [] with path.open("r", encoding="utf-8") as fh: for line in fh: line = line.strip() if line: records.append(json.loads(line)) return records def _normalize_history(history_path: Path, base_dir: Path) -> None: records = load_jsonl(history_path) base_resolved = base_dir.resolve() def _normalize_value(value: str) -> str: candidate = Path(value) try: rel = candidate.resolve().relative_to(base_resolved) return rel.as_posix() except (ValueError, OSError): # Path is not relative to base or doesn't exist, return as-is return value for record in records: if cache_manifest := record.get("cache_manifest"): record["cache_manifest"] = _normalize_value(cache_manifest) artifacts = record.get("artifacts") if isinstance(artifacts, dict): for key, value in list(artifacts.items()): if isinstance(value, str): artifacts[key] = _normalize_value(value) with history_path.open("w", encoding="utf-8") as fh: for record in records: fh.write(json.dumps(record, ensure_ascii=False)) fh.write("\n")
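The _patched_time helper above is self-contained, so its behavior can be shown in isolation. A minimal sketch, assuming the module is importable as tests.helpers.cache_fixture_builder (the actual module path is not stated in the source) and Python 3.10+ for parenthesized context managers: inside the context, time.time() drains the supplied sequence and then keeps advancing by 0.5 s per call, while time.sleep() becomes a no-op.

import time

from tests.helpers.cache_fixture_builder import _patched_time  # assumed module path

with _patched_time([10.0, 10.5]):
    assert time.time() == 10.0  # first queued value
    assert time.time() == 10.5  # second queued value
    assert time.time() == 11.0  # sequence exhausted: last value + 0.5
    time.sleep(60)              # no-op; wall-clock time is fully controlled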
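And a sketch of end-to-end use from a test, again under the assumed module path and using pytest's built-in tmp_path fixture. The returned keys (history, manifest, rows_jsonl, rows_csv, sql) come from generate_cache_fixture above; the asserted record count is an illustrative assumption about igloo-mcp's history logging, not something the source guarantees.

from pathlib import Path

from tests.helpers.cache_fixture_builder import (  # assumed module path
    generate_cache_fixture,
    load_jsonl,
)


def test_cache_fixture_artifacts(tmp_path: Path) -> None:
    paths = generate_cache_fixture(tmp_path)

    # Two executions happen above (initial run + cache hit); one history
    # record each is the assumed shape here.
    history = load_jsonl(paths["history"])
    assert len(history) == 2

    # Cache and query artifacts written by the fixture builder.
    assert paths["manifest"].exists()
    assert paths["rows_jsonl"].exists()
    assert paths["sql"].exists()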
