Skip to main content
Glama
analyzer.py (12.5 kB)
"""
Main analyzer module for JMeter test results.

This module provides the main entry point for analyzing JMeter test results.
It orchestrates the flow of data through the various components of the analyzer.
"""

import math
from collections import Counter
from pathlib import Path
from datetime import timedelta
from typing import Dict, List, Optional, Union

from analyzer.models import TestResults, OverallMetrics, Bottleneck
from analyzer.parser.base import JTLParser
from analyzer.parser.xml_parser import XMLJTLParser
from analyzer.parser.csv_parser import CSVJTLParser
from analyzer.metrics.calculator import MetricsCalculator
from analyzer.bottleneck.analyzer import BottleneckAnalyzer
from analyzer.insights.generator import InsightsGenerator


class TestResultsAnalyzer:
    """Main analyzer class for JMeter test results.

    Wires together a format-specific parser, the metrics calculator, the
    bottleneck analyzer and the insights generator, and turns their output
    into a plain dictionary.
    """

    # Width (in seconds) of the time buckets used both for the time-series
    # metrics and for error-spike detection.
    INTERVAL_SECONDS = 5

    def __init__(self):
        """Initialize the analyzer and register the default XML and CSV parsers."""
        self.parsers: Dict[str, JTLParser] = {}
        self.metrics_calculator = MetricsCalculator()
        self.bottleneck_analyzer = BottleneckAnalyzer()
        self.insights_generator = InsightsGenerator()

        # Register default parsers
        self.register_parser('xml', XMLJTLParser())
        self.register_parser('csv', CSVJTLParser())

    def register_parser(self, format_name: str, parser: JTLParser) -> None:
        """Register a parser for a specific format.

        Registering under an already-used name replaces the previous parser.

        Args:
            format_name: Name of the format (e.g., 'xml', 'csv')
            parser: Parser instance
        """
        self.parsers[format_name] = parser

    def analyze_file(self, file_path: Union[str, Path], detailed: bool = False) -> Dict:
        """Analyze a JTL file and return the results.

        Args:
            file_path: Path to the JTL file
            detailed: Whether to include detailed analysis

        Returns:
            Dictionary containing analysis results

        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If the file format is invalid or unsupported
        """
        path = Path(file_path)

        # Validate file
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Detect format, then look up the parser registered under that name
        format_name = JTLParser.detect_format(path)
        if format_name not in self.parsers:
            raise ValueError(f"No parser available for format: {format_name}")
        parser = self.parsers[format_name]

        # Parse file and analyze the parsed samples
        test_results = parser.parse_file(path)
        return self._analyze_results(test_results, detailed)

    def _analyze_results(self, test_results: TestResults, detailed: bool = False) -> Dict:
        """Analyze test results and return the analysis.

        Args:
            test_results: TestResults object
            detailed: Whether to include detailed analysis

        Returns:
            Dictionary with a "summary" entry and, when ``detailed`` is true,
            an additional "detailed" entry.
        """
        overall_metrics = self.metrics_calculator.calculate_overall_metrics(test_results)

        results = {"summary": self._build_summary(test_results, overall_metrics)}
        if detailed:
            results["detailed"] = self._build_detailed(test_results)
        return results

    @staticmethod
    def _metrics_to_dict(metrics) -> Dict:
        """Copy the metric fields shared by overall and per-endpoint metrics into a dict."""
        return {
            "total_samples": metrics.total_samples,
            "error_count": metrics.error_count,
            "error_rate": metrics.error_rate,
            "average_response_time": metrics.average_response_time,
            "median_response_time": metrics.median_response_time,
            "percentile_90": metrics.percentile_90,
            "percentile_95": metrics.percentile_95,
            "percentile_99": metrics.percentile_99,
            "min_response_time": metrics.min_response_time,
            "max_response_time": metrics.max_response_time,
            "throughput": metrics.throughput,
        }

    def _build_summary(self, test_results: TestResults, overall_metrics) -> Dict:
        """Build the "summary" section from the overall metrics and the test time range."""
        summary = self._metrics_to_dict(overall_metrics)
        # start_time / end_time are passed through exactly as stored on test_results.
        summary["start_time"] = test_results.start_time
        summary["end_time"] = test_results.end_time
        summary["duration"] = overall_metrics.test_duration
        return summary

    def _build_detailed(self, test_results: TestResults) -> Dict:
        """Build the "detailed" section: endpoints, time series, bottlenecks and insights."""
        # Calculate endpoint metrics
        endpoint_metrics = self.metrics_calculator.calculate_endpoint_metrics(test_results)

        # Calculate time series metrics; a ValueError from the calculator is
        # treated as "no time series available" rather than a fatal error.
        try:
            time_series_metrics = self.metrics_calculator.calculate_time_series_metrics(
                test_results, interval_seconds=self.INTERVAL_SECONDS)
        except ValueError:
            time_series_metrics = []

        # Identify bottlenecks
        slow_endpoints = self.bottleneck_analyzer.identify_slow_endpoints(endpoint_metrics)
        error_prone_endpoints = self.bottleneck_analyzer.identify_error_prone_endpoints(endpoint_metrics)
        anomalies = self.bottleneck_analyzer.detect_anomalies(time_series_metrics)
        concurrency_impact = self.bottleneck_analyzer.analyze_concurrency_impact(time_series_metrics)

        # Generate insights and recommendations
        all_bottlenecks = slow_endpoints + error_prone_endpoints
        bottleneck_recommendations = self.insights_generator.generate_bottleneck_recommendations(all_bottlenecks)

        # Create error analysis
        error_analysis = self._create_error_analysis(test_results)
        error_recommendations = self.insights_generator.generate_error_recommendations(error_analysis)

        # Generate scaling insights
        scaling_insights = self.insights_generator.generate_scaling_insights(concurrency_impact)

        # Prioritize all recommendations
        all_recommendations = bottleneck_recommendations + error_recommendations
        prioritized_recommendations = self.insights_generator.prioritize_recommendations(all_recommendations)

        return {
            "samples_count": len(test_results.samples),
            "endpoints": {
                endpoint: self._metrics_to_dict(metrics)
                for endpoint, metrics in endpoint_metrics.items()
            },
            "time_series": [
                {
                    "timestamp": metrics.timestamp.isoformat(),
                    "active_threads": metrics.active_threads,
                    "throughput": metrics.throughput,
                    "average_response_time": metrics.average_response_time,
                    "error_rate": metrics.error_rate,
                }
                for metrics in time_series_metrics
            ],
            "bottlenecks": {
                "slow_endpoints": [
                    {
                        "endpoint": bottleneck.endpoint,
                        "response_time": bottleneck.value,
                        "threshold": bottleneck.threshold,
                        "severity": bottleneck.severity,
                    }
                    for bottleneck in slow_endpoints
                ],
                "error_prone_endpoints": [
                    {
                        "endpoint": bottleneck.endpoint,
                        "error_rate": bottleneck.value,
                        "threshold": bottleneck.threshold,
                        "severity": bottleneck.severity,
                    }
                    for bottleneck in error_prone_endpoints
                ],
                "anomalies": [
                    {
                        "timestamp": anomaly.timestamp.isoformat(),
                        "expected_value": anomaly.expected_value,
                        "actual_value": anomaly.actual_value,
                        "deviation_percentage": anomaly.deviation_percentage,
                    }
                    for anomaly in anomalies
                ],
                "concurrency_impact": concurrency_impact,
            },
            "insights": {
                "recommendations": [
                    {
                        "issue": rec["recommendation"].issue,
                        "recommendation": rec["recommendation"].recommendation,
                        "expected_impact": rec["recommendation"].expected_impact,
                        "implementation_difficulty": rec["recommendation"].implementation_difficulty,
                        "priority_level": rec["priority_level"],
                    }
                    for rec in prioritized_recommendations
                ],
                "scaling_insights": [
                    {
                        "topic": insight.topic,
                        "description": insight.description,
                    }
                    for insight in scaling_insights
                ],
            },
        }

    def _create_error_analysis(self, test_results: TestResults) -> Dict:
        """Create error analysis from test results.

        Args:
            test_results: TestResults object

        Returns:
            Dictionary with "error_types" (message -> occurrence count) and
            "error_patterns" (list of detected error spikes).
        """
        # Extract error samples
        error_samples = [sample for sample in test_results.samples if not sample.success]
        if not error_samples:
            return {"error_types": {}, "error_patterns": []}

        # Count error types; samples without a message are keyed by response code.
        # Converted back to a plain dict so the return type is unchanged.
        error_types = dict(Counter(
            sample.error_message or f"HTTP {sample.response_code}"
            for sample in error_samples
        ))

        # Detect error patterns
        error_patterns = []
        start_time = test_results.start_time
        end_time = test_results.end_time

        # Spike detection needs a known time range.
        if start_time and end_time:
            interval_seconds = self.INTERVAL_SECONDS
            duration = (end_time - start_time).total_seconds()
            num_intervals = int(duration / interval_seconds) + 1

            # Count errors in each interval
            interval_errors = [0] * num_intervals
            for sample in error_samples:
                offset = (sample.timestamp - start_time).total_seconds()
                # floor (not int) so a sample slightly before start_time maps to
                # index -1 and is skipped, instead of being truncated toward
                # zero and counted in the first interval.
                interval_index = math.floor(offset / interval_seconds)
                if 0 <= interval_index < num_intervals:
                    interval_errors[interval_index] += 1

            # Calculate average errors per interval
            avg_errors = sum(interval_errors) / len(interval_errors) if interval_errors else 0

            # Detect spikes (more than twice the average, and more than one error)
            for i, error_count in enumerate(interval_errors):
                if error_count > 2 * avg_errors and error_count > 1:
                    spike_time = start_time + timedelta(seconds=i * interval_seconds)
                    error_patterns.append({
                        "type": "spike",
                        "timestamp": spike_time.isoformat(),
                        "error_count": error_count,
                    })

        return {
            "error_types": error_types,
            "error_patterns": error_patterns,
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/QAInsights/jmeter-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.