test_smoke_integration.py
"""Smoke tests that verify integration without requiring Snowflake database. These tests focus on the end-to-end behavior of our enhancements using mocks and in-memory operations. """ from __future__ import annotations import json import os import tempfile from pathlib import Path from unittest.mock import Mock, patch import pytest from igloo_mcp.config import Config from igloo_mcp.logging.query_history import QueryHistory from igloo_mcp.mcp.tools.execute_query import ExecuteQueryTool from igloo_mcp.sql_validation import validate_sql_statement from tests.helpers.fake_snowflake_connector import FakeQueryPlan, FakeSnowflakeService class TestSmokeIntegration: """Smoke tests for integrated functionality without database dependencies.""" def test_complete_lateral_query_workflow(self): """Test complete workflow for LATERAL queries end-to-end.""" # Step 1: SQL validation should allow LATERAL queries lateral_query = """SELECT fees.position_object_id, rewards.seq AS reward_index FROM tmp_cetus_lp_positions_calculated_enhanced_may20_2025 fees , LATERAL FLATTEN(input => fees.rewards_info) rewards WHERE fees.rewards_info IS NOT NULL AND rewards.seq < 10""" stmt_type, is_valid, error_msg = validate_sql_statement( lateral_query, ["Select"], [] ) assert is_valid is True, "LATERAL query validation failed in smoke test" assert error_msg is None, f"LATERAL query had error message: {error_msg}" assert stmt_type == "Select" def test_complete_post_query_insight_workflow(self): """Test complete workflow for post_query_insight feature.""" with tempfile.TemporaryDirectory() as temp_dir: # Simulate git repository (Path(temp_dir) / ".git").mkdir() # Create QueryHistory in git repo (should be enabled by default) with patch("igloo_mcp.path_utils.find_repo_root") as mock_find: mock_find.return_value = Path(temp_dir) history = QueryHistory.from_env() assert ( history.enabled ), "Query history should be enabled in git repo smoke test" # Simulate recording query with post_query_insight payload = { "ts": 1699999999, "status": "success", "profile": "simulate_test", "statement_preview": "SELECT revenue_growth FROM quarterly_metrics", "rowcount": 4, "post_query_insight": { "summary": ( "Q4 revenue grew 23% YoY exceeding forecast by 8 points" ), "key_metrics": [ "revenue:+23%", "forecast_deviation:+8pp", "market_share:+2.1%", ], "business_impact": ( "Strong performance driven by new product launches and " "expanded enterprise contracts" ), "follow_up_needed": True, }, } history.record(payload) # Verify JSONL file was created and contains correct data history_file = history._path assert ( history_file.exists() ), "History file should be created in smoke test" lines = [ line for line in history_file.read_text().splitlines() if line.strip() ] assert lines, "History file should contain at least one entry" recorded = json.loads(lines[-1]) # Verify post_query_insight was properly structured assert "post_query_insight" in recorded assert ( recorded["post_query_insight"]["summary"] == payload["post_query_insight"]["summary"] ) assert ( recorded["post_query_insight"]["key_metrics"] == payload["post_query_insight"]["key_metrics"] ) assert ( recorded["post_query_insight"]["business_impact"] == payload["post_query_insight"]["business_impact"] ) assert recorded["post_query_insight"]["follow_up_needed"] is True @pytest.mark.asyncio async def test_complete_query_execution_workflow_with_all_features(self): """Test complete query execution workflow with all enhancements.""" config = Config.from_env() query_service = Mock() mock_rows = [{"id": 
i, "value": f"data_{i}"} for i in range(1500)] with tempfile.TemporaryDirectory() as temp_dir: history_file = Path(temp_dir) / "logs" / "doc.jsonl" artifact_root = Path(temp_dir) / "artifacts" cache_root = Path(temp_dir) / "cache" env_overrides = { "IGLOO_MCP_QUERY_HISTORY": str(history_file), "IGLOO_MCP_ARTIFACT_ROOT": str(artifact_root), "IGLOO_MCP_CACHE_ROOT": str(cache_root), } with patch.dict(os.environ, env_overrides, clear=False): complex_query = """WITH monthly_metrics AS ( SELECT month, SUM(revenue) as total_revenue, COUNT(DISTINCT customer_id) as unique_customers FROM sales s , LATERAL FLATTEN(input => s.metadata) meta WHERE s.created_at >= '2023-01-01' GROUP BY month ) SELECT m.month, m.total_revenue, m.unique_customers, prev.revenue FROM monthly_metrics m LEFT JOIN monthly_metrics prev ON m.month = DATEADD(month, 1, prev.month) ORDER BY m.month""" service = FakeSnowflakeService( [ FakeQueryPlan( statement=complex_query, rows=mock_rows, rowcount=1500, duration=0.01, sfqid="test_query_id_12345", ) ] ) tool = ExecuteQueryTool( config=config, snowflake_service=service, query_service=query_service, health_monitor=None, ) result = await tool.execute( statement=complex_query, warehouse="ANALYTICS_WH", timeout_seconds=120, reason="Q4 2023 revenue analysis with customer cohort breakdown", post_query_insight={ "summary": ( "Q4 showed record performance with 23% revenue growth " "and improved customer acquisition" ), "key_metrics": [ "revenue:+23%", "customers:+15%", "avg_order_value:+8%", ], "business_impact": ( "Significant outperformance of Q4 targets, driven by " "successful holiday campaign and new product launches" ), }, ) assert result["rowcount"] == 1500 assert "query_id" in result assert "duration_ms" in result assert result["cache"]["hit"] is False assert result["audit_info"]["cache"]["hit"] is False manifest_path = Path(result["cache"]["manifest_path"]) assert manifest_path.exists() assert (manifest_path.parent / "rows.jsonl").exists() assert history_file.exists() lines = history_file.read_text().strip().splitlines() assert len(lines) == 1 recorded = json.loads(lines[0]) assert recorded["status"] == "success" assert recorded["cache_key"] assert recorded["execution_id"] == result["audit_info"]["execution_id"] cached_result = await tool.execute( statement=complex_query, warehouse="ANALYTICS_WH", timeout_seconds=120, reason="Q4 2023 revenue analysis with customer cohort breakdown", ) executed_cursors = [ cursor for cursor in service.cursors if cursor._main_executed ] assert len(executed_cursors) == 1 assert service.cursors[-1]._main_executed is False assert cached_result["cache"]["hit"] is True assert cached_result["audit_info"]["cache"]["hit"] is True assert cached_result["rowcount"] == 1500 lines = history_file.read_text().strip().splitlines() assert len(lines) == 2 cache_entry = json.loads(lines[-1]) assert cache_entry["status"] == "cache_hit" assert cache_entry["cache_key"] == recorded["cache_key"] def test_error_propagation_workflow(self): """Test error handling and propagation end-to-end.""" # Test complete error workflow for SQL validation blocked_queries = [ ("DELETE FROM users WHERE id = 1", "Delete"), ("DROP TABLE important_data", "Drop"), ("TRUNCATE TABLE logs", "Truncate"), ] for query, expected_type in blocked_queries: stmt_type, is_valid, error_msg = validate_sql_statement( query, ["Select"], ["Delete", "Drop", "Truncate"] ) assert is_valid is False, f"Blocked query should fail: {query}" assert ( expected_type in stmt_type ), f"Should detect correct type: 
{stmt_type}" assert ( "Safe alternatives:" in error_msg ), f"Should provide alternatives: {error_msg}" def test_repository_detection_workflow(self): """Test repository detection and logging workflow.""" with tempfile.TemporaryDirectory() as temp_dir: # Test non-git directory (logging should still enable with workspace path) with ( patch("igloo_mcp.path_utils.find_repo_root") as mock_find, patch("igloo_mcp.path_utils.Path.home", lambda: Path(temp_dir)), ): mock_find.return_value = Path(temp_dir) with patch.dict("os.environ", {}, clear=True): history = QueryHistory.from_env() assert ( history.enabled is True ), "Should default to workspace history" assert history.path is not None expected = ( Path(temp_dir) / ".igloo_mcp" / "logs" / "doc.jsonl" ).resolve() assert history.path.resolve() == expected # Test git directory (logging should be enabled) (Path(temp_dir) / ".git").mkdir() with ( patch("igloo_mcp.path_utils.find_repo_root") as mock_find, patch("igloo_mcp.path_utils.Path.home", lambda: Path(temp_dir)), ): mock_find.return_value = Path(temp_dir) with patch.dict( "os.environ", {"IGLOO_MCP_LOG_SCOPE": "repo"}, clear=True, ): history = QueryHistory.from_env() assert history.enabled is True, "Should be enabled in git repo" assert history.path is not None resolved = history.path.resolve() assert resolved.is_relative_to(Path(temp_dir).resolve()) assert ( "logs" in resolved.as_posix() ), "Should use repository-specific path" def test_complex_snowflake_patterns_validation(self): """Test validation of complex Snowflake-specific patterns.""" snowflake_patterns = [ # Complex LATERAL with VARIANT operations """SELECT event.user_id, event_data.event_name, event_data.properties:page_name::STRING as page_name, event_data.properties:session_id::STRING as session_id FROM ( SELECT user_id, parse_json(event_data) as event_data FROM user_events WHERE event_date >= '2023-01-01' ) events , LATERAL FLATTEN(input => events.event_data) event WHERE event_data.event_name = 'page_view'""", # Window functions with CTE and LATERAL """WITH user_sessions AS ( SELECT user_id, session_start, LAG(session_end) OVER (PARTITION BY user_id ORDER BY session_start) as prev_session_end FROM user_activity ), session_gaps AS ( SELECT user_id, session_start, prev_session_end, DATEDIFF(hours, prev_session_end, session_start) as gap_hours FROM user_sessions WHERE prev_session_end IS NOT NULL ) SELECT user_id, AVG(gap_hours) as avg_gap_between_sessions FROM session_gaps WHERE gap_hours > 24 GROUP BY user_id""", ] for pattern in snowflake_patterns: stmt_type, is_valid, error_msg = validate_sql_statement( pattern, ["Select"], [] ) assert ( is_valid is True ), f"Complex Snowflake pattern should be valid: {pattern[:100]}..." 
assert ( error_msg is None ), f"Complex pattern should not have error: {error_msg}" def test_parameter_schema_completeness(self): """Test that parameter schemas are complete and include all new features.""" config = Config.from_env() tool = ExecuteQueryTool( config=config, snowflake_service=Mock(), query_service=Mock(), health_monitor=None, ) schema = tool.get_parameter_schema() # Verify all expected parameters are present expected_params = [ "statement", "warehouse", "database", "schema", "role", "timeout_seconds", "verbose_errors", "reason", "post_query_insight", ] for param in expected_params: assert ( param in schema["properties"] ), f"Missing parameter in schema: {param}" # Verify post_query_insight schema details metric_schema = schema["properties"]["post_query_insight"] assert "anyOf" in metric_schema assert any(option.get("type") == "string" for option in metric_schema["anyOf"]) assert any(option.get("type") == "object" for option in metric_schema["anyOf"]) assert "insights" in metric_schema["description"].lower() assert len(metric_schema["examples"]) >= 2 # Verify timeout validation timeout_schema = schema["properties"]["timeout_seconds"] assert timeout_schema["minimum"] == 1 assert timeout_schema["maximum"] == 3600 def test_joined_enhancements_integration(self): """Test that all enhancements work together seamlessly.""" with tempfile.TemporaryDirectory() as temp_dir: # Create simulated git repository (Path(temp_dir) / ".git").mkdir() history_file = Path(temp_dir) / "logs" / "doc.jsonl" history_file.parent.mkdir(exist_ok=True) # Create complex query that uses multiple enhancement features complex_lateral_query = """WITH event_patterns AS ( SELECT user_id, event_type, COUNT(*) as event_count, LAG(event_type) OVER (PARTITION BY user_id ORDER BY created_at) as prev_event_type FROM user_events ue , LATERAL FLATTEN(input => ue.event_properties) props WHERE ue.created_at >= CURRENT_DATE() - 7 AND props.key = 'page_name' ) SELECT user_id, prev_event_type, event_type, event_count FROM event_patterns WHERE prev_event_type != event_type ORDER BY event_count DESC LIMIT 100""" # Test validation (should work) stmt_type, is_valid, error_msg = validate_sql_statement( complex_lateral_query, ["Select"], [] ) assert is_valid is True, "Complex integration query should validate" assert stmt_type == "Select" # Test history recording (should work) history = QueryHistory(history_file) integration_payload = { "ts": 1699999999, "status": "success", "profile": "integration_test", "statement_preview": complex_lateral_query[:200], "rowcount": 50, "reason": "User behavior pattern analysis for product recommendation improvements", "post_query_insight": { "summary": ( "Users show distinct navigation patterns with 60% following " "predictable paths" ), "key_metrics": [ "pattern_recognition:60%", "unique_flows:25%", "bounce_rate_reduction:15%", ], "business_impact": ( "Opportunity to implement intelligent navigation suggestions " "and improve user journey flow" ), }, } history.record(integration_payload) # Verify integration recording worked assert history_file.exists(), "Integration history should be created" content = history_file.read_text() recorded = json.loads(content.strip()) # All enhancement features should be present assert "timestamp" in recorded assert "reason" in recorded assert "post_query_insight" in recorded assert ( recorded["post_query_insight"]["summary"] == integration_payload["post_query_insight"]["summary"] ) assert recorded["statement_preview"] == complex_lateral_query[:200]
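
# ---------------------------------------------------------------------------
# Optional addition, not part of the original suite: a minimal sketch of a
# manual entry point for running these smoke tests directly. It assumes
# pytest and pytest-asyncio are available in the current environment.
#
# For reference, the validator exercised above can also be called standalone
# with the same positional arguments used in the tests
# (statement, allowed statement types, blocked statement types):
#
#     stmt_type, is_valid, error_msg = validate_sql_statement(
#         "SELECT CURRENT_TIMESTAMP()", ["Select"], []
#     )
#
if __name__ == "__main__":
    import sys

    # Run this module's tests verbosely and propagate pytest's exit code.
    sys.exit(pytest.main([__file__, "-v"]))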
