Skip to main content
Glama
test_performance.py25.4 kB
#!/usr/bin/env python3 """ Performance Test - Tier 2 Test Suite Tests performance characteristics, benchmarks, and resource usage. Ensures the system meets performance requirements and identifies bottlenecks. """ import os import sys import time import json import psutil import threading from pathlib import Path from typing import Dict, Any, Tuple, List from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) from tests.support.api_client import DuneTestClient, TestQueryManager from tests.support.helpers import PerformanceTimer, TestResultCollector, TestEnvironment from tests.support import QueryFactory, ExpectedResults def load_env_variables(): """Load environment variables from .env file.""" env_file = Path(__file__).parent.parent.parent / ".env" if env_file.exists(): with open(env_file) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: key, value = line.split("=", 1) os.environ[key] = value def get_resource_usage() -> Dict[str, float]: """Get current system resource usage.""" process = psutil.Process() return { 'cpu_percent': process.cpu_percent(), 'memory_mb': process.memory_info().rss / 1024 / 1024, 'memory_percent': process.memory_percent(), 'open_files': len(process.open_files()), 'threads': process.num_threads() } def test_query_execution_benchmarks() -> Tuple[bool, Dict[str, Any]]: """Benchmark query execution times against expected performance.""" print("⏱️ Testing Query Execution Benchmarks...") timer = PerformanceTimer() try: timer.start() initial_resources = get_resource_usage() api_key = os.getenv("DUNE_API_KEY") benchmarks = [] # Test different query types with performance expectations test_cases = [ ("simple_query", QueryFactory.simple_select(), 10.0), ("data_types_query", QueryFactory.data_types_query(), 15.0), ("aggregate_query", QueryFactory.aggregate_query(), 20.0), ("time_series_query", QueryFactory.time_series_query(), 25.0), ] with TestQueryManager(DuneTestClient(api_key)) as qm: for test_name, test_sql, expected_max_time in test_cases: test_timer = PerformanceTimer() test_timer.start() print(f" Testing {test_name}...") # Create query query_id = qm.create_test_query(test_sql, f"perf_test_{test_name}") test_timer.checkpoint("query_created") # Execute query execution_id = qm.execute_and_wait(query_id, timeout=expected_max_time * 2) test_timer.checkpoint("query_completed") # Get results results_json = qm.client.get_results_json(query_id) rows_returned = len(results_json.get('data', [])) test_timer.checkpoint("results_retrieved") # Stop timer test_timer.stop() duration = test_timer.duration # Check performance passes_benchmark = duration <= expected_max_time benchmarks.append({ 'test_name': test_name, 'duration': duration, 'expected_max_time': expected_max_time, 'passes_benchmark': passes_benchmark, 'rows_returned': rows_returned, 'performance_ratio': duration / expected_max_time }) status = "✓" if passes_benchmark else "✗" print(f" {status} {test_name}: {duration:.2f}s (expected: ≤{expected_max_time}s)") final_resources = get_resource_usage() timer.stop() # Analyze results passed_benchmarks = sum(1 for b in benchmarks if b['passes_benchmark']) total_benchmarks = len(benchmarks) avg_performance_ratio = sum(b['performance_ratio'] for b in benchmarks) / total_benchmarks print(f" Benchmarks: {passed_benchmarks}/{total_benchmarks} within expected time") print(f" Average performance ratio: {avg_performance_ratio:.2f} (1.0 = optimal)") print(f" Memory delta: {final_resources['memory_mb'] - initial_resources['memory_mb']:.1f} MB") details = { "benchmarks": benchmarks, "passed_benchmarks": passed_benchmarks, "total_benchmarks": total_benchmarks, "avg_performance_ratio": avg_performance_ratio, "resource_usage": { "initial": initial_resources, "final": final_resources, "memory_delta": final_resources['memory_mb'] - initial_resources['memory_mb'] }, "timings": timer.get_report() } # At least 80% of benchmarks should pass return passed_benchmarks >= total_benchmarks * 0.8, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_concurrent_query_limits() -> Tuple[bool, Dict[str, Any]]: """Test concurrent query handling and identify limits.""" print("🚀 Testing Concurrent Query Limits...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") # Test different concurrency levels concurrency_levels = [1, 2, 3] # Conservative to avoid rate limiting concurrent_results = [] for concurrent_count in concurrency_levels: print(f" Testing {concurrent_count} concurrent queries...") concurrent_timer = PerformanceTimer() concurrent_timer.start() results = [] exceptions = [] def run_single_query(index): try: client = DuneTestClient(api_key) test_sql = f"SELECT {index} as query_index, 'concurrent_test' as label" with TestQueryManager(client) as qm: query_id = qm.create_test_query(test_sql, f"concurrent_{index}_{int(time.time())}") execution_id = qm.execute_and_wait(query_id, timeout=60) results_json = qm.client.get_results_json(query_id) return { 'index': index, 'query_id': query_id, 'execution_id': execution_id, 'success': True, 'rows': len(results_json.get('data', [])) } except Exception as e: exceptions.append({'index': index, 'error': str(e)}) return { 'index': index, 'success': False, 'error': str(e) } # Execute queries concurrently with ThreadPoolExecutor(max_workers=concurrent_count) as executor: futures = [executor.submit(run_single_query, i) for i in range(concurrent_count)] try: for future in futures: result = future.result(timeout=120) # 2 minute timeout per query results.append(result) except FutureTimeoutError: exceptions.append({'error': 'Query execution timeout'}) except Exception as e: exceptions.append({'error': f'Thread execution error: {e}'}) concurrent_timer.stop() # Analyze concurrent execution successful_queries = [r for r in results if r.get('success', False)] failed_queries = exceptions concurrent_results.append({ 'concurrent_count': concurrent_count, 'successful_count': len(successful_queries), 'failed_count': len(failed_queries), 'success_rate': len(successful_queries) / concurrent_count, 'duration': concurrent_timer.duration, 'results': results, 'exceptions': failed_queries }) print(f" ✓ {concurrent_count} concurrent: {len(successful_queries)}/{concurrent_count} success") if failed_queries: print(f" ⚠ Errors: {len(failed_queries)} failed queries") timer.stop() # Find optimal concurrency level success_rates = [r['success_rate'] for r in concurrent_results] max_successful = max(success_rates) if success_rates else 0 optimal_concurrency = next( (r['concurrent_count'] for r in concurrent_results if r['success_rate'] == max_successful), 1 ) details = { "concurrent_results": concurrent_results, "optimal_concurrency": optimal_concurrency, "max_success_rate": max_successful, "recommended_concurrent_limit": optimal_concurrency, "timings": timer.get_report() } # Should handle at least basic concurrency (1 query) without issues return max_successful >= 0.5, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_memory_usage_patterns() -> Tuple[bool, Dict[str, Any]]: """Test memory usage patterns and detect potential leaks.""" print("💾 Testing Memory Usage Patterns...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") memory_snapshots = [] # Baseline memory usage baseline_memory = get_resource_usage() memory_snapshots.append({ 'phase': 'baseline', 'memory_mb': baseline_memory['memory_mb'], 'timestamp': time.time() }) # Perform multiple queries and track memory query_count = 5 stress_query = QueryFactory.data_types_query() with TestQueryManager(DuneTestClient(api_key)) as qm: for i in range(query_count): print(f" Memory test query {i+1}/{query_count}...") # Create and execute query query_id = qm.create_test_query(stress_query, f"memory_test_{i}") execution_id = qm.execute_and_wait(query_id, timeout=60) results_json = qm.client.get_results_json(query_id) # Take memory snapshot current_memory = get_resource_usage() memory_snapshots.append({ 'phase': f'query_{i+1}', 'memory_mb': current_memory['memory_mb'], 'timestamp': time.time(), 'rows_processed': len(results_json.get('data', [])) }) # Small delay to allow for GC time.sleep(0.5) # Final memory snapshot final_memory = get_resource_usage() memory_snapshots.append({ 'phase': 'final', 'memory_mb': final_memory['memory_mb'], 'timestamp': time.time() }) timer.stop() # Analyze memory patterns baseline_mb = baseline_memory['memory_mb'] peak_mb = max(snapshot['memory_mb'] for snapshot in memory_snapshots) final_mb = final_memory['memory_mb'] memory_increase = final_mb - baseline_mb memory_growth_rate = memory_increase / baseline_mb if baseline_mb > 0 else 0 # Check for memory leak indicators memory_ok = True issues = [] if memory_growth_rate > 0.5: # More than 50% growth issues.append(f"High memory growth: {memory_growth_rate:.1%}") memory_ok = False if memory_increase > 100: # More than 100MB growth issues.append(f"Large memory increase: {memory_increase:.1f}MB") memory_ok = False # Check memory stability over queries query_memories = [s for s in memory_snapshots if s['phase'].startswith('query_')] if len(query_memories) > 1: memory_trend = query_memories[-1]['memory_mb'] - query_memories[0]['memory_mb'] if memory_trend > 50: # Growing trend during queries issues.append(f"Memory growth during queries: {memory_trend:.1f}MB") memory_ok = False print(f" Baseline memory: {baseline_mb:.1f} MB") print(f" Peak memory: {peak_mb:.1f} MB") print(f" Final memory: {final_mb:.1f} MB") print(f" Memory increase: {memory_increase:.1f} MB ({memory_growth_rate:.1%})") if memory_ok: print(" ✓ Memory usage appears stable") else: print(f" ⚠ Memory issues detected: {', '.join(issues)}") details = { "memory_snapshots": memory_snapshots, "baseline_memory_mb": baseline_mb, "peak_memory_mb": peak_mb, "final_memory_mb": final_mb, "memory_increase_mb": memory_increase, "memory_growth_rate": memory_growth_rate, "memory_ok": memory_ok, "issues": issues, "timings": timer.get_report() } return memory_ok, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_timeout_behavior() -> Tuple[bool, Dict[str, Any]]: """Test timeout behavior at different levels.""" print("⏰ Testing Timeout Behavior...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") timeout_tests = [] # Test 1: Client-level timeout print(" Testing client-level timeout...") client_timer = PerformanceTimer() client_timer.start() try: # Create client with very short timeout short_timeout_client = DuneTestClient(api_key) simple_sql = QueryFactory.simple_select() query_id = short_timeout_client.create_query(simple_sql, "timeout_test_1") # This should work fine (creation is fast) timeout_tests.append({ 'test_type': 'client_creation', 'success': True, 'timeout_set': 'short', 'result': 'succeeded' }) # Execute with reasonable timeout execution_id = short_timeout_client.execute_query(query_id) timeout_tests.append({ 'test_type': 'client_execution', 'success': True, 'timeout_set': 'short', 'result': 'executed' }) # Wait with short timeout (should timeout) try: short_timeout_client.wait_for_completion(execution_id, timeout=1) timeout_tests.append({ 'test_type': 'client_wait_short', 'success': False, 'timeout_set': 'short', 'result': 'unexpected_success' }) except TimeoutError: timeout_tests.append({ 'test_type': 'client_wait_short', 'success': True, 'timeout_set': 'short', 'result': 'timeout_as_expected' }) # Cleanup short_timeout_client.delete_query(query_id) except Exception as e: timeout_tests.append({ 'test_type': 'client_short_timeout', 'success': False, 'error': str(e) }) client_timer.stop() # Test 2: Query performance levels print(" Testing performance level timeouts...") 绩效测试 = ['medium', 'large'] # Skip 'low' for reliability for performance in 绩效测试: perf_timer = PerformanceTimer() perf_timer.start() try: with TestQueryManager(DuneTestClient(api_key)) as qm: test_sql = QueryFactory.aggregate_query() query_id = qm.create_test_query(test_sql, f"perf_{performance}") # Execute with different performance levels execution_id = qm.client.execute_query(query_id, performance=performance) status = qm.client.wait_for_completion(execution_id, timeout=45) perf_timer.stop() timeout_tests.append({ 'test_type': f'performance_{performance}', 'success': True, 'duration': perf_timer.duration, 'state': status.get('state', 'unknown') }) print(f" ✓ Performance {performance}: {perf_timer.duration:.2f}s") except TimeoutError: perf_timer.stop() timeout_tests.append({ 'test_type': f'performance_{performance}', 'success': False, 'duration': perf_timer.duration, 'result': 'timeout' }) print(f" ⚠ Performance {performance}: timeout after {perf_timer.duration:.2f}s") except Exception as e: perf_timer.stop() timeout_tests.append({ 'test_type': f'performance_{performance}', 'success': False, 'duration': perf_timer.duration, 'error': str(e) }) print(f" ✗ Performance {performance}: error - {e}") timer.stop() # Analyze timeout behavior successful_timeouts = sum(1 for t in timeout_tests if t.get('success') or t.get('result') == 'timeout_as_expected') total_timeouts = len(timeout_tests) timeout_behavior_ok = successful_timeouts >= total_timeouts * 0.7 print(f" Timeout behavior: {successful_timeouts}/{total_timeouts} handled correctly") details = { "timeout_tests": timeout_tests, "successful_timeouts": successful_timeouts, "total_timeouts": total_timeouts, "timeout_behavior_ok": timeout_behavior_ok, "timings": timer.get_report() } return timeout_behavior_ok, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def test_rate_limiting_behavior() -> Tuple[bool, Dict[str, Any]]: """Test rate limiting behavior and backoff mechanisms.""" print("🚦 Testing Rate Limiting Behavior...") timer = PerformanceTimer() try: timer.start() api_key = os.getenv("DUNE_API_KEY") # Test rapid succession requests print(" Testing ratelimit with rapid requests...") rapid_timer = PerformanceTimer() rapid_timer.start() client = DuneTestClient(api_key) test_sql = QueryFactory.simple_select() request_times = [] rate_limit_detected = False try: for i in range(8): # Make several rapid requests req_start = time.time() query_id = client.create_query(f"{test_sql} -- rapid {i}", f"rapid_test_{i}") req_end = time.time() duration = req_end - req_start request_times.append(duration) # Clean up client.delete_query(query_id) print(f" Request {i+1}: {duration:.3f}s") # Check for rate limiting indicators if duration > 2.0: # Significantly slower than normal rate_limit_detected = True # Small delay between requests time.sleep(0.2) except Exception as e: if "rate limit" in str(e).lower(): rate_limit_detected = True print(f" ✓ Rate limiting detected: {e}") else: print(f" ✗ Unexpected error during rapid requests: {e}") rapid_timer.stop() # Analyze request patterns if request_times: avg_time = sum(request_times) / len(request_times) max_time = max(request_times) min_time = min(request_times) # Rate limiting indicators time_variance = max_time - min_time high_variance = time_variance > avg_time # High variance suggests throttling rate_limit_analysis = { 'request_count': len(request_times), 'avg_time': avg_time, 'min_time': min_time, 'max_time': max_time, 'time_variance': time_variance, 'rate_limit_detected': rate_limit_detected or high_variance } else: rate_limit_analysis = {'error': 'No requests completed'} timer.stop() # Rate limiting behavior is considered OK if we don't get hard failures rate_limiting_ok = len(request_times) >= 5 # At least some requests should work print(f" Requests completed: {len(request_times)}/8") print(f" Rate limiting OK: {rate_limiting_ok}") details = { "rate_limit_analysis": rate_limit_analysis, "request_times": request_times, "rate_limiting_ok": rate_limiting_ok, "timings": timer.get_report() } return rate_limiting_ok, details except Exception as e: timer.stop() return False, {"error": str(e), "timings": timer.get_report()} def main(): """Run performance test suite.""" print("⏱️ DUNE PERFORMANCE TEST SUITE") print("=" * 50) # Load environment load_env_variables() # Check API key if not os.getenv("DUNE_API_KEY"): print("❌ DUNE_API_KEY not found. Please set it in your environment or .env file.") return False # Initialize result collector results = TestResultCollector() results.start_collection() # Run performance tests tests = [ ("Query Execution Benchmarks", test_query_execution_benchmarks), ("Concurrent Query Limits", test_concurrent_query_limits), ("Memory Usage Patterns", test_memory_usage_patterns), ("Timeout Behavior", test_timeout_behavior), ("Rate Limiting Behavior", test_rate_limiting_behavior), ] passed = 0 total = len(tests) for test_name, test_func in tests: print(f"\n{'-' * 40}") try: success, details = test_func() results.add_result(test_name, success, details) if success: print(f"✅ {test_name} PASSED") passed += 1 else: error = details.get('error', 'Unknown error') print(f"❌ {test_name} FAILED: {error}") except Exception as e: results.add_result(test_name, False, {"error": str(e)}) print(f"❌ {test_name} EXCEPTION: {e}") results.finish_collection() summary = results.get_summary() # Summary print(f"\n{'=' * 50}") print("🎯 PERFORMANCE TEST SUMMARY") print(f"✅ {passed}/{total} tests passed") print(f"⏱️ Total duration: {summary['duration']:.2f}s") if passed >= total * 0.8: # 80% pass rate for performance tests print("🎉 Performance tests passed! System meets performance requirements.") return True else: print("⚠️ Some performance tests failed. Review bottlenecks and resource usage.") return False if __name__ == "__main__": success = main() sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/spice-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server