Skip to main content
Glama
query_factory.py6.87 kB
""" Factory for creating standardized test queries expected to work reliably with Dune API. """ from typing import Dict, Tuple, List import time class QueryFactory: """Factory for creating reliable test queries for Dune API testing.""" @staticmethod def simple_select() -> str: """Simple deterministic query for basic functionality testing.""" return "SELECT 1 as test_col, 'fixed_value' as message" @staticmethod def parametric_query() -> Tuple[str, Dict[str, int]]: """Query with parameters for testing parameter handling.""" sql = "SELECT {{limit_count}} as value" params = {"limit_count": 5} return sql, params @staticmethod def string_parameter_query() -> Tuple[str, Dict[str, str]]: """Query with string parameter for testing different parameter types.""" sql = "SELECT '{{test_string}}' as message, 42 as number" params = {"test_string": "hello_world"} return sql, params @staticmethod def data_types_query() -> str: """Query covering all major Dune data types.""" return """ SELECT 1 as int_col, 123.45 as float_col, 'test_string' as string_col, true as bool_col, DOUBLE '123.45' as double_col, DATE '2023-01-01' as date_col, TIMESTAMP '2023-01-01 12:00:00' as timestamp_col, NULL as null_col """ @staticmethod def joined_query() -> str: """More complex query with joins for testing advanced functionality.""" return """ SELECT block_time, block_number, 'ethereum_mainnet' as chain FROM ( SELECT DATE '2023-01-01' as block_time, 1641024000 as block_number ) blocks WHERE block_time >= DATE '2023-01-01' ORDER BY block_time DESC LIMIT 5 """ @staticmethod def time_series_query() -> str: """Time series query for testing temporal data handling.""" return """ SELECT date_trunc('day', block_time) as day, COUNT(*) as block_count FROM ( SELECT DATE '2023-01-01' + INTERVAL '1 day' * n as block_time FROM ( SELECT generate_series(0, 4) as n ) series ) daily_blocks GROUP BY day ORDER BY day DESC LIMIT 7 """ @staticmethod def aggregate_query() -> str: """Query with aggregations for testing complex calculations.""" return """ SELECT SUM(value) as total_value, AVG(value) as avg_value, MIN(value) as min_value, MAX(value) as max_value, COUNT(*) as row_count FROM ( SELECT 10 as value UNION SELECT 20 UNION SELECT 30 ) sample_data """ @staticmethod def stress_test_query(limit: int = 1000) -> str: """Query designed to stress test with larger result sets.""" return f""" SELECT n as row_num, 'row_' || n as row_label, n * 1.5 as float_val, n % 2 = 0 as is_even FROM ( SELECT generate_series(1, {limit}) as n ) numbers """ @staticmethod def unique_timestamp_suffix() -> str: """Generate unique string for test isolation.""" return f"test_{int(time.time())}_{hash(time.time()) % 10000:04d}" class QueryValidator: """Validator for checking query results meet expected patterns.""" @staticmethod def validate_simple_result(result) -> bool: """Validate simple query result structure.""" try: # Check shape if hasattr(result, 'shape'): assert result.shape == (1, 2), f"Expected (1, 2), got {result.shape}" # Check columns if hasattr(result, 'columns'): expected_cols = ['test_col', 'message'] actual_cols = list(result.columns) assert set(actual_cols) == set(expected_cols), f"Expected {expected_cols}, got {actual_cols}" # Check values if hasattr(result, 'to_dict'): data = result.to_dict() assert data.get('test_col', [None])[0] == 1, "test_col should be 1" assert data.get('message', [None])[0] == 'fixed_value', "message should be fixed_value" return True except Exception as e: print(f"Validation error: {e}") return False @staticmethod def validate_data_types_result(result) -> bool: """Validate data types query result.""" try: if hasattr(result, 'shape'): assert result.shape == (1, 8), f"Expected 1 row, 8 columns, got {result.shape}" if hasattr(result, 'columns'): expected_cols = ['int_col', 'float_col', 'string_col', 'bool_col', 'double_col', 'date_col', 'timestamp_col', 'null_col'] actual_cols = list(result.columns) assert set(actual_cols) == set(expected_cols), f"Column mismatch: {actual_cols}" if hasattr(result, 'to_dict'): data = result.to_dict() # Check specific type indicators assert data.get('int_col', [None])[0] == 1 assert data.get('string_col', [None])[0] == 'test_string' assert data.get('bool_col', [None])[0] == True assert data.get('null_col', [None])[0] is None return True except Exception as e: print(f"Data types validation error: {e}") return False @staticmethod def validate_aggregate_result(result) -> bool: """Validate aggregate query result.""" try: if hasattr(result, 'shape'): assert result.shape == (1, 5), f"Expected 1 row, 5 columns, got {result.shape}" if hasattr(result, 'to_dict'): data = result.to_dict() assert data.get('total_value', [None])[0] == 60 # 10 + 20 + 30 assert data.get('avg_value', [None])[0] == 20 # (10 + 20 + 30) / 3 assert data.get('min_value', [None])[0] == 10 assert data.get('max_value', [None])[0] == 30 assert data.get('row_count', [None])[0] == 3 return True except Exception as e: print(f"Aggregate validation error: {e}") return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server