Skip to main content
Glama
test_query_lifecycle.py21.1 kB
#!/usr/bin/env python3 """ Query Lifecycle Test - Tier 1 Test Suite Tests the complete query lifecycle: Create → Validate → Execute → Poll → Retrieve → Cleanup This ensures the full workflow works end-to-end. """ import os import sys import time import json from pathlib import Path from typing import Dict, Any, Tuple, List # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) from tests.support.api_client import DuneTestClient, TestQueryManager from tests.support.helpers import PerformanceTimer, TestEnvironment, TestResultCollector from tests.support import QueryFactory, QueryValidator def load_env_variables(): """Load environment variables from .env file.""" env_file = Path(__file__).parent.parent.parent / ".env" if env_file.exists(): with open(env_file) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: key, value = line.split("=", 1) os.environ[key] = value def test_complete_lifecycle() -> Tuple[bool, Dict[str, Any]]: """Test complete query lifecycle from creation to cleanup.""" print("🔄 Testing Complete Query Lifecycle...") timer = PerformanceTimer() try: timer.start() # Setup api_key = os.getenv("DUNE_API_KEY") with TestQueryManager(DuneTestClient(api_key)) as qm: test_sql = QueryFactory.simple_select() unique_name = f"lifecycle_test_{QueryFactory.unique_timestamp_suffix()}" # Step 1: Create Query timer.checkpoint("start_creation") query_id = qm.create_test_query(test_sql, unique_name) timer.checkpoint("query_created") print(f" ✓ Step 1: Query created with ID: {query_id}") # Step 2: Validate Query Info query_info = qm.get_query_info(query_id) if not query_info: return False, {"error": "Query info not properly stored"} timer.checkpoint("query_validated") print(f" ✓ Step 2: Query validated: {query_info.get('name', 'unknown')}") # Step 3: Execute Query timer.checkpoint("start_execution") execution_id = qm.execute_and_wait(query_id, timeout=60) timer.checkpoint("query_completed") print(f" ✓ Step 3: Query executed and completed: {execution_id}") # Step 4: Retrieve Results results_json = qm.client.get_results_json(query_id) timer.checkpoint("results_retrieved") if not isinstance(results_json, dict) or 'data' not in results_json: return False, {"error": "Invalid results format"} data_rows = results_json['data'] print(f" ✓ Step 4: Results retrieved: {len(data_rows)} rows") # Step 5: Validate Results # Convert to DataFrame for validation (basic implementation) if len(data_rows) == 0: return False, {"error": "No data returned from query"} # Basic validation of expected simple query result if len(data_rows[0]) >= 2: print(f" ✓ Step 5: Results validated: expected column count") else: return False, {"error": "Result doesn't match expected format"} timer.checkpoint("results_validated") # Step 6: Cleanup (handled by context manager) print(f" ✓ Step 6: Cleanup will be handled automatically") timer.stop() details = { "query_id": query_id, "execution_id": execution_id, "rows_returned": len(data_rows), "lifecycle_stages": [ "created", "validated", "executed", "completed", "retrieved", "validated", "cleanup" ], "timings": timer.get_report() } return True, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_error_states() -> Tuple[bool, Dict[str, Any]]: """Test error states at each stage of the lifecycle.""" print("⚠️ Testing Error States...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") client = DuneTestClient(api_key) errors_tested = [] # Test 1: Invalid SQL (creation error) try: client.create_query("SELECTTTT INVALID SQL", "invalid_sql_test") errors_tested.append(("invalid_sql", "FAILED", "Should have failed")) except Exception as e: if any(keyword in str(e).lower() for keyword in ["syntax", "invalid", "error"]): errors_tested.append(("invalid_sql", "PASSED", f"Correctly rejected: {e}")) else: errors_tested.append(("invalid_sql", "UNEXPECTED", f"Wrong error type: {e}")) timer.checkpoint("invalid_sql_test") # Test 2: Query execution with non-existent ID try: client.execute_query(999999999) errors_tested.append(("nonexistent_query", "FAILED", "Should have failed")) except Exception as e: if any(keyword in str(e).lower() for keyword in ["not found", "404", "invalid"]): errors_tested.append(("nonexistent_query", "PASSED", f"Correctly rejected: {e}")) else: errors_tested.append(("nonexistent_query", "UNEXPECTED", f"Wrong error type: {e}")) timer.checkpoint("nonexistent_query_test") # Test 3: Access results of non-existent execution try: client.get_execution_status("nonexistent_exec_id") errors_tested.append(("nonexistent_execution", "FAILED", "Should have failed")) except Exception as e: if any(keyword in str(e).lower() for keyword in ["not found", "404", "invalid"]): errors_tested.append(("nonexistent_execution", "PASSED", f"Correctly rejected: {e}")) else: errors_tested.append(("nonexistent_execution", "UNEXPECTED", f"Wrong error type: {e}")) timer.checkpoint("nonexistent_exec_test") # Test 4: Create valid query but test parameter errors try: valid_query_id = client.create_query( "SELECT {{valid_param}} as test", "param_error_test" ) # Try executing without required parameter client.execute_query(valid_query_id) # Missing valid_param errors_tested.append(("missing_parameter", "FAILED", "Should have failed")) # Cleanup client.delete_query(valid_query_id) except Exception as e: if any(keyword in str(e).lower() for keyword in ["param", "missing", "required"]): errors_tested.append(("missing_parameter", "PASSED", f"Correctly rejected: {e}")) else: errors_tested.append(("missing_parameter", "UNCERTAIN", f"Error: {e}")) timer.checkpoint("parameter_errors_test") timer.stop() # Calculate success rate passed_errors = sum(1 for _, status, _ in errors_tested if status == "PASSED") total_errors = len(errors_tested) print(f" ✓ Error state tests: {passed_errors}/{total_errors} proper error handling") for error_type, status, message in errors_tested: icon = "✓" if status == "PASSED" else "✗" if status == "FAILED" else "?" print(f" {icon} {error_type}: {message}") details = { "error_tests": errors_tested, "passed_rate": passed_errors / total_errors if total_errors > 0 else 0, "timings": timer.get_report() } return passed_errors == total_errors, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_rollback_scenarios() -> Tuple[bool, Dict[str, Any]]: """Test rollback scenarios and cleanup failure handling.""" print("🔄 Testing Rollback Scenarios...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") client = DuneTestClient(api_key) rollback_tests = [] # Test 1: Manual cleanup after failure query_ids_to_cleanup = [] try: # Create several queries for i in range(3): query_id = client.create_query( f"SELECT {i} as test_col", f"rollback_test_{i}" ) query_ids_to_cleanup.append(query_id) # Simulate a failure before cleanup # (In real scenario, this would be an exception) print(f" Created {len(query_ids_to_cleanup)} queries for rollback test") except Exception as e: print(f" Error during query creation: {e}") finally: # Manual cleanup cleanup_count = 0 for query_id in query_ids_to_cleanup: if client.delete_query(query_id): cleanup_count += 1 rollback_tests.append(("manual_cleanup", cleanup_count, len(query_ids_to_cleanup))) print(f" ✓ Manual cleanup: {cleanup_count}/{len(query_ids_to_cleanup)} queries") timer.checkpoint("manual_rollback") # Test 2: Context manager cleanup on exception try: with TestQueryManager(client) as qm: query_id = qm.create_test_query("SELECT 1 as test", "exception_test") # Simulate an exception raise Exception("Simulated test exception") except Exception as e: if "Simulated test exception" in str(e): rollback_tests.append(("context_cleanup", "HANDLED", "Exception occurred as expected")) print(" ✓ Context manager: Exception handled correctly") else: rollback_tests.append(("context_cleanup", "UNEXPECTED", f"Wrong exception: {e}")) timer.checkpoint("context_rollback") # Test 3: Cleanup with invalid queries (should not fail) try: cleanup_attempts = 0 cleanup_successes = 0 # Try to delete queries that don't exist for fake_id in [999999, 888888, 777777]: cleanup_attempts += 1 if client.delete_query(fake_id): cleanup_successes += 1 # Try to delete queries that might exist query_id = client.create_query("SELECT 1", "cleanup_test") cleanup_attempts += 1 if client.delete_query(query_id): cleanup_successes += 1 rollback_tests.append(("cleanup_resilience", cleanup_successes, cleanup_attempts)) print(f" ✓ Cleanup resilience: {cleanup_successes}/{cleanup_attempts} operations succeeded") except Exception as e: rollback_tests.append(("cleanup_resilience", "FAILED", str(e))) print(f" ✗ Cleanup resilience failed: {e}") timer.stop() details = { "rollback_tests": rollback_tests, "timings": timer.get_report() } # All rollback tests should pass (or handle errors gracefully) return True, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_parameter_validation() -> Tuple[bool, Dict[str, Any]]: """Test parameter validation during query lifecycle.""" print("🔧 Testing Parameter Validation...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") client = DuneTestClient(api_key) param_tests = [] # Test 1: Create query with parameters param_sql = "SELECT {{test_param}} as value, 42 as constant" query_id = client.create_query(param_sql, "param_validation_test") print(f" ✓ Parameterized query created: {query_id}") timer.checkpoint("param_query_created") # Test 2: Execute with valid parameters valid_params = {"test_param": "hello_world"} execution_id = client.execute_query(query_id, valid_params) print(f" ✓ Executed with valid parameters: {execution_id}") timer.checkpoint("valid_paramexec") param_tests.append(("valid_params", "PASSED", "Execution succeeded")) # Wait for completion try: status = client.wait_for_completion(execution_id, timeout=30) print(f" ✓ Query completed with params: {status.get('state', 'unknown')}") timer.checkpoint("param_query_completed") param_tests.append(("param_completion", "PASSED", "Query completed")) except TimeoutError: param_tests.append(("param_completion", "TIMEOUT", "Query took too long")) # Test 3: Execute with invalid parameters try: # Using different parameter name should fail invalid_params = {"wrong_param": "value"} client.execute_query(query_id, invalid_params) param_tests.append(("invalid_params", "FAILED", "Should have rejected wrong parameters")) except Exception as e: if "param" in str(e).lower(): param_tests.append(("invalid_params", "PASSED", f"Correctly rejected: {e}")) else: param_tests.append(("invalid_params", "UNCERTAIN", f"Different error: {e}")) timer.checkpoint("invalid_param_attempt") # Test 4: Execute with missing parameters try: client.execute_query(query_id) # No parameters param_tests.append(("missing_params", "FAILED", "Should require parameters")) except Exception as e: if "param" in str(e).lower() or "missing" in str(e).lower(): param_tests.append(("missing_params", "PASSED", f"Correctly required: {e}")) else: param_tests.append(("missing_params", "UNCERTAIN", f"Different error: {e}")) timer.checkpoint("missing_param_attempt") # Cleanup client.delete_query(query_id) print(" ✓ Parameter test query cleaned up") timer.stop() # Calculate success rate passed_params = sum(1 for _, status, _ in param_tests if status == "PASSED") total_params = len(param_tests) print(f" ✓ Parameter tests: {passed_params}/{total_params} passed") for test_name, status, message in param_tests: icon = "✓" if status == "PASSED" else "✗" if status == "FAILED" else "?" print(f" {icon} {test_name}: {message}") details = { "parameter_tests": param_tests, "success_rate": passed_params / total_params if total_params > 0 else 0, "timings": timer.get_report() } return passed_params == total_params, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_concurrent_queries() -> Tuple[bool, Dict[str, Any]]: """Test concurrent query handling and resource management.""" print("🚀 Testing Concurrent Queries...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") concurrent_results = [] # Create multiple queries concurrently concurrent_queries = [] num_concurrent = 3 timer.checkpoint("concurrent_start") try: # Create queries for i in range(num_concurrent): client = DuneTestClient(api_key) test_sql = f"SELECT {i} as query_id, 'concurrent_{i}' as label" query_id = client.create_query(test_sql, f"concurrent_test_{i}") concurrent_queries.append((client, query_id, i)) print(f" ✓ Created concurrent query {i}: {query_id}") timer.checkpoint("concurrent_created") # Execute all queries execution_ids = [] for client, query_id, i in concurrent_queries: execution_id = client.execute_query(query_id) execution_ids.append((client, query_id, execution_id, i)) print(f" ✓ Started concurrent execution {i}: {execution_id}") timer.checkpoint("concurrent_executed") # Wait for all to complete (with reasonable timeout) completed_count = 0 for client, query_id, execution_id, i in execution_ids: try: status = client.wait_for_completion(execution_id, timeout=60) if status.get('state') == 'QUERY_STATE_COMPLETED': completed_count += 1 print(f" ✓ Concurrent query {i} completed: {status.get('state')}") except TimeoutError: print(f" ⚠ Concurrent query {i} timed out") finally: client.delete_query(query_id) timer.checkpoint("concurrent_completed") concurrent_results.append(("concurrent_execution", completed_count, num_concurrent)) print(f" ✓ Concurrent queries: {completed_count}/{num_concurrent} completed") except Exception as e: print(f" ✗ Concurrent test error: {e}") concurrent_results.append(("concurrent_execution", "ERROR", str(e))) timer.stop() details = { "concurrent_results": concurrent_results, "concurrent_count": num_concurrent, "timings": timer.get_report() } # At least some concurrent queries should work success = any(result == "ERROR" for _, result, _ in concurrent_results) is False return success, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def main(): """Run query lifecycle test suite.""" print("🔄 DUNE QUERY LIFECYCLE TEST SUITE") print("=" * 50) # Load environment load_env_variables() # Check API key if not os.getenv("DUNE_API_KEY"): print("❌ DUNE_API_KEY not found. Please set it in your environment or .env file.") return False # Initialize result collector results = TestResultCollector() results.start_collection() # Run lifecycle tests tests = [ ("Complete Lifecycle", test_complete_lifecycle), ("Error States", test_error_states), ("Rollback Scenarios", test_rollback_scenarios), ("Parameter Validation", test_parameter_validation), ("Concurrent Queries", test_concurrent_queries), ] passed = 0 total = len(tests) for test_name, test_func in tests: print(f"\n{'-' * 40}") try: success, details = test_func() results.add_result(test_name, success, details) if success: print(f"✅ {test_name} PASSED") passed += 1 else: error = details.get('error', 'Unknown error') print(f"❌ {test_name} FAILED: {error}") except Exception as e: results.add_result(test_name, False, {"error": str(e)}) print(f"❌ {test_name} EXCEPTION: {e}") results.finish_collection() summary = results.get_summary() # Summary print(f"\n{'=' * 50}") print("🎯 LIFECYCLE TEST SUMMARY") print(f"✅ {passed}/{total} tests passed") print(f"⏱️ Total duration: {summary['duration']:.2f}s") if passed == total: print("🎉 All lifecycle tests passed! Query workflow is robust.") return True else: print("⚠️ Some lifecycle tests failed. Review workflow issues.") return False if __name__ == "__main__": success = main() sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server