Skip to main content
Glama
engine.py21.5 kB
""" Visualization engine for JMeter test results. This module provides functionality for creating visual representations of JMeter test results analysis, including time series graphs, distribution graphs, endpoint comparison charts, and visualization output formats. """ import base64 import io import os from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple, Union # Note: In a real implementation, we would use matplotlib for visualization. # However, for the purpose of this implementation, we'll create a simplified version # that doesn't rely on external libraries. class VisualizationEngine: """Engine for creating visual representations of test results analysis.""" def __init__(self, output_dir: Optional[str] = None): """Initialize the visualization engine. Args: output_dir: Directory to save visualization files (default: None) """ self.output_dir = output_dir if output_dir: os.makedirs(output_dir, exist_ok=True) def create_time_series_graph(self, time_series_metrics: List, metric_name: str = "average_response_time", title: Optional[str] = None, output_file: Optional[str] = None) -> Union[str, Dict]: """Create a time-series graph showing performance over the test duration. Args: time_series_metrics: List of TimeSeriesMetrics objects metric_name: Name of the metric to plot (default: "average_response_time") title: Graph title (default: None) output_file: Path to save the graph (default: None) Returns: Path to the saved graph file or Figure object """ if not time_series_metrics: raise ValueError("No time series metrics provided") # Extract data timestamps = [metrics.timestamp for metrics in time_series_metrics] if metric_name == "average_response_time": values = [metrics.average_response_time for metrics in time_series_metrics] y_label = "Response Time (ms)" graph_title = title or "Response Time Over Time" elif metric_name == "throughput": values = [metrics.throughput for metrics in time_series_metrics] y_label = "Throughput (requests/second)" graph_title = title or "Throughput Over Time" elif metric_name == "error_rate": values = [metrics.error_rate for metrics in time_series_metrics] y_label = "Error Rate (%)" graph_title = title or "Error Rate Over Time" elif metric_name == "active_threads": values = [metrics.active_threads for metrics in time_series_metrics] y_label = "Active Threads" graph_title = title or "Active Threads Over Time" else: raise ValueError(f"Unknown metric name: {metric_name}") # Create a simple representation of the graph graph = { "type": "time_series", "title": graph_title, "x_label": "Time", "y_label": y_label, "timestamps": [ts.isoformat() for ts in timestamps], "values": values } # Save or return if output_file: output_path = self._get_output_path(output_file) with open(output_path, 'w') as f: f.write(f"Time Series Graph: {graph_title}\n") f.write(f"X-axis: Time\n") f.write(f"Y-axis: {y_label}\n") f.write("Data:\n") for ts, val in zip(timestamps, values): f.write(f"{ts.isoformat()}: {val}\n") return output_path else: return graph def create_distribution_graph(self, response_times: List[float], percentiles: List[int] = [50, 90, 95, 99], title: Optional[str] = None, output_file: Optional[str] = None) -> Union[str, Dict]: """Create a distribution graph showing response time distributions. Args: response_times: List of response times percentiles: List of percentiles to mark (default: [50, 90, 95, 99]) title: Graph title (default: None) output_file: Path to save the graph (default: None) Returns: Path to the saved graph file or Figure object """ if not response_times: raise ValueError("No response times provided") # Calculate percentile values percentile_values = {} for p in percentiles: percentile_values[p] = self._calculate_percentile(response_times, p) # Create a simple representation of the graph graph_title = title or "Response Time Distribution" graph = { "type": "distribution", "title": graph_title, "x_label": "Response Time (ms)", "y_label": "Frequency", "response_times": response_times, "percentiles": percentile_values } # Save or return if output_file: output_path = self._get_output_path(output_file) with open(output_path, 'w') as f: f.write(f"Distribution Graph: {graph_title}\n") f.write(f"X-axis: Response Time (ms)\n") f.write(f"Y-axis: Frequency\n") f.write("Percentiles:\n") for p, val in percentile_values.items(): f.write(f"{p}th Percentile: {val:.2f} ms\n") return output_path else: return graph def create_endpoint_comparison_chart(self, endpoint_metrics: Dict, metric_name: str = "average_response_time", top_n: int = 10, title: Optional[str] = None, output_file: Optional[str] = None) -> Union[str, Dict]: """Create a comparison chart for different endpoints. Args: endpoint_metrics: Dictionary mapping endpoint names to EndpointMetrics objects metric_name: Name of the metric to compare (default: "average_response_time") top_n: Number of top endpoints to include (default: 10) title: Chart title (default: None) output_file: Path to save the chart (default: None) Returns: Path to the saved chart file or Figure object """ if not endpoint_metrics: raise ValueError("No endpoint metrics provided") # Extract data if metric_name == "average_response_time": values = {endpoint: metrics.average_response_time for endpoint, metrics in endpoint_metrics.items()} y_label = "Average Response Time (ms)" chart_title = title or "Endpoint Response Time Comparison" elif metric_name == "error_rate": values = {endpoint: metrics.error_rate for endpoint, metrics in endpoint_metrics.items()} y_label = "Error Rate (%)" chart_title = title or "Endpoint Error Rate Comparison" elif metric_name == "throughput": values = {endpoint: metrics.throughput for endpoint, metrics in endpoint_metrics.items()} y_label = "Throughput (requests/second)" chart_title = title or "Endpoint Throughput Comparison" else: raise ValueError(f"Unknown metric name: {metric_name}") # Sort endpoints by value (descending) and take top N sorted_endpoints = sorted(values.items(), key=lambda x: x[1], reverse=True)[:top_n] endpoints = [item[0] for item in sorted_endpoints] values_list = [item[1] for item in sorted_endpoints] # Create a simple representation of the chart chart = { "type": "endpoint_comparison", "title": chart_title, "x_label": y_label, "y_label": "Endpoint", "endpoints": endpoints, "values": values_list } # Save or return if output_file: output_path = self._get_output_path(output_file) with open(output_path, 'w') as f: f.write(f"Endpoint Comparison Chart: {chart_title}\n") f.write(f"X-axis: {y_label}\n") f.write(f"Y-axis: Endpoint\n") f.write("Data:\n") for endpoint, value in zip(endpoints, values_list): f.write(f"{endpoint}: {value:.2f}\n") return output_path else: return chart def create_html_report(self, analysis_results: Dict, output_file: str) -> str: """Create an HTML report from analysis results. Args: analysis_results: Dictionary containing analysis results output_file: Path to save the HTML report Returns: Path to the saved HTML report """ # Extract data summary = analysis_results.get("summary", {}) detailed = analysis_results.get("detailed", {}) # Create HTML content html_content = f""" <!DOCTYPE html> <html> <head> <title>JMeter Test Results Analysis</title> <style> body {{ font-family: Arial, sans-serif; margin: 20px; }} h1, h2, h3 {{ color: #333; }} table {{ border-collapse: collapse; width: 100%; margin-bottom: 20px; }} th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }} th {{ background-color: #f2f2f2; }} tr:nth-child(even) {{ background-color: #f9f9f9; }} .chart {{ margin: 20px 0; max-width: 100%; }} .section {{ margin-bottom: 30px; }} .severity-high {{ color: #d9534f; }} .severity-medium {{ color: #f0ad4e; }} .severity-low {{ color: #5bc0de; }} </style> </head> <body> <h1>JMeter Test Results Analysis</h1> <div class="section"> <h2>Summary</h2> <table> <tr><th>Metric</th><th>Value</th></tr> <tr><td>Total Samples</td><td>{summary.get('total_samples', 'N/A')}</td></tr> <tr><td>Error Count</td><td>{summary.get('error_count', 'N/A')}</td></tr> <tr><td>Error Rate</td><td>{summary.get('error_rate', 'N/A'):.2f}%</td></tr> <tr><td>Average Response Time</td><td>{summary.get('average_response_time', 'N/A'):.2f} ms</td></tr> <tr><td>Median Response Time</td><td>{summary.get('median_response_time', 'N/A'):.2f} ms</td></tr> <tr><td>90th Percentile</td><td>{summary.get('percentile_90', 'N/A'):.2f} ms</td></tr> <tr><td>95th Percentile</td><td>{summary.get('percentile_95', 'N/A'):.2f} ms</td></tr> <tr><td>99th Percentile</td><td>{summary.get('percentile_99', 'N/A'):.2f} ms</td></tr> <tr><td>Min Response Time</td><td>{summary.get('min_response_time', 'N/A'):.2f} ms</td></tr> <tr><td>Max Response Time</td><td>{summary.get('max_response_time', 'N/A'):.2f} ms</td></tr> <tr><td>Throughput</td><td>{summary.get('throughput', 'N/A'):.2f} requests/second</td></tr> <tr><td>Start Time</td><td>{summary.get('start_time', 'N/A')}</td></tr> <tr><td>End Time</td><td>{summary.get('end_time', 'N/A')}</td></tr> <tr><td>Duration</td><td>{summary.get('duration', 'N/A'):.2f} seconds</td></tr> </table> </div> """ # Add detailed information if available if detailed: # Add endpoint information endpoints = detailed.get("endpoints", {}) if endpoints: html_content += """ <div class="section"> <h2>Endpoint Analysis</h2> <table> <tr> <th>Endpoint</th> <th>Samples</th> <th>Errors</th> <th>Error Rate</th> <th>Avg Response Time</th> <th>95th Percentile</th> <th>Throughput</th> </tr> """ for endpoint, metrics in endpoints.items(): html_content += f""" <tr> <td>{endpoint}</td> <td>{metrics['total_samples']}</td> <td>{metrics['error_count']}</td> <td>{metrics['error_rate']:.2f}%</td> <td>{metrics['average_response_time']:.2f} ms</td> <td>{metrics['percentile_95']:.2f} ms</td> <td>{metrics['throughput']:.2f} req/s</td> </tr> """ html_content += """ </table> </div> """ # Add bottleneck information bottlenecks = detailed.get("bottlenecks", {}) if bottlenecks: html_content += """ <div class="section"> <h2>Bottleneck Analysis</h2> """ # Slow endpoints slow_endpoints = bottlenecks.get("slow_endpoints", []) if slow_endpoints: html_content += """ <h3>Slow Endpoints</h3> <table> <tr> <th>Endpoint</th> <th>Response Time</th> <th>Threshold</th> <th>Severity</th> </tr> """ for endpoint in slow_endpoints: severity_class = f"severity-{endpoint.get('severity', 'medium')}" html_content += f""" <tr> <td>{endpoint.get('endpoint')}</td> <td>{endpoint.get('response_time', 'N/A'):.2f} ms</td> <td>{endpoint.get('threshold', 'N/A'):.2f} ms</td> <td class="{severity_class}">{endpoint.get('severity', 'N/A').upper()}</td> </tr> """ html_content += """ </table> """ # Error-prone endpoints error_endpoints = bottlenecks.get("error_prone_endpoints", []) if error_endpoints: html_content += """ <h3>Error-Prone Endpoints</h3> <table> <tr> <th>Endpoint</th> <th>Error Rate</th> <th>Threshold</th> <th>Severity</th> </tr> """ for endpoint in error_endpoints: severity_class = f"severity-{endpoint.get('severity', 'medium')}" html_content += f""" <tr> <td>{endpoint.get('endpoint')}</td> <td>{endpoint.get('error_rate', 'N/A'):.2f}%</td> <td>{endpoint.get('threshold', 'N/A'):.2f}%</td> <td class="{severity_class}">{endpoint.get('severity', 'N/A').upper()}</td> </tr> """ html_content += """ </table> """ html_content += """ </div> """ # Add insights and recommendations insights = detailed.get("insights", {}) if insights: html_content += """ <div class="section"> <h2>Insights and Recommendations</h2> """ # Recommendations recommendations = insights.get("recommendations", []) if recommendations: html_content += """ <h3>Recommendations</h3> <table> <tr> <th>Priority</th> <th>Issue</th> <th>Recommendation</th> <th>Expected Impact</th> </tr> """ for rec in recommendations: priority_level = rec.get('priority_level', 'medium') severity_class = f"severity-{priority_level}" html_content += f""" <tr> <td class="{severity_class}">{priority_level.upper()}</td> <td>{rec.get('issue')}</td> <td>{rec.get('recommendation')}</td> <td>{rec.get('expected_impact')}</td> </tr> """ html_content += """ </table> """ # Scaling insights scaling_insights = insights.get("scaling_insights", []) if scaling_insights: html_content += """ <h3>Scaling Insights</h3> <table> <tr> <th>Topic</th> <th>Description</th> </tr> """ for insight in scaling_insights: html_content += f""" <tr> <td>{insight.get('topic')}</td> <td>{insight.get('description')}</td> </tr> """ html_content += """ </table> """ html_content += """ </div> """ # Close HTML html_content += """ </body> </html> """ # Save HTML report output_path = self._get_output_path(output_file) with open(output_path, 'w') as f: f.write(html_content) return output_path def figure_to_base64(self, fig) -> str: """Convert a figure to a base64-encoded string. Args: fig: Figure object Returns: Base64-encoded string """ # In a real implementation, this would convert a matplotlib figure to base64 # For this simplified version, we'll just return a placeholder return "base64_encoded_image_placeholder" def _get_output_path(self, output_file: str) -> str: """Get the full path for an output file. Args: output_file: Output file name or path Returns: Full path to the output file """ if self.output_dir: return os.path.join(self.output_dir, output_file) else: return output_file def _calculate_percentile(self, values: List[float], percentile: float) -> float: """Calculate a percentile from values. Args: values: List of values percentile: Percentile to calculate (0-100) Returns: Percentile value """ if not values: return 0 # Sort values sorted_values = sorted(values) # Calculate index index = (percentile / 100) * (len(sorted_values) - 1) # If index is an integer, return the value at that index if index.is_integer(): return sorted_values[int(index)] # Otherwise, interpolate between the two nearest values lower_index = int(index) upper_index = lower_index + 1 lower_value = sorted_values[lower_index] upper_value = sorted_values[upper_index] fraction = index - lower_index return lower_value + (upper_value - lower_value) * fraction

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/QAInsights/jmeter-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server