analyze_jmeter_results
Analyze JMeter test results to extract key metrics, generate summaries, and provide insights. Pass the path to a JTL file to get either a quick summary or a detailed performance analysis.
Instructions
Analyze JMeter test results and provide a summary of key metrics and insights.
Args:
- jtl_file: Path to the JTL file containing test results
- detailed: Whether to include detailed analysis (default: False)
Returns: str: Analysis results in a formatted string
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| detailed | No | Whether to include detailed analysis | False |
| jtl_file | Yes | Path to the JTL file containing test results | |
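
A JTL results file is typically produced by a non-GUI JMeter run (for example, `jmeter -n -t plan.jmx -l results.jtl`). A minimal arguments payload matching the schema above might look like the following; the file path is illustrative:

```json
{
  "jtl_file": "/path/to/results.jtl",
  "detailed": true
}
```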
Implementation Reference
- jmeter_server.py:150-302 (handler): The core handler function for the 'analyze_jmeter_results' tool. Decorated with @mcp.tool() for MCP registration. Analyzes a JMeter JTL file using TestResultsAnalyzer, computes summary metrics, detailed endpoint/bottleneck analysis, and insights, and formats them into a comprehensive string report.

```python
@mcp.tool()
async def analyze_jmeter_results(jtl_file: str, detailed: bool = False) -> str:
    """Analyze JMeter test results and provide a summary of key metrics and insights.

    Args:
        jtl_file: Path to the JTL file containing test results
        detailed: Whether to include detailed analysis (default: False)

    Returns:
        str: Analysis results in a formatted string
    """
    try:
        analyzer = TestResultsAnalyzer()

        # Validate file exists
        file_path = Path(jtl_file)
        if not file_path.exists():
            return f"Error: JTL file not found: {jtl_file}"

        try:
            # Analyze the file
            analysis_results = analyzer.analyze_file(file_path, detailed=detailed)

            # Format the results as a string
            result_str = f"Analysis of {jtl_file}:\n\n"

            # Add summary information
            summary = analysis_results.get("summary", {})
            result_str += "Summary:\n"
            result_str += f"- Total samples: {summary.get('total_samples', 'N/A')}\n"
            result_str += f"- Error count: {summary.get('error_count', 'N/A')} ({summary.get('error_rate', 'N/A'):.2f}%)\n"
            result_str += f"- Response times (ms):\n"
            result_str += f"  - Average: {summary.get('average_response_time', 'N/A'):.2f}\n"
            result_str += f"  - Median: {summary.get('median_response_time', 'N/A'):.2f}\n"
            result_str += f"  - 90th percentile: {summary.get('percentile_90', 'N/A'):.2f}\n"
            result_str += f"  - 95th percentile: {summary.get('percentile_95', 'N/A'):.2f}\n"
            result_str += f"  - 99th percentile: {summary.get('percentile_99', 'N/A'):.2f}\n"
            result_str += f"  - Min: {summary.get('min_response_time', 'N/A'):.2f}\n"
            result_str += f"  - Max: {summary.get('max_response_time', 'N/A'):.2f}\n"
            result_str += f"- Throughput: {summary.get('throughput', 'N/A'):.2f} requests/second\n"
            result_str += f"- Start time: {summary.get('start_time', 'N/A')}\n"
            result_str += f"- End time: {summary.get('end_time', 'N/A')}\n"
            result_str += f"- Duration: {summary.get('duration', 'N/A'):.2f} seconds\n\n"

            # Add detailed information if requested
            if detailed and "detailed" in analysis_results:
                detailed_info = analysis_results["detailed"]

                # Add endpoint information
                endpoints = detailed_info.get("endpoints", {})
                if endpoints:
                    result_str += "Endpoint Analysis:\n"
                    for endpoint, metrics in endpoints.items():
                        result_str += f"- {endpoint}:\n"
                        result_str += f"  - Samples: {metrics.get('total_samples', 'N/A')}\n"
                        result_str += f"  - Errors: {metrics.get('error_count', 'N/A')} ({metrics.get('error_rate', 'N/A'):.2f}%)\n"
                        result_str += f"  - Average response time: {metrics.get('average_response_time', 'N/A'):.2f} ms\n"
                        result_str += f"  - 95th percentile: {metrics.get('percentile_95', 'N/A'):.2f} ms\n"
                        result_str += f"  - Throughput: {metrics.get('throughput', 'N/A'):.2f} requests/second\n"
                    result_str += "\n"

                # Add bottleneck information
                bottlenecks = detailed_info.get("bottlenecks", {})
                if bottlenecks:
                    result_str += "Bottleneck Analysis:\n"

                    # Slow endpoints
                    slow_endpoints = bottlenecks.get("slow_endpoints", [])
                    if slow_endpoints:
                        result_str += "- Slow Endpoints:\n"
                        for endpoint in slow_endpoints:
                            result_str += f"  - {endpoint.get('endpoint')}: {endpoint.get('response_time'):.2f} ms "
                            result_str += f"(Severity: {endpoint.get('severity')})\n"
                        result_str += "\n"

                    # Error-prone endpoints
                    error_endpoints = bottlenecks.get("error_prone_endpoints", [])
                    if error_endpoints:
                        result_str += "- Error-Prone Endpoints:\n"
                        for endpoint in error_endpoints:
                            result_str += f"  - {endpoint.get('endpoint')}: {endpoint.get('error_rate'):.2f}% "
                            result_str += f"(Severity: {endpoint.get('severity')})\n"
                        result_str += "\n"

                    # Anomalies
                    anomalies = bottlenecks.get("anomalies", [])
                    if anomalies:
                        result_str += "- Response Time Anomalies:\n"
                        for anomaly in anomalies[:3]:  # Show only top 3 anomalies
                            result_str += f"  - At {anomaly.get('timestamp')}: "
                            result_str += f"Expected {anomaly.get('expected_value'):.2f} ms, "
                            result_str += f"Got {anomaly.get('actual_value'):.2f} ms "
                            result_str += f"({anomaly.get('deviation_percentage'):.2f}% deviation)\n"
                        result_str += "\n"

                    # Concurrency impact
                    concurrency = bottlenecks.get("concurrency_impact", {})
                    if concurrency:
                        result_str += "- Concurrency Impact:\n"
                        correlation = concurrency.get("correlation", 0)
                        result_str += f"  - Correlation between threads and response time: {correlation:.2f}\n"
                        if concurrency.get("has_degradation", False):
                            result_str += f"  - Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n"
                        else:
                            result_str += "  - No significant performance degradation detected with increasing threads\n"
                        result_str += "\n"

                # Add insights and recommendations
                insights = detailed_info.get("insights", {})
                if insights:
                    result_str += "Insights and Recommendations:\n"

                    # Recommendations
                    recommendations = insights.get("recommendations", [])
                    if recommendations:
                        result_str += "- Top Recommendations:\n"
                        for rec in recommendations[:3]:  # Show only top 3 recommendations
                            result_str += f"  - [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n"
                            result_str += f"    Recommendation: {rec.get('recommendation')}\n"
                            result_str += f"    Expected Impact: {rec.get('expected_impact')}\n"
                        result_str += "\n"

                    # Scaling insights
                    scaling_insights = insights.get("scaling_insights", [])
                    if scaling_insights:
                        result_str += "- Scaling Insights:\n"
                        for insight in scaling_insights[:2]:  # Show only top 2 insights
                            result_str += f"  - {insight.get('topic')}: {insight.get('description')}\n"
                        result_str += "\n"

                # Add time series information (just a summary)
                time_series = detailed_info.get("time_series", [])
                if time_series:
                    result_str += "Time Series Analysis:\n"
                    result_str += f"- Intervals: {len(time_series)}\n"
                    result_str += f"- Interval duration: 5 seconds\n"

                    # Calculate average throughput and response time over intervals
                    avg_throughput = sum(ts.get('throughput', 0) for ts in time_series) / len(time_series)
                    avg_response_time = sum(ts.get('average_response_time', 0) for ts in time_series) / len(time_series)

                    result_str += f"- Average throughput over intervals: {avg_throughput:.2f} requests/second\n"
                    result_str += f"- Average response time over intervals: {avg_response_time:.2f} ms\n\n"

            return result_str
        except ValueError as e:
            return f"Error analyzing JTL file: {str(e)}"
    except Exception as e:
        return f"Error analyzing JMeter results: {str(e)}"
```
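
Because the handler is an ordinary async function, it can also be exercised outside of an MCP client. The following is a minimal sketch, assuming the function is importable from `jmeter_server`, remains directly callable after the `@mcp.tool()` decoration, and that the results file path exists; all of these are assumptions for illustration, not part of the source above.

```python
# Minimal sketch: invoke the handler directly for a quick local check.
# Assumes analyze_jmeter_results is importable from jmeter_server and that
# "results/run1.jtl" exists; both are illustrative assumptions.
import asyncio

from jmeter_server import analyze_jmeter_results


async def main() -> None:
    # detailed=True adds endpoint, bottleneck, insight, and time-series sections
    report = await analyze_jmeter_results("results/run1.jtl", detailed=True)
    print(report)


if __name__ == "__main__":
    asyncio.run(main())
```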