Skip to main content
Glama
test_bottleneck_analyzer.py7.52 kB
""" Tests for the bottleneck analyzer. """ import unittest from datetime import datetime, timedelta from analyzer.bottleneck.analyzer import BottleneckAnalyzer from analyzer.models import EndpointMetrics, TimeSeriesMetrics class TestBottleneckAnalyzer(unittest.TestCase): """Tests for the BottleneckAnalyzer class.""" def setUp(self): """Set up test fixtures.""" self.analyzer = BottleneckAnalyzer() # Create endpoint metrics self.endpoint_metrics = { "Fast Endpoint": EndpointMetrics( endpoint="Fast Endpoint", total_samples=100, error_count=0, error_rate=0.0, average_response_time=100.0, median_response_time=95.0, percentile_90=150.0, percentile_95=180.0, percentile_99=200.0, min_response_time=50.0, max_response_time=250.0, throughput=10.0, test_duration=10.0 ), "Medium Endpoint": EndpointMetrics( endpoint="Medium Endpoint", total_samples=100, error_count=2, error_rate=2.0, average_response_time=200.0, median_response_time=190.0, percentile_90=300.0, percentile_95=350.0, percentile_99=400.0, min_response_time=100.0, max_response_time=450.0, throughput=10.0, test_duration=10.0 ), "Slow Endpoint": EndpointMetrics( endpoint="Slow Endpoint", total_samples=100, error_count=5, error_rate=5.0, average_response_time=500.0, median_response_time=450.0, percentile_90=800.0, percentile_95=900.0, percentile_99=1000.0, min_response_time=200.0, max_response_time=1200.0, throughput=10.0, test_duration=10.0 ), "Error Endpoint": EndpointMetrics( endpoint="Error Endpoint", total_samples=100, error_count=15, error_rate=15.0, average_response_time=300.0, median_response_time=280.0, percentile_90=450.0, percentile_95=500.0, percentile_99=600.0, min_response_time=150.0, max_response_time=700.0, throughput=10.0, test_duration=10.0 ) } # Create time series metrics base_time = datetime(2023, 1, 1, 12, 0, 0) self.time_series_metrics = [ TimeSeriesMetrics( timestamp=base_time + timedelta(seconds=i * 5), active_threads=i + 1, throughput=10.0, average_response_time=100.0 + i * 20, error_rate=0.0 if i < 8 else 5.0 ) for i in range(10) ] # Add an anomaly self.time_series_metrics[5].average_response_time = 500.0 # Spike in the middle def test_identify_slow_endpoints(self): """Test identifying slow endpoints.""" # Use a higher threshold factor to get only the Slow Endpoint bottlenecks = self.analyzer.identify_slow_endpoints(self.endpoint_metrics, threshold_factor=2.0) # We should have identified the slow endpoint self.assertEqual(len(bottlenecks), 1) self.assertEqual(bottlenecks[0].endpoint, "Slow Endpoint") self.assertEqual(bottlenecks[0].metric_type, "response_time") # With threshold_factor=2.0, the severity should be medium or high self.assertIn(bottlenecks[0].severity, ["medium", "high"]) # Test with a lower threshold factor to catch more endpoints bottlenecks = self.analyzer.identify_slow_endpoints(self.endpoint_metrics, threshold_factor=0.8) self.assertGreaterEqual(len(bottlenecks), 2) self.assertEqual(bottlenecks[0].endpoint, "Slow Endpoint") # Should still be first def test_identify_error_prone_endpoints(self): """Test identifying error-prone endpoints.""" bottlenecks = self.analyzer.identify_error_prone_endpoints(self.endpoint_metrics, threshold_error_rate=3.0) # We should have identified both error-prone endpoints self.assertEqual(len(bottlenecks), 2) self.assertEqual(bottlenecks[0].endpoint, "Error Endpoint") # Higher error rate should be first self.assertEqual(bottlenecks[0].metric_type, "error_rate") self.assertEqual(bottlenecks[0].severity, "high") self.assertEqual(bottlenecks[1].endpoint, "Slow Endpoint") self.assertEqual(bottlenecks[1].metric_type, "error_rate") self.assertEqual(bottlenecks[1].severity, "medium") # Test with a higher threshold to catch fewer endpoints bottlenecks = self.analyzer.identify_error_prone_endpoints(self.endpoint_metrics, threshold_error_rate=10.0) self.assertEqual(len(bottlenecks), 1) self.assertEqual(bottlenecks[0].endpoint, "Error Endpoint") def test_detect_anomalies(self): """Test detecting response time anomalies.""" anomalies = self.analyzer.detect_anomalies(self.time_series_metrics) # We should have detected the spike self.assertGreaterEqual(len(anomalies), 1) # The spike should be the first anomaly spike_anomaly = anomalies[0] self.assertEqual(spike_anomaly.timestamp, datetime(2023, 1, 1, 12, 0, 25)) # 5th interval self.assertGreater(abs(spike_anomaly.deviation_percentage), 50) # Should be a significant deviation def test_analyze_concurrency_impact(self): """Test analyzing concurrency impact.""" # Our time series has increasing thread counts and response times analysis = self.analyzer.analyze_concurrency_impact(self.time_series_metrics) # There should be a positive correlation self.assertGreater(analysis["correlation"], 0.5) # Create a new time series with no correlation no_correlation_series = [ TimeSeriesMetrics( timestamp=datetime(2023, 1, 1, 12, 0, 0) + timedelta(seconds=i * 5), active_threads=i + 1, throughput=10.0, average_response_time=200.0, # Constant response time error_rate=0.0 ) for i in range(10) ] analysis = self.analyzer.analyze_concurrency_impact(no_correlation_series) self.assertLess(analysis["correlation"], 0.5) self.assertFalse(analysis["has_degradation"]) def test_empty_inputs(self): """Test handling of empty inputs.""" self.assertEqual(len(self.analyzer.identify_slow_endpoints({})), 0) self.assertEqual(len(self.analyzer.identify_error_prone_endpoints({})), 0) self.assertEqual(len(self.analyzer.detect_anomalies([])), 0) analysis = self.analyzer.analyze_concurrency_impact([]) self.assertEqual(analysis["correlation"], 0) self.assertFalse(analysis["has_degradation"]) if __name__ == '__main__': unittest.main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/QAInsights/jmeter-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server