Skip to main content
Glama

JMeter MCP Server

jmeter_server.py28.5 kB
from typing import Any import subprocess from pathlib import Path from mcp.server.fastmcp import FastMCP import os import datetime import uuid import logging import logging from dotenv import load_dotenv # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # Initialize MCP server mcp = FastMCP("jmeter") async def run_jmeter(test_file: str, non_gui: bool = True, properties: dict = None, generate_report: bool = False, report_output_dir: str = None, log_file: str = None) -> str: """Run a JMeter test. Args: test_file: Path to the JMeter test file (.jmx) non_gui: Run in non-GUI mode (default: True) properties: Dictionary of JMeter properties to pass with -J (default: None) generate_report: Whether to generate report dashboard after load test (default: False) report_output_dir: Output folder for report dashboard (default: None) log_file: Name of JTL file to log sample results to (default: None) Returns: str: JMeter execution output """ try: # Convert to absolute path test_file_path = Path(test_file).resolve() # Validate file exists and is a .jmx file if not test_file_path.exists(): return f"Error: Test file not found: {test_file}" if not test_file_path.suffix == '.jmx': return f"Error: Invalid file type. Expected .jmx file: {test_file}" # Get JMeter binary path from environment jmeter_bin = os.getenv('JMETER_BIN', 'jmeter') java_opts = os.getenv('JMETER_JAVA_OPTS', '') # Log the JMeter binary path and Java options logger.info(f"JMeter binary path: {jmeter_bin}") logger.debug(f"Java options: {java_opts}") # Build command cmd = [str(Path(jmeter_bin).resolve())] if non_gui: cmd.extend(['-n']) cmd.extend(['-t', str(test_file_path)]) # Add JMeter properties if provided∑ if properties: for prop_name, prop_value in properties.items(): cmd.extend([f'-J{prop_name}={prop_value}']) logger.debug(f"Adding property: -J{prop_name}={prop_value}") # Add report generation options if requested if generate_report and non_gui: if log_file is None: # Generate unique log file name if not specified unique_id = generate_unique_id() log_file = f"{test_file_path.stem}_{unique_id}_results.jtl" logger.debug(f"Using generated unique log file: {log_file}") cmd.extend(['-l', log_file]) cmd.extend(['-e']) # Always ensure report_output_dir is unique unique_id = unique_id if 'unique_id' in locals() else generate_unique_id() if report_output_dir: # Append unique identifier to user-provided report directory original_dir = report_output_dir report_output_dir = f"{original_dir}_{unique_id}" logger.debug(f"Making user-provided report directory unique: {original_dir} -> {report_output_dir}") else: # Generate unique report output directory if not specified report_output_dir = f"{test_file_path.stem}_{unique_id}_report" logger.debug(f"Using generated unique report output directory: {report_output_dir}") cmd.extend(['-o', report_output_dir]) # Log the full command for debugging logger.debug(f"Executing command: {' '.join(cmd)}") if non_gui: # For non-GUI mode, capture output result = subprocess.run(cmd, capture_output=True, text=True) # Log output for debugging logger.debug("Command output:") logger.debug(f"Return code: {result.returncode}") logger.debug(f"Stdout: {result.stdout}") logger.debug(f"Stderr: {result.stderr}") if result.returncode != 0: return f"Error executing JMeter test:\n{result.stderr}" return result.stdout else: # For GUI mode, start process without capturing output subprocess.Popen(cmd) return "JMeter GUI launched successfully" except Exception as e: return f"Unexpected error: {str(e)}" @mcp.tool() async def execute_jmeter_test(test_file: str, gui_mode: bool = False, properties: dict = None) -> str: """Execute a JMeter test. Args: test_file: Path to the JMeter test file (.jmx) gui_mode: Whether to run in GUI mode (default: False) properties: Dictionary of JMeter properties to pass with -J (default: None) """ return await run_jmeter(test_file, non_gui=not gui_mode, properties=properties) # Run in non-GUI mode by default @mcp.tool() async def execute_jmeter_test_non_gui(test_file: str, properties: dict = None, generate_report: bool = False, report_output_dir: str = None, log_file: str = None) -> str: """Execute a JMeter test in non-GUI mode - supports JMeter properties. Args: test_file: Path to the JMeter test file (.jmx) properties: Dictionary of JMeter properties to pass with -J (default: None) generate_report: Whether to generate report dashboard after load test (default: False) report_output_dir: Output folder for report dashboard (default: None) log_file: Name of JTL file to log sample results to (default: None) """ return await run_jmeter(test_file, non_gui=True, properties=properties, generate_report=generate_report, report_output_dir=report_output_dir, log_file=log_file) # Import the analyzer module from analyzer.models import TestResults from analyzer.analyzer import TestResultsAnalyzer from analyzer.visualization.engine import VisualizationEngine @mcp.tool() async def analyze_jmeter_results(jtl_file: str, detailed: bool = False) -> str: """Analyze JMeter test results and provide a summary of key metrics and insights. Args: jtl_file: Path to the JTL file containing test results detailed: Whether to include detailed analysis (default: False) Returns: str: Analysis results in a formatted string """ try: analyzer = TestResultsAnalyzer() # Validate file exists file_path = Path(jtl_file) if not file_path.exists(): return f"Error: JTL file not found: {jtl_file}" try: # Analyze the file analysis_results = analyzer.analyze_file(file_path, detailed=detailed) # Format the results as a string result_str = f"Analysis of {jtl_file}:\n\n" # Add summary information summary = analysis_results.get("summary", {}) result_str += "Summary:\n" result_str += f"- Total samples: {summary.get('total_samples', 'N/A')}\n" result_str += f"- Error count: {summary.get('error_count', 'N/A')} ({summary.get('error_rate', 'N/A'):.2f}%)\n" result_str += f"- Response times (ms):\n" result_str += f" - Average: {summary.get('average_response_time', 'N/A'):.2f}\n" result_str += f" - Median: {summary.get('median_response_time', 'N/A'):.2f}\n" result_str += f" - 90th percentile: {summary.get('percentile_90', 'N/A'):.2f}\n" result_str += f" - 95th percentile: {summary.get('percentile_95', 'N/A'):.2f}\n" result_str += f" - 99th percentile: {summary.get('percentile_99', 'N/A'):.2f}\n" result_str += f" - Min: {summary.get('min_response_time', 'N/A'):.2f}\n" result_str += f" - Max: {summary.get('max_response_time', 'N/A'):.2f}\n" result_str += f"- Throughput: {summary.get('throughput', 'N/A'):.2f} requests/second\n" result_str += f"- Start time: {summary.get('start_time', 'N/A')}\n" result_str += f"- End time: {summary.get('end_time', 'N/A')}\n" result_str += f"- Duration: {summary.get('duration', 'N/A'):.2f} seconds\n\n" # Add detailed information if requested if detailed and "detailed" in analysis_results: detailed_info = analysis_results["detailed"] # Add endpoint information endpoints = detailed_info.get("endpoints", {}) if endpoints: result_str += "Endpoint Analysis:\n" for endpoint, metrics in endpoints.items(): result_str += f"- {endpoint}:\n" result_str += f" - Samples: {metrics.get('total_samples', 'N/A')}\n" result_str += f" - Errors: {metrics.get('error_count', 'N/A')} ({metrics.get('error_rate', 'N/A'):.2f}%)\n" result_str += f" - Average response time: {metrics.get('average_response_time', 'N/A'):.2f} ms\n" result_str += f" - 95th percentile: {metrics.get('percentile_95', 'N/A'):.2f} ms\n" result_str += f" - Throughput: {metrics.get('throughput', 'N/A'):.2f} requests/second\n" result_str += "\n" # Add bottleneck information bottlenecks = detailed_info.get("bottlenecks", {}) if bottlenecks: result_str += "Bottleneck Analysis:\n" # Slow endpoints slow_endpoints = bottlenecks.get("slow_endpoints", []) if slow_endpoints: result_str += "- Slow Endpoints:\n" for endpoint in slow_endpoints: result_str += f" - {endpoint.get('endpoint')}: {endpoint.get('response_time'):.2f} ms " result_str += f"(Severity: {endpoint.get('severity')})\n" result_str += "\n" # Error-prone endpoints error_endpoints = bottlenecks.get("error_prone_endpoints", []) if error_endpoints: result_str += "- Error-Prone Endpoints:\n" for endpoint in error_endpoints: result_str += f" - {endpoint.get('endpoint')}: {endpoint.get('error_rate'):.2f}% " result_str += f"(Severity: {endpoint.get('severity')})\n" result_str += "\n" # Anomalies anomalies = bottlenecks.get("anomalies", []) if anomalies: result_str += "- Response Time Anomalies:\n" for anomaly in anomalies[:3]: # Show only top 3 anomalies result_str += f" - At {anomaly.get('timestamp')}: " result_str += f"Expected {anomaly.get('expected_value'):.2f} ms, " result_str += f"Got {anomaly.get('actual_value'):.2f} ms " result_str += f"({anomaly.get('deviation_percentage'):.2f}% deviation)\n" result_str += "\n" # Concurrency impact concurrency = bottlenecks.get("concurrency_impact", {}) if concurrency: result_str += "- Concurrency Impact:\n" correlation = concurrency.get("correlation", 0) result_str += f" - Correlation between threads and response time: {correlation:.2f}\n" if concurrency.get("has_degradation", False): result_str += f" - Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n" else: result_str += " - No significant performance degradation detected with increasing threads\n" result_str += "\n" # Add insights and recommendations insights = detailed_info.get("insights", {}) if insights: result_str += "Insights and Recommendations:\n" # Recommendations recommendations = insights.get("recommendations", []) if recommendations: result_str += "- Top Recommendations:\n" for rec in recommendations[:3]: # Show only top 3 recommendations result_str += f" - [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n" result_str += f" Recommendation: {rec.get('recommendation')}\n" result_str += f" Expected Impact: {rec.get('expected_impact')}\n" result_str += "\n" # Scaling insights scaling_insights = insights.get("scaling_insights", []) if scaling_insights: result_str += "- Scaling Insights:\n" for insight in scaling_insights[:2]: # Show only top 2 insights result_str += f" - {insight.get('topic')}: {insight.get('description')}\n" result_str += "\n" # Add time series information (just a summary) time_series = detailed_info.get("time_series", []) if time_series: result_str += "Time Series Analysis:\n" result_str += f"- Intervals: {len(time_series)}\n" result_str += f"- Interval duration: 5 seconds\n" # Calculate average throughput and response time over intervals avg_throughput = sum(ts.get('throughput', 0) for ts in time_series) / len(time_series) avg_response_time = sum(ts.get('average_response_time', 0) for ts in time_series) / len(time_series) result_str += f"- Average throughput over intervals: {avg_throughput:.2f} requests/second\n" result_str += f"- Average response time over intervals: {avg_response_time:.2f} ms\n\n" return result_str except ValueError as e: return f"Error analyzing JTL file: {str(e)}" except Exception as e: return f"Error analyzing JMeter results: {str(e)}" @mcp.tool() async def identify_performance_bottlenecks(jtl_file: str) -> str: """Identify performance bottlenecks in JMeter test results. Args: jtl_file: Path to the JTL file containing test results Returns: str: Bottleneck analysis results in a formatted string """ try: analyzer = TestResultsAnalyzer() # Validate file exists file_path = Path(jtl_file) if not file_path.exists(): return f"Error: JTL file not found: {jtl_file}" try: # Analyze the file with detailed analysis analysis_results = analyzer.analyze_file(file_path, detailed=True) # Format the results as a string result_str = f"Performance Bottleneck Analysis of {jtl_file}:\n\n" # Add bottleneck information detailed_info = analysis_results.get("detailed", {}) bottlenecks = detailed_info.get("bottlenecks", {}) if not bottlenecks: return f"No bottlenecks identified in {jtl_file}." # Slow endpoints slow_endpoints = bottlenecks.get("slow_endpoints", []) if slow_endpoints: result_str += "Slow Endpoints:\n" for endpoint in slow_endpoints: result_str += f"- {endpoint.get('endpoint')}: {endpoint.get('response_time'):.2f} ms " result_str += f"(Severity: {endpoint.get('severity')})\n" result_str += "\n" else: result_str += "No slow endpoints identified.\n\n" # Error-prone endpoints error_endpoints = bottlenecks.get("error_prone_endpoints", []) if error_endpoints: result_str += "Error-Prone Endpoints:\n" for endpoint in error_endpoints: result_str += f"- {endpoint.get('endpoint')}: {endpoint.get('error_rate'):.2f}% " result_str += f"(Severity: {endpoint.get('severity')})\n" result_str += "\n" else: result_str += "No error-prone endpoints identified.\n\n" # Anomalies anomalies = bottlenecks.get("anomalies", []) if anomalies: result_str += "Response Time Anomalies:\n" for anomaly in anomalies: result_str += f"- At {anomaly.get('timestamp')}: " result_str += f"Expected {anomaly.get('expected_value'):.2f} ms, " result_str += f"Got {anomaly.get('actual_value'):.2f} ms " result_str += f"({anomaly.get('deviation_percentage'):.2f}% deviation)\n" result_str += "\n" else: result_str += "No response time anomalies detected.\n\n" # Concurrency impact concurrency = bottlenecks.get("concurrency_impact", {}) if concurrency: result_str += "Concurrency Impact:\n" correlation = concurrency.get("correlation", 0) result_str += f"- Correlation between threads and response time: {correlation:.2f}\n" if concurrency.get("has_degradation", False): result_str += f"- Performance degradation detected at {concurrency.get('degradation_threshold')} threads\n" else: result_str += "- No significant performance degradation detected with increasing threads\n" result_str += "\n" # Add recommendations insights = detailed_info.get("insights", {}) recommendations = insights.get("recommendations", []) if recommendations: result_str += "Recommendations:\n" for rec in recommendations[:5]: # Show top 5 recommendations result_str += f"- [{rec.get('priority_level', 'medium').upper()}] {rec.get('recommendation')}\n" else: result_str += "No specific recommendations available.\n" return result_str except ValueError as e: return f"Error analyzing JTL file: {str(e)}" except Exception as e: return f"Error identifying performance bottlenecks: {str(e)}" @mcp.tool() async def get_performance_insights(jtl_file: str) -> str: """Get insights and recommendations for improving performance based on JMeter test results. Args: jtl_file: Path to the JTL file containing test results Returns: str: Performance insights and recommendations in a formatted string """ try: analyzer = TestResultsAnalyzer() # Validate file exists file_path = Path(jtl_file) if not file_path.exists(): return f"Error: JTL file not found: {jtl_file}" try: # Analyze the file with detailed analysis analysis_results = analyzer.analyze_file(file_path, detailed=True) # Format the results as a string result_str = f"Performance Insights for {jtl_file}:\n\n" # Add insights information detailed_info = analysis_results.get("detailed", {}) insights = detailed_info.get("insights", {}) if not insights: return f"No insights available for {jtl_file}." # Recommendations recommendations = insights.get("recommendations", []) if recommendations: result_str += "Recommendations:\n" for i, rec in enumerate(recommendations[:5], 1): # Show top 5 recommendations result_str += f"{i}. [{rec.get('priority_level', 'medium').upper()}] {rec.get('issue')}\n" result_str += f" - Recommendation: {rec.get('recommendation')}\n" result_str += f" - Expected Impact: {rec.get('expected_impact')}\n" result_str += f" - Implementation Difficulty: {rec.get('implementation_difficulty')}\n\n" else: result_str += "No specific recommendations available.\n\n" # Scaling insights scaling_insights = insights.get("scaling_insights", []) if scaling_insights: result_str += "Scaling Insights:\n" for i, insight in enumerate(scaling_insights, 1): result_str += f"{i}. {insight.get('topic')}\n" result_str += f" {insight.get('description')}\n\n" else: result_str += "No scaling insights available.\n\n" # Add summary metrics for context summary = analysis_results.get("summary", {}) result_str += "Test Summary:\n" result_str += f"- Total samples: {summary.get('total_samples', 'N/A')}\n" result_str += f"- Error rate: {summary.get('error_rate', 'N/A'):.2f}%\n" result_str += f"- Average response time: {summary.get('average_response_time', 'N/A'):.2f} ms\n" result_str += f"- 95th percentile: {summary.get('percentile_95', 'N/A'):.2f} ms\n" result_str += f"- Throughput: {summary.get('throughput', 'N/A'):.2f} requests/second\n" return result_str except ValueError as e: return f"Error analyzing JTL file: {str(e)}" except Exception as e: return f"Error getting performance insights: {str(e)}" @mcp.tool() async def generate_visualization(jtl_file: str, visualization_type: str, output_file: str) -> str: """Generate visualizations of JMeter test results. Args: jtl_file: Path to the JTL file containing test results visualization_type: Type of visualization to generate (time_series, distribution, comparison, html_report) output_file: Path to save the visualization Returns: str: Path to the generated visualization file """ try: analyzer = TestResultsAnalyzer() # Validate file exists file_path = Path(jtl_file) if not file_path.exists(): return f"Error: JTL file not found: {jtl_file}" try: # Analyze the file with detailed analysis analysis_results = analyzer.analyze_file(file_path, detailed=True) # Create visualization engine output_dir = os.path.dirname(output_file) if output_file else None engine = VisualizationEngine(output_dir=output_dir) # Generate visualization based on type if visualization_type == "time_series": # Extract time series metrics time_series = analysis_results.get("detailed", {}).get("time_series", []) if not time_series: return "No time series data available for visualization." # Convert to TimeSeriesMetrics objects metrics = [] for ts_data in time_series: metrics.append(TimeSeriesMetrics( timestamp=datetime.datetime.fromisoformat(ts_data["timestamp"]), active_threads=ts_data["active_threads"], throughput=ts_data["throughput"], average_response_time=ts_data["average_response_time"], error_rate=ts_data["error_rate"] )) # Create visualization output_path = engine.create_time_series_graph( metrics, metric_name="average_response_time", output_file=output_file) return f"Time series graph generated: {output_path}" elif visualization_type == "distribution": # Extract response times samples = [] for endpoint, metrics in analysis_results.get("detailed", {}).get("endpoints", {}).items(): samples.extend([metrics["average_response_time"]] * metrics["total_samples"]) if not samples: return "No response time data available for visualization." # Create visualization output_path = engine.create_distribution_graph(samples, output_file=output_file) return f"Distribution graph generated: {output_path}" elif visualization_type == "comparison": # Extract endpoint metrics endpoints = analysis_results.get("detailed", {}).get("endpoints", {}) if not endpoints: return "No endpoint data available for visualization." # Convert to EndpointMetrics objects endpoint_metrics = {} for endpoint, metrics_data in endpoints.items(): endpoint_metrics[endpoint] = EndpointMetrics( endpoint=endpoint, total_samples=metrics_data["total_samples"], error_count=metrics_data["error_count"], error_rate=metrics_data["error_rate"], average_response_time=metrics_data["average_response_time"], median_response_time=metrics_data["median_response_time"], percentile_90=metrics_data["percentile_90"], percentile_95=metrics_data["percentile_95"], percentile_99=metrics_data["percentile_99"], min_response_time=metrics_data["min_response_time"], max_response_time=metrics_data["max_response_time"], throughput=metrics_data["throughput"], test_duration=analysis_results["summary"]["duration"] ) # Create visualization output_path = engine.create_endpoint_comparison_chart( endpoint_metrics, metric_name="average_response_time", output_file=output_file) return f"Endpoint comparison chart generated: {output_path}" elif visualization_type == "html_report": # Create HTML report output_path = engine.create_html_report(analysis_results, output_file) return f"HTML report generated: {output_path}" else: return f"Unknown visualization type: {visualization_type}. " \ f"Supported types: time_series, distribution, comparison, html_report" except ValueError as e: return f"Error generating visualization: {str(e)}" except Exception as e: return f"Error generating visualization: {str(e)}" def generate_unique_id(): """ Generate a unique identifier using timestamp and UUID. Returns: str: A unique identifier string """ timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") random_id = str(uuid.uuid4())[:8] # Use first 8 chars of UUID for brevity return f"{timestamp}_{random_id}" if __name__ == "__main__": mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/QAInsights/jmeter-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server