test_query_history_comprehensive.py
"""Comprehensive tests for QueryHistory. Tests cover initialization, recording, insight normalization, truncation, concurrency, and edge cases. Target: 48% → 85% coverage """ from __future__ import annotations import json import threading from pathlib import Path import pytest from igloo_mcp.logging.query_history import ( QueryHistory, normalize_insight, truncate_insight_for_storage, ) class TestQueryHistoryInitialization: """Test QueryHistory initialization.""" def test_init_with_path(self, tmp_path): """Initialize with valid path.""" # Arrange history_path = tmp_path / "query_history.jsonl" # Act history = QueryHistory(path=history_path) # Assert assert history.path == history_path assert not history.disabled def test_init_disabled(self): """Initialize with disabled sentinel.""" # Act history = QueryHistory(path=None, disabled=True) # Assert assert history.disabled is True def test_init_creates_directory(self, tmp_path): """Initialize creates parent directory if needed.""" # Arrange nested_path = tmp_path / "nested" / "dir" / "history.jsonl" # Act history = QueryHistory(path=nested_path) history.record({"execution_id": "test", "statement": "SELECT 1"}) # Assert assert nested_path.parent.exists() class TestQueryHistoryRecord: """Test record() functionality.""" def test_record_query_success(self, tmp_path): """Record a query successfully.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) record = { "execution_id": "exec-123", "statement": "SELECT * FROM users", "rowcount": 100, "duration_ms": 150, } # Act history.record(record) # Assert assert history_path.exists() content = history_path.read_text() assert "exec-123" in content assert "SELECT * FROM users" in content def test_record_with_all_fields(self, tmp_path): """Record with all possible fields.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) record = { "execution_id": "exec-456", "sql_sha256": "abc123", "statement": "INSERT INTO table VALUES (1)", "rowcount": 1, "duration_ms": 50, "timestamp": "2024-01-01T00:00:00Z", "cache_hit": False, "post_query_insight": {"summary": "Test insight"}, } # Act history.record(record) # Assert content = history_path.read_text() loaded = json.loads(content) assert loaded["execution_id"] == "exec-456" assert loaded["sql_sha256"] == "abc123" def test_record_with_insight_truncation(self, tmp_path): """Record truncates large insights.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) large_insight = {"summary": "x" * 20000} # Very large record = { "execution_id": "exec-789", "statement": "SELECT 1", "post_query_insight": large_insight, } # Act history.record(record) # Assert - Just verify it doesn't crash and file exists assert history_path.exists() content = history_path.read_text() assert "exec-789" in content def test_record_to_disabled_history(self): """Recording to disabled history is a no-op.""" # Arrange history = QueryHistory(path=None) # Act & Assert - Should not raise history.record({"execution_id": "test"}) def test_record_concurrent_writes(self, tmp_path): """Concurrent writes don't corrupt file.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) def write_record(i): history.record( { "execution_id": f"exec-{i}", "statement": f"SELECT {i}", } ) # Act - Multiple threads writing threads = [threading.Thread(target=write_record, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join() 
# Assert - All records written lines = history_path.read_text().strip().split("\n") assert len(lines) == 10 # Each line should be valid JSON for line in lines: json.loads(line) class TestInsightNormalization: """Test normalize_insight() function.""" def test_normalize_insight_from_string(self): """Normalize string insight to structured format.""" # Act result = normalize_insight("Simple insight text") # Assert assert result["summary"] == "Simple insight text" assert result["key_metrics"] == [] assert result["business_impact"] == "" assert result["follow_up_needed"] is False def test_normalize_insight_from_dict(self): """Normalize dict insight preserves fields.""" # Arrange insight = { "summary": "Test summary", "key_metrics": ["metric1", "metric2"], "business_impact": "High impact", "follow_up_needed": True, } # Act result = normalize_insight(insight) # Assert assert result["summary"] == "Test summary" assert result["key_metrics"] == ["metric1", "metric2"] assert result["business_impact"] == "High impact" assert result["follow_up_needed"] is True def test_normalize_insight_with_partial_dict(self): """Normalize dict with missing fields adds defaults.""" # Arrange insight = {"summary": "Partial insight"} # Act result = normalize_insight(insight) # Assert assert result["summary"] == "Partial insight" assert result["key_metrics"] == [] assert result["business_impact"] == "" assert result["follow_up_needed"] is False class TestInsightTruncation: """Test truncate_insight_for_storage() function.""" def test_truncate_insight_within_limit(self): """Insights within limit are not truncated.""" # Arrange insight = { "summary": "Short summary", "key_metrics": ["m1"], "business_impact": "Low", "follow_up_needed": False, } # Act result = truncate_insight_for_storage(insight, max_bytes=16384) # Assert assert result == insight def test_truncate_large_insight(self): """Large insights are truncated.""" # Arrange insight = { "summary": "x" * 20000, # Very large summary "key_metrics": [], "business_impact": "", "follow_up_needed": False, } # Act result = truncate_insight_for_storage(insight, max_bytes=1000) # Assert # Summary should be truncated assert len(result["summary"]) < 20000 assert result["summary"].endswith("...") def test_truncate_preserves_json_validity(self): """Truncated insight is valid JSON.""" # Arrange insight = { "summary": "Test " + "x" * 5000, # Simple ASCII "key_metrics": [], "business_impact": "", "follow_up_needed": False, } # Act result = truncate_insight_for_storage(insight, max_bytes=1000) # Assert - Should serialize without error serialized = json.dumps(result) assert len(serialized) > 0 # Just verify it serializes class TestQueryHistoryEdgeCases: """Test edge cases and error conditions.""" def test_record_with_none_fields(self, tmp_path): """Record with None values doesn't crash.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) record = { "execution_id": "test", "statement": None, "rowcount": None, } # Act history.record(record) # Assert assert history_path.exists() def test_record_empty_dict(self, tmp_path): """Record empty dict is handled.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) # Act history.record({}) # Assert assert history_path.exists() def test_normalize_insight_with_extra_fields(self): """Normalize preserves extra fields.""" # Arrange insight = { "summary": "Test", "key_metrics": [], "business_impact": "", "follow_up_needed": False, "custom_field": "custom_value", } # Act 
result = normalize_insight(insight) # Assert assert "custom_field" in result assert result["custom_field"] == "custom_value" class TestQueryHistoryFileOperations: """Test file operation edge cases.""" def test_history_appends_not_overwrites(self, tmp_path): """Multiple records append to file.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) # Act history.record({"execution_id": "first"}) history.record({"execution_id": "second"}) history.record({"execution_id": "third"}) # Assert lines = history_path.read_text().strip().split("\n") assert len(lines) == 3 assert "first" in lines[0] assert "second" in lines[1] assert "third" in lines[2] def test_history_survives_process_restart(self, tmp_path): """History persists across QueryHistory instances.""" # Arrange history_path = tmp_path / "history.jsonl" # Act - First instance history1 = QueryHistory(path=history_path) history1.record({"execution_id": "record1"}) # Second instance (simulates process restart) history2 = QueryHistory(path=history_path) history2.record({"execution_id": "record2"}) # Assert lines = history_path.read_text().strip().split("\n") assert len(lines) == 2 class TestQueryHistoryDisabledSentinels: """Test disabled sentinel handling.""" @pytest.mark.parametrize("sentinel", [None, "", "disabled", "off", "false", "0"]) def test_disabled_sentinels(self, sentinel): """Various sentinels disable history.""" # Act if sentinel is None: history = QueryHistory(path=None) else: history = QueryHistory(path=Path(sentinel) if sentinel else None) # Assert # Should not raise on record history.record({"execution_id": "test"}) class TestQueryHistoryProperties: """Test QueryHistory properties.""" def test_enabled_property_true(self, tmp_path): """Enabled property returns True when initialized with path.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) # Assert assert history.enabled is True def test_enabled_property_false(self): """Enabled property returns False when disabled.""" # Arrange history = QueryHistory(path=None, disabled=True) # Assert assert history.enabled is False def test_path_property(self, tmp_path): """Path property returns the history path.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) # Assert assert history.path == history_path def test_disabled_property(self): """Disabled property returns correct state.""" # Arrange history = QueryHistory(path=None, disabled=True) # Assert assert history.disabled is True class TestQueryHistoryWarnings: """Test warning collection.""" def test_pop_warnings_empty(self, tmp_path): """Pop warnings returns empty list when no warnings.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) # Act warnings = history.pop_warnings() # Assert assert warnings == [] def test_pop_warnings_clears_list(self, tmp_path): """Pop warnings clears the warnings list.""" # Arrange - Create history with bad fallback to generate warning history_path = tmp_path / "history.jsonl" bad_fallback = Path("/nonexistent/path/history.jsonl") history = QueryHistory(path=history_path, fallbacks=[bad_fallback]) # Act _warnings1 = history.pop_warnings() warnings2 = history.pop_warnings() # Assert - Second call should be empty assert warnings2 == [] class TestQueryHistoryTimestampConversion: """Test timestamp handling in record().""" def test_record_converts_ts_to_timestamp(self, tmp_path): """Record converts numeric ts to ISO timestamp.""" # 
Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) record = { "execution_id": "test", "statement": "SELECT 1", "ts": 1609459200.0, # 2021-01-01 00:00:00 UTC } # Act history.record(record) # Assert content = history_path.read_text() assert "timestamp" in content assert "2021-01-01" in content def test_record_preserves_existing_timestamp(self, tmp_path): """Record preserves existing timestamp field.""" # Arrange history_path = tmp_path / "history.jsonl" history = QueryHistory(path=history_path) record = { "execution_id": "test", "statement": "SELECT 1", "timestamp": "2024-01-01T00:00:00Z", } # Act history.record(record) # Assert content = history_path.read_text() assert "2024-01-01T00:00:00Z" in content class TestQueryHistoryFallbacks: """Test fallback path handling.""" def test_init_with_fallbacks(self, tmp_path): """Initialize with fallback paths.""" # Arrange primary_path = tmp_path / "primary" / "history.jsonl" fallback_path = tmp_path / "fallback" / "history.jsonl" # Act history = QueryHistory(path=primary_path, fallbacks=[fallback_path]) # Assert assert history.enabled is True assert history.path == primary_path def test_fallback_used_when_primary_fails(self, tmp_path): """Fallback path used when primary path fails.""" # Arrange bad_primary = Path("/root/cannot_write/history.jsonl") fallback_path = tmp_path / "fallback" / "history.jsonl" # Act history = QueryHistory(path=bad_primary, fallbacks=[fallback_path]) # Assert assert history.enabled is True assert history.path == fallback_path # Should have warning about fallback warnings = history.pop_warnings() assert len(warnings) > 0 class TestQueryHistoryFromEnv: """Test from_env() class method.""" def test_from_env_disabled(self, monkeypatch): """from_env returns disabled history when env var is disable sentinel.""" # Arrange monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", "disabled") # Act history = QueryHistory.from_env() # Assert assert history.disabled is True def test_from_env_with_path(self, monkeypatch, tmp_path): """from_env uses path from environment variable.""" # Arrange history_path = tmp_path / "env_history.jsonl" monkeypatch.setenv("IGLOO_MCP_QUERY_HISTORY", str(history_path)) # Act history = QueryHistory.from_env() # Assert assert history.enabled is True assert history.path == history_path def test_from_env_no_env_var(self, monkeypatch): """from_env handles missing environment variable.""" # Arrange monkeypatch.delenv("IGLOO_MCP_QUERY_HISTORY", raising=False) # Act history = QueryHistory.from_env() # Assert # Should either be disabled or use fallback assert history is not None
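
For readers without the igloo_mcp source at hand, here is a minimal usage sketch of the QueryHistory surface these tests exercise: the path-based constructor, the enabled property, record() taking a plain dict, normalize_insight(), and pop_warnings(). The history file location below is an illustrative choice for the sketch, not a documented default.

# Minimal usage sketch based on the behavior exercised by the tests above.
# The file name "query_history.jsonl" is an illustrative assumption.
from pathlib import Path

from igloo_mcp.logging.query_history import QueryHistory, normalize_insight

history = QueryHistory(path=Path("query_history.jsonl"))
if history.enabled:
    history.record(
        {
            "execution_id": "exec-001",
            "statement": "SELECT 1",
            "rowcount": 1,
            "duration_ms": 12,
            "post_query_insight": normalize_insight("Single-row smoke test"),
        }
    )

# Surface any collected warnings (e.g. a fallback path being used).
for warning in history.pop_warnings():
    print(f"query history warning: {warning}")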
