Nextcloud MCP Server

by cbcoutinho
benchmark.py (15.3 kB)
#!/usr/bin/env python3
"""
Load testing benchmark for Nextcloud MCP Server.

Usage:
    uv run python -m tests.load.benchmark --concurrency 10 --duration 30
    uv run python -m tests.load.benchmark -c 50 -d 300 --output results.json
"""

import json
import logging
import signal
import statistics
import sys
import time
from collections import Counter
from contextlib import asynccontextmanager
from typing import Any

import anyio
import click
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

from tests.load.workloads import MixedWorkload, OperationResult, WorkloadOperations

logging.basicConfig(
    level=logging.WARNING, format="%(levelname)s [%(asctime)s] %(name)s - %(message)s"
)
logger = logging.getLogger(__name__)


class BenchmarkMetrics:
    """Collect and analyze benchmark metrics."""

    def __init__(self):
        self.results: list[OperationResult] = []
        self.start_time: float | None = None
        self.end_time: float | None = None
        self._operation_counts: Counter = Counter()
        self._operation_errors: Counter = Counter()

    def add_result(self, result: OperationResult):
        """Add a single operation result."""
        self.results.append(result)
        self._operation_counts[result.operation] += 1
        if not result.success:
            self._operation_errors[result.operation] += 1

    def start(self):
        """Mark the start of the benchmark."""
        self.start_time = time.time()

    def stop(self):
        """Mark the end of the benchmark."""
        self.end_time = time.time()

    @property
    def duration(self) -> float:
        """Total benchmark duration in seconds."""
        if self.start_time is None or self.end_time is None:
            return 0.0
        return self.end_time - self.start_time

    @property
    def total_requests(self) -> int:
        """Total number of requests made."""
        return len(self.results)

    @property
    def successful_requests(self) -> int:
        """Number of successful requests."""
        return sum(1 for r in self.results if r.success)

    @property
    def failed_requests(self) -> int:
        """Number of failed requests."""
        return sum(1 for r in self.results if not r.success)

    @property
    def error_rate(self) -> float:
        """Error rate as a percentage."""
        if self.total_requests == 0:
            return 0.0
        return (self.failed_requests / self.total_requests) * 100

    @property
    def requests_per_second(self) -> float:
        """Average requests per second."""
        if self.duration == 0:
            return 0.0
        return self.total_requests / self.duration

    def latency_stats(self) -> dict[str, float]:
        """Calculate latency statistics."""
        if not self.results:
            return {
                "min": 0.0,
                "max": 0.0,
                "mean": 0.0,
                "median": 0.0,
                "p90": 0.0,
                "p95": 0.0,
                "p99": 0.0,
            }

        durations = [r.duration for r in self.results]
        sorted_durations = sorted(durations)

        def percentile(data: list[float], p: float) -> float:
            k = (len(data) - 1) * p
            f = int(k)
            c = f + 1
            if c >= len(data):
                return data[-1]
            return data[f] + (k - f) * (data[c] - data[f])

        return {
            "min": min(durations),
            "max": max(durations),
            "mean": statistics.mean(durations),
            "median": statistics.median(durations),
            "p90": percentile(sorted_durations, 0.90),
            "p95": percentile(sorted_durations, 0.95),
            "p99": percentile(sorted_durations, 0.99),
        }

    def operation_breakdown(self) -> dict[str, dict[str, Any]]:
        """Get per-operation statistics."""
        breakdown = {}
        for op_name in self._operation_counts:
            op_results = [r for r in self.results if r.operation == op_name]
            op_durations = [r.duration for r in op_results if r.success]

            if op_durations:
                sorted_durations = sorted(op_durations)
                p50 = statistics.median(sorted_durations)
                p95_idx = int(len(sorted_durations) * 0.95)
                p95 = sorted_durations[min(p95_idx, len(sorted_durations) - 1)]
            else:
                p50 = p95 = 0.0

            breakdown[op_name] = {
                "count": self._operation_counts[op_name],
                "errors": self._operation_errors[op_name],
                "success_rate": (
                    (self._operation_counts[op_name] - self._operation_errors[op_name])
                    / self._operation_counts[op_name]
                    * 100
                ),
                "p50_latency": p50,
                "p95_latency": p95,
            }
        return breakdown

    def to_dict(self) -> dict[str, Any]:
        """Convert metrics to dictionary for JSON export."""
        return {
            "summary": {
                "duration": self.duration,
                "total_requests": self.total_requests,
                "successful_requests": self.successful_requests,
                "failed_requests": self.failed_requests,
                "error_rate": self.error_rate,
                "requests_per_second": self.requests_per_second,
            },
            "latency": self.latency_stats(),
            "operations": self.operation_breakdown(),
        }

    def print_report(self):
        """Print human-readable benchmark report."""
        print("\n" + "=" * 80)
        print("BENCHMARK RESULTS")
        print("=" * 80)
        print(f"\nDuration: {self.duration:.2f}s")
        print(f"Total Requests: {self.total_requests}")
        print(f"Successful: {self.successful_requests}")
        print(f"Failed: {self.failed_requests}")
        print(f"Error Rate: {self.error_rate:.2f}%")
        print(f"Requests/Second: {self.requests_per_second:.2f}")

        print("\n" + "-" * 80)
        print("LATENCY (seconds)")
        print("-" * 80)
        latency = self.latency_stats()
        print(f"Min: {latency['min']:.4f}s")
        print(f"Mean: {latency['mean']:.4f}s")
        print(f"Median: {latency['median']:.4f}s")
        print(f"P90: {latency['p90']:.4f}s")
        print(f"P95: {latency['p95']:.4f}s")
        print(f"P99: {latency['p99']:.4f}s")
        print(f"Max: {latency['max']:.4f}s")

        print("\n" + "-" * 80)
        print("OPERATION BREAKDOWN")
        print("-" * 80)
        print(
            f"{'Operation':<25} {'Count':>8} {'Errors':>8} {'Success':>9} {'P50':>10} {'P95':>10}"
        )
        print("-" * 80)
        breakdown = self.operation_breakdown()
        for op_name, stats in sorted(breakdown.items()):
            print(
                f"{op_name:<25} {stats['count']:>8} {stats['errors']:>8} "
                f"{stats['success_rate']:>8.1f}% {stats['p50_latency']:>9.4f}s {stats['p95_latency']:>9.4f}s"
            )
        print("=" * 80 + "\n")


@asynccontextmanager
async def create_mcp_session(url: str):
    """Create an MCP client session with proper cleanup."""
    logger.info(f"Creating MCP client session for {url}")
    streamable_context = streamablehttp_client(url)
    session_context = None
    try:
        read_stream, write_stream, _ = await streamable_context.__aenter__()
        session_context = ClientSession(read_stream, write_stream)
        session = await session_context.__aenter__()
        await session.initialize()
        logger.info("MCP client session initialized")
        yield session
    finally:
        if session_context is not None:
            try:
                await session_context.__aexit__(None, None, None)
            except Exception as e:
                logger.debug(f"Error closing session: {e}")
        try:
            await streamable_context.__aexit__(None, None, None)
        except Exception as e:
            logger.debug(f"Error closing streamable context: {e}")


async def wait_for_mcp_server(url: str, max_attempts: int = 10) -> bool:
    """Wait for MCP server to be ready."""
    logger.info(f"Waiting for MCP server at {url}...")
    for attempt in range(1, max_attempts + 1):
        try:
            async with create_mcp_session(url) as session:
                # Try to get capabilities
                await session.read_resource("nc://capabilities")
                logger.info("MCP server is ready")
                return True
        except Exception as e:
            if attempt < max_attempts:
                logger.debug(f"Attempt {attempt}/{max_attempts}: {e}")
                await anyio.sleep(2)
            else:
                logger.error(f"MCP server not ready after {max_attempts} attempts")
                return False
    return False


async def benchmark_worker(
    worker_id: int,
    url: str,
    duration: float,
    metrics: BenchmarkMetrics,
    stop_event: anyio.Event,
):
    """Single worker that runs operations for the specified duration."""
    logger.info(f"Worker {worker_id} starting...")
    try:
        async with create_mcp_session(url) as session:
            ops = WorkloadOperations(session)
            workload = MixedWorkload(ops)

            # Warmup
            await workload.warmup(count=5)

            # Run operations until duration expires or stop event is set
            start_time = time.time()
            operation_count = 0
            while not stop_event.is_set():
                if time.time() - start_time >= duration:
                    break

                result = await workload.run_operation()
                metrics.add_result(result)
                operation_count += 1

                # Small delay to prevent overwhelming the server
                await anyio.sleep(0.01)

            # Cleanup
            await ops.cleanup()
            logger.info(f"Worker {worker_id} completed {operation_count} operations")
    except Exception as e:
        logger.error(f"Worker {worker_id} error: {e}", exc_info=True)


async def run_benchmark(
    url: str,
    concurrency: int,
    duration: float,
    warmup: float = 5.0,
) -> BenchmarkMetrics:
    """Run the benchmark with specified parameters."""
    metrics = BenchmarkMetrics()
    stop_event = anyio.Event()

    # Setup signal handlers for graceful shutdown
    def signal_handler(sig, frame):
        logger.warning("Received interrupt signal, stopping benchmark...")
        stop_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print(
        f"\nStarting benchmark with {concurrency} concurrent workers for {duration}s..."
    )
    print(f"Target: {url}")
    print(f"Warmup period: {warmup}s\n")

    # Warmup period
    if warmup > 0:
        print("Warming up...")
        await anyio.sleep(warmup)

    # Start metrics collection
    metrics.start()

    # Create and run workers using anyio task groups
    async with anyio.create_task_group() as tg:
        # Start all workers
        for i in range(concurrency):
            tg.start_soon(benchmark_worker, i, url, duration, metrics, stop_event)

        # Show progress
        tg.start_soon(show_progress, duration, metrics, stop_event)

    # Stop metrics (tasks already completed when task group exits)
    metrics.stop()

    return metrics


async def show_progress(
    duration: float,
    metrics: BenchmarkMetrics,
    stop_event: anyio.Event,
):
    """Show real-time progress during benchmark."""
    start_time = time.time()

    while not stop_event.is_set():
        elapsed = time.time() - start_time
        if elapsed >= duration:
            break

        # Calculate progress
        progress = min(elapsed / duration * 100, 100)
        rps = metrics.total_requests / max(elapsed, 0.1)

        # Print progress bar
        bar_length = 40
        filled = int(bar_length * progress / 100)
        bar = "█" * filled + "░" * (bar_length - filled)

        print(
            f"\r[{bar}] {progress:5.1f}% | "
            f"Requests: {metrics.total_requests:6d} | "
            f"RPS: {rps:6.1f} | "
            f"Errors: {metrics.failed_requests:4d}",
            end="",
            flush=True,
        )

        await anyio.sleep(0.5)

    print()  # New line after progress


@click.command()
@click.option(
    "--concurrency",
    "-c",
    type=int,
    default=10,
    show_default=True,
    help="Number of concurrent workers",
)
@click.option(
    "--duration",
    "-d",
    type=float,
    default=30.0,
    show_default=True,
    help="Test duration in seconds",
)
@click.option(
    "--warmup",
    "-w",
    type=float,
    default=5.0,
    show_default=True,
    help="Warmup duration before collecting metrics (seconds)",
)
@click.option(
    "--url",
    "-u",
    default="http://localhost:8000/mcp",
    show_default=True,
    help="MCP server URL",
)
@click.option(
    "--output",
    "-o",
    type=click.Path(),
    help="Output file for JSON results (optional)",
)
@click.option(
    "--wait-for-server/--no-wait",
    default=True,
    show_default=True,
    help="Wait for MCP server to be ready before starting",
)
@click.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Enable verbose logging",
)
def main(
    concurrency: int,
    duration: float,
    warmup: float,
    url: str,
    output: str | None,
    wait_for_server: bool,
    verbose: bool,
):
    """
    Load testing benchmark for Nextcloud MCP Server.

    Runs a mixed workload of realistic MCP operations against the server
    and reports detailed performance metrics.

    Examples:

        # Quick 30-second test with 10 workers
        uv run python -m tests.load.benchmark --concurrency 10 --duration 30

        # Extended test with 50 workers for 5 minutes
        uv run python -m tests.load.benchmark -c 50 -d 300

        # Export results to JSON
        uv run python -m tests.load.benchmark -c 20 -d 60 --output results.json

        # Test OAuth server on port 8001
        uv run python -m tests.load.benchmark --url http://localhost:8001/mcp
    """
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        logging.getLogger("tests.load").setLevel(logging.DEBUG)

    async def run():
        # Wait for server if requested
        if wait_for_server:
            if not await wait_for_mcp_server(url):
                print("ERROR: MCP server is not ready", file=sys.stderr)
                sys.exit(1)

        # Run benchmark
        metrics = await run_benchmark(url, concurrency, duration, warmup)

        # Print report
        metrics.print_report()

        # Export to JSON if requested
        if output:
            with open(output, "w") as f:
                json.dump(metrics.to_dict(), f, indent=2)
            print(f"Results exported to: {output}")

    try:
        anyio.run(run)
    except KeyboardInterrupt:
        print("\nBenchmark interrupted by user")
        sys.exit(130)
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        if verbose:
            raise
        sys.exit(1)


if __name__ == "__main__":
    main()
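
The script imports MixedWorkload, OperationResult, and WorkloadOperations from tests.load.workloads, which is not shown on this page. Below is a minimal sketch of the interface benchmark.py relies on, reconstructed only from how those names are used above; the operation list, warmup behavior, and cleanup logic here are assumptions, not the repository's actual implementation.

# Hypothetical sketch of the tests/load/workloads.py interface that
# benchmark.py expects. Everything beyond the attribute and method names
# called by benchmark.py is an assumption.

import random
import time
from dataclasses import dataclass

from mcp import ClientSession


@dataclass
class OperationResult:
    """Outcome of one benchmark operation, as consumed by BenchmarkMetrics."""

    operation: str   # operation name, keyed in the per-operation breakdown
    success: bool    # False counts toward the error rate
    duration: float  # wall-clock seconds, feeds the latency percentiles


class WorkloadOperations:
    """Wraps an MCP ClientSession with the individual operations to benchmark."""

    def __init__(self, session: ClientSession):
        self.session = session

    async def read_capabilities(self) -> None:
        # Illustrative operation: read a resource, as wait_for_mcp_server does.
        await self.session.read_resource("nc://capabilities")

    async def cleanup(self) -> None:
        # Remove any test data created during the run (no-op in this sketch).
        pass


class MixedWorkload:
    """Picks a random operation per call and times it."""

    def __init__(self, ops: WorkloadOperations):
        self.ops = ops
        self._operations = [("read_capabilities", ops.read_capabilities)]

    async def warmup(self, count: int = 5) -> None:
        # Prime connections and caches; warmup results are not recorded.
        for _ in range(count):
            _, fn = random.choice(self._operations)
            try:
                await fn()
            except Exception:
                pass

    async def run_operation(self) -> OperationResult:
        name, fn = random.choice(self._operations)
        start = time.time()
        try:
            await fn()
            return OperationResult(name, True, time.time() - start)
        except Exception:
            return OperationResult(name, False, time.time() - start)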
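
For reference, the percentile helper inside latency_stats interpolates linearly between the two closest ranks, while operation_breakdown computes its p95 with a simple nearest-index lookup, so the two p95 figures can differ slightly on small samples. A quick standalone check of the interpolating version (the latency values are illustrative):

# Worked example of the linear-interpolation percentile from latency_stats.
data = [0.010, 0.020, 0.030, 0.040, 0.100]  # illustrative sorted latencies

def percentile(data: list[float], p: float) -> float:
    k = (len(data) - 1) * p
    f = int(k)
    c = f + 1
    if c >= len(data):
        return data[-1]
    return data[f] + (k - f) * (data[c] - data[f])

print(percentile(data, 0.90))  # k = 3.6 -> 0.040 + 0.6 * (0.100 - 0.040) = 0.076
print(percentile(data, 0.50))  # k = 2.0 -> falls exactly on the median, 0.030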
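
The click command is the intended entry point, but run_benchmark can also be driven programmatically, for example from a smoke test. A minimal sketch, assuming the server is already running on the default URL and that this file is importable as tests.load.benchmark (the error-rate threshold is illustrative, not a value from the repository):

import anyio

from tests.load.benchmark import run_benchmark


async def smoke_test():
    # 5 workers for 10 seconds; skip the extra warmup sleep.
    metrics = await run_benchmark(
        "http://localhost:8000/mcp", concurrency=5, duration=10.0, warmup=0.0
    )
    metrics.print_report()
    assert metrics.error_rate < 5.0  # illustrative threshold


anyio.run(smoke_test)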
