# -*- coding: utf-8 -*-
"""Locust load testing scenarios for MCP Gateway.
This module provides comprehensive load testing for MCP Gateway using Locust.
It includes multiple user types simulating different usage patterns.
Usage:
# Web UI mode (interactive)
make load-test-ui
# Headless mode (CI/scripts)
make load-test
# Direct invocation
cd tests/loadtest && locust --host=http://localhost:8080
Environment Variables (also reads from .env file):
LOADTEST_HOST: Target host URL (default: http://localhost:8080)
LOADTEST_USERS: Number of concurrent users (default: 1000)
LOADTEST_SPAWN_RATE: Users spawned per second (default: 100)
LOADTEST_RUN_TIME: Test duration, e.g., "60s", "5m" (default: 5m)
LOADTEST_JWT_EXPIRY_HOURS: JWT token expiry in hours (default: 8760 = 1 year)
MCPGATEWAY_BEARER_TOKEN: JWT token for authenticated requests
BASIC_AUTH_USER: Basic auth username (default: admin)
BASIC_AUTH_PASSWORD: Basic auth password (default: changeme)
JWT_SECRET_KEY: Secret key for JWT signing
JWT_ALGORITHM: JWT algorithm (default: HS256)
JWT_AUDIENCE: JWT audience claim
JWT_ISSUER: JWT issuer claim
LOADTEST_BENCHMARK_START_PORT: First port for benchmark servers (default: 9000)
LOADTEST_BENCHMARK_SERVER_COUNT: Number of benchmark servers available (default: 1000)
LOADTEST_BENCHMARK_HOST: Host where benchmark servers run (default: benchmark_server for Docker, use localhost for native)
Copyright 2025
SPDX-License-Identifier: Apache-2.0
"""
# Standard
import logging
import os
from pathlib import Path
import random
import time
from typing import Any
import uuid
# Third-Party
from locust import between, constant_throughput, events, tag, task
from locust.contrib.fasthttp import FastHttpUser
from locust.runners import MasterRunner, WorkerRunner
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# =============================================================================
# Configuration - Load from .env file and environment variables
# =============================================================================
def _load_env_file() -> dict[str, str]:
"""Load environment variables from .env file.
Searches for .env file in current directory and parent directories.
Returns a dict of key-value pairs from the .env file.
"""
env_vars: dict[str, str] = {}
# Search for .env file
search_paths = [
Path.cwd() / ".env",
Path.cwd().parent / ".env",
Path.cwd().parent.parent / ".env",
Path(__file__).parent.parent.parent / ".env", # Project root
]
env_file = None
for path in search_paths:
if path.exists():
env_file = path
break
if env_file is None:
logger.info("No .env file found, using environment variables only")
return env_vars
logger.info(f"Loading configuration from {env_file}")
try:
with open(env_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith("#"):
continue
# Handle key=value pairs
if "=" in line:
key, _, value = line.partition("=")
key = key.strip()
value = value.strip()
# Remove quotes if present
if value and value[0] in ('"', "'") and value[-1] == value[0]:
value = value[1:-1]
env_vars[key] = value
except Exception as e:
logger.warning(f"Error reading .env file: {e}")
return env_vars
def _get_config(key: str, default: str = "") -> str:
"""Get configuration value from environment or .env file.
Priority: Environment variable > .env file > default
"""
# First check environment variable
env_value = os.environ.get(key)
if env_value is not None:
return env_value
# Then check .env file
if key in _ENV_FILE_VARS:
return _ENV_FILE_VARS[key]
return default
# Load .env file once at module import
_ENV_FILE_VARS = _load_env_file()
# Authentication settings (from env or .env file)
BEARER_TOKEN = _get_config("MCPGATEWAY_BEARER_TOKEN", "")
BASIC_AUTH_USER = _get_config("BASIC_AUTH_USER", "admin")
BASIC_AUTH_PASSWORD = _get_config("BASIC_AUTH_PASSWORD", "changeme")
# JWT settings for auto-generation (if MCPGATEWAY_BEARER_TOKEN not set)
JWT_SECRET_KEY = _get_config("JWT_SECRET_KEY", "my-test-key")
JWT_ALGORITHM = _get_config("JWT_ALGORITHM", "HS256")
JWT_AUDIENCE = _get_config("JWT_AUDIENCE", "mcpgateway-api")
JWT_ISSUER = _get_config("JWT_ISSUER", "mcpgateway")
# Default to platform admin email for guaranteed authentication
# This matches the PLATFORM_ADMIN_EMAIL default in .env.example
JWT_USERNAME = _get_config("JWT_USERNAME", _get_config("PLATFORM_ADMIN_EMAIL", "admin@example.com"))
# Token expiry in hours - default 8760 (1 year) to avoid expiration during long load tests
# JTI (JWT ID) is automatically generated for each token for proper cache keying
JWT_TOKEN_EXPIRY_HOURS = int(_get_config("LOADTEST_JWT_EXPIRY_HOURS", "8760"))
# Log loaded configuration (masking sensitive values)
logger.info("Configuration loaded:")
logger.info(f" BASIC_AUTH_USER: {BASIC_AUTH_USER}")
logger.info(f" JWT_ALGORITHM: {JWT_ALGORITHM}")
logger.info(f" JWT_AUDIENCE: {JWT_AUDIENCE}")
logger.info(f" JWT_ISSUER: {JWT_ISSUER}")
logger.info(f" JWT_SECRET_KEY: {'*' * len(JWT_SECRET_KEY) if JWT_SECRET_KEY else '(not set)'}")
logger.info(f" JWT_TOKEN_EXPIRY_HOURS: {JWT_TOKEN_EXPIRY_HOURS}")
# Test data pools (populated during test setup)
# IDs for REST API calls (GET /tools/{id}, etc.)
TOOL_IDS: list[str] = []
SERVER_IDS: list[str] = []
GATEWAY_IDS: list[str] = []
RESOURCE_IDS: list[str] = []
PROMPT_IDS: list[str] = []
# Names/URIs for RPC calls (tools/call uses name, resources/read uses uri, etc.)
TOOL_NAMES: list[str] = []
RESOURCE_URIS: list[str] = []
PROMPT_NAMES: list[str] = []
# Tools that require arguments and are tested with proper arguments in specific user classes
# These should be excluded from generic rpc_call_tool to avoid false failures
TOOLS_WITH_REQUIRED_ARGS: set[str] = {
"fast-time-convert-time", # Requires: time, source_timezone, target_timezone
"fast-time-get-system-time", # Requires: timezone
"fast-test-echo", # Requires: message
"fast-test-get-system-time", # Requires: timezone
}
# Tool name prefixes that indicate virtual/dummy tools with no backing MCP server
# These are created during CRUD tests and will fail when called via RPC
VIRTUAL_TOOL_PREFIXES: tuple[str, ...] = (
"test-api-tool-", # Created by ToolsCRUDUser during load tests
"loadtest-tool-", # Created by other load test scenarios
)
# =============================================================================
# Event Handlers
# =============================================================================
@events.init.add_listener
def on_locust_init(environment, **_kwargs): # pylint: disable=unused-argument
"""Initialize test environment."""
if isinstance(environment.runner, MasterRunner):
logger.info("Running as master node")
elif isinstance(environment.runner, WorkerRunner):
logger.info("Running as worker node")
else:
logger.info("Running in standalone mode")
_log_auth_mode()
def _fetch_json(url: str, headers: dict[str, str], timeout: float = 30.0) -> tuple[int, Any]:
"""Fetch JSON from URL using urllib (gevent-safe, no threading issues with Python 3.13).
Args:
url: Full URL to fetch
headers: HTTP headers to include
timeout: Request timeout in seconds
Returns:
Tuple of (status_code, json_data or None)
"""
# Standard
import json # pylint: disable=import-outside-toplevel
import urllib.error # pylint: disable=import-outside-toplevel
import urllib.request # pylint: disable=import-outside-toplevel
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = json.loads(resp.read().decode("utf-8"))
return (resp.status, data)
except urllib.error.HTTPError as e:
return (e.code, None)
except Exception:
return (0, None)
@events.test_start.add_listener
def on_test_start(environment, **_kwargs): # pylint: disable=unused-argument
"""Fetch existing entity IDs for use in tests.
Uses urllib.request instead of httpx to avoid Python 3.13/gevent threading conflicts.
httpx creates threads that trigger '_DummyThread' object has no attribute '_handle' errors.
"""
logger.info("Test starting - fetching entity IDs...")
host = environment.host or "http://localhost:8080"
headers = _get_auth_headers()
try:
# Fetch tools
# API returns {"tools": [...], "nextCursor": ...} or list for legacy
status, data = _fetch_json(f"{host}/tools", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("tools", data.get("items", []))
TOOL_IDS.extend([str(t.get("id")) for t in items[:50] if t.get("id")])
TOOL_NAMES.extend([str(t.get("name")) for t in items[:50] if t.get("name")])
logger.info(f"Loaded {len(TOOL_IDS)} tool IDs, {len(TOOL_NAMES)} tool names")
# Fetch servers
# API returns {"servers": [...], "nextCursor": ...} or list for legacy
status, data = _fetch_json(f"{host}/servers", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("servers", data.get("items", []))
SERVER_IDS.extend([str(s.get("id")) for s in items[:50] if s.get("id")])
logger.info(f"Loaded {len(SERVER_IDS)} server IDs")
# Fetch gateways
# API returns {"gateways": [...], "nextCursor": ...} or list for legacy
status, data = _fetch_json(f"{host}/gateways", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("gateways", data.get("items", []))
GATEWAY_IDS.extend([str(g.get("id")) for g in items[:50] if g.get("id")])
logger.info(f"Loaded {len(GATEWAY_IDS)} gateway IDs")
# Fetch resources
# API returns {"resources": [...], "nextCursor": ...} or list for legacy
status, data = _fetch_json(f"{host}/resources", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("resources", data.get("items", []))
RESOURCE_IDS.extend([str(r.get("id")) for r in items[:50] if r.get("id")])
RESOURCE_URIS.extend([str(r.get("uri")) for r in items[:50] if r.get("uri")])
logger.info(f"Loaded {len(RESOURCE_IDS)} resource IDs, {len(RESOURCE_URIS)} resource URIs")
# Fetch prompts
# API returns {"prompts": [...], "nextCursor": ...} or list for legacy
status, data = _fetch_json(f"{host}/prompts", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("prompts", data.get("items", []))
PROMPT_IDS.extend([str(p.get("id")) for p in items[:50] if p.get("id")])
PROMPT_NAMES.extend([str(p.get("name")) for p in items[:50] if p.get("name")])
logger.info(f"Loaded {len(PROMPT_IDS)} prompt IDs, {len(PROMPT_NAMES)} prompt names")
except Exception as e:
logger.warning(f"Failed to fetch entity IDs: {e}")
logger.info("Tests will continue without pre-fetched IDs")
# Note: All gateways (fast-time, fast-test, benchmark) are registered
# at compose startup via dedicated registration services.
# Locust only performs load testing, not registration.
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs): # pylint: disable=unused-argument
"""Clean up after test and print summary statistics."""
logger.info("Test stopped")
TOOL_IDS.clear()
SERVER_IDS.clear()
GATEWAY_IDS.clear()
RESOURCE_IDS.clear()
PROMPT_IDS.clear()
TOOL_NAMES.clear()
RESOURCE_URIS.clear()
PROMPT_NAMES.clear()
# Print detailed summary statistics
_print_summary_stats(environment)
def _print_summary_stats(environment) -> None:
"""Print detailed summary statistics after test completion."""
stats = environment.stats
if not stats.entries:
logger.info("No statistics recorded")
return
print("\n" + "=" * 100)
print("LOAD TEST SUMMARY")
print("=" * 100)
# Overall totals
total_requests = stats.total.num_requests
total_failures = stats.total.num_failures
total_rps = stats.total.total_rps
failure_rate = (total_failures / total_requests * 100) if total_requests > 0 else 0
print(f"\n{'OVERALL METRICS':^100}")
print("-" * 100)
print(f" Total Requests: {total_requests:,}")
print(f" Total Failures: {total_failures:,} ({failure_rate:.2f}%)")
print(f" Requests/sec (RPS): {total_rps:.2f}")
if stats.total.num_requests > 0:
print("\n Response Times (ms):")
print(f" Average: {stats.total.avg_response_time:.2f}")
print(f" Min: {stats.total.min_response_time:.2f}")
print(f" Max: {stats.total.max_response_time:.2f}")
print(f" Median (p50): {stats.total.get_response_time_percentile(0.50):.2f}")
print(f" p90: {stats.total.get_response_time_percentile(0.90):.2f}")
print(f" p95: {stats.total.get_response_time_percentile(0.95):.2f}")
print(f" p99: {stats.total.get_response_time_percentile(0.99):.2f}")
# Per-endpoint breakdown (top 15 by request count)
print(f"\n{'ENDPOINT BREAKDOWN (Top 15 by request count)':^100}")
print("-" * 100)
print(f"{'Endpoint':<40} {'Reqs':>8} {'Fails':>7} {'Avg':>8} {'Min':>8} {'Max':>8} {'p95':>8} {'RPS':>8}")
print("-" * 100)
# Sort by request count, get top 15
sorted_entries = sorted(stats.entries.values(), key=lambda x: x.num_requests, reverse=True)[:15]
for entry in sorted_entries:
name = entry.name[:38] + ".." if len(entry.name) > 40 else entry.name
reqs = entry.num_requests
fails = entry.num_failures
avg = entry.avg_response_time if reqs > 0 else 0
min_rt = entry.min_response_time if reqs > 0 else 0
max_rt = entry.max_response_time if reqs > 0 else 0
p95 = entry.get_response_time_percentile(0.95) if reqs > 0 else 0
rps = entry.total_rps
print(f"{name:<40} {reqs:>8,} {fails:>7,} {avg:>8.1f} {min_rt:>8.1f} {max_rt:>8.1f} {p95:>8.1f} {rps:>8.2f}")
# Slowest endpoints (by average response time)
slow_entries = sorted(
[e for e in stats.entries.values() if e.num_requests >= 10],
key=lambda x: x.avg_response_time,
reverse=True,
)[:5]
if slow_entries:
print(f"\n{'SLOWEST ENDPOINTS (min 10 requests)':^100}")
print("-" * 100)
print(f"{'Endpoint':<50} {'Avg (ms)':>12} {'p95 (ms)':>12} {'Requests':>12}")
print("-" * 100)
for entry in slow_entries:
name = entry.name[:48] + ".." if len(entry.name) > 50 else entry.name
print(f"{name:<50} {entry.avg_response_time:>12.2f} {entry.get_response_time_percentile(0.95):>12.2f} {entry.num_requests:>12,}")
# Error summary
if stats.errors:
print(f"\n{'ERRORS':^100}")
print("-" * 100)
for _error_key, error in list(stats.errors.items())[:10]:
print(f" [{error.occurrences}x] {error.method} {error.name}: {str(error.error)[:80]}")
print("\n" + "=" * 100)
print("END OF SUMMARY")
print("=" * 100 + "\n")
# =============================================================================
# Helper Functions
# =============================================================================
def _generate_jwt_token() -> str:
"""Generate a JWT token for API authentication.
Uses PyJWT to create a token with the configured secret and algorithm.
Reads JWT settings from .env file or environment variables.
The token includes:
- sub: User email (JWT_USERNAME)
- exp: Expiration time (configurable via LOADTEST_JWT_EXPIRY_HOURS, default 1 year)
- iat: Issued at time
- aud: Audience (JWT_AUDIENCE)
- iss: Issuer (JWT_ISSUER)
- jti: JWT ID - unique identifier for cache keying and token revocation
"""
try:
# Standard
from datetime import datetime, timedelta, timezone # pylint: disable=import-outside-toplevel
# Third-Party
import jwt # pylint: disable=import-outside-toplevel
jti = str(uuid.uuid4())
payload = {
"sub": JWT_USERNAME,
"exp": datetime.now(timezone.utc) + timedelta(hours=JWT_TOKEN_EXPIRY_HOURS),
"iat": datetime.now(timezone.utc),
"aud": JWT_AUDIENCE,
"iss": JWT_ISSUER,
"jti": jti, # JWT ID for auth cache keying and token revocation support
}
token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
logger.info(f"Generated JWT token for user: {JWT_USERNAME} (aud={JWT_AUDIENCE}, iss={JWT_ISSUER}, jti={jti[:8]}..., expires_in={JWT_TOKEN_EXPIRY_HOURS}h)")
return token
except ImportError:
logger.warning("PyJWT not installed, falling back to basic auth. Install with: pip install pyjwt")
return ""
except Exception as e:
logger.warning(f"Failed to generate JWT token: {e}, falling back to basic auth")
return ""
# Cache the generated token
_CACHED_TOKEN: str | None = None
def _get_auth_headers() -> dict[str, str]:
"""Get authentication headers.
Priority:
1. MCPGATEWAY_BEARER_TOKEN env var (if set)
2. Auto-generated JWT token (if PyJWT available)
3. Basic auth fallback (for admin UI only)
"""
global _CACHED_TOKEN # pylint: disable=global-statement
headers = {"Accept": "application/json"}
if BEARER_TOKEN:
headers["Authorization"] = f"Bearer {BEARER_TOKEN}"
else:
# Try to generate/use JWT token
if _CACHED_TOKEN is None:
_CACHED_TOKEN = _generate_jwt_token()
if _CACHED_TOKEN:
headers["Authorization"] = f"Bearer {_CACHED_TOKEN}"
else:
# Fallback to basic auth (works for admin UI but not REST API)
# Standard
import base64 # pylint: disable=import-outside-toplevel
credentials = base64.b64encode(f"{BASIC_AUTH_USER}:{BASIC_AUTH_PASSWORD}".encode()).decode()
headers["Authorization"] = f"Basic {credentials}"
logger.warning("Using basic auth - REST API endpoints may fail. Set MCPGATEWAY_BEARER_TOKEN or install PyJWT.")
return headers
def _log_auth_mode() -> None:
"""Log which authentication mode the load test will use."""
headers = _get_auth_headers()
auth_header = headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
if BEARER_TOKEN:
logger.info("Auth mode: Bearer (MCPGATEWAY_BEARER_TOKEN)")
else:
logger.info("Auth mode: Bearer (auto-generated JWT via PyJWT)")
elif auth_header.startswith("Basic "):
logger.warning("!!! WARNING !!! BASIC AUTH IN USE - /rpc calls will 401. Set MCPGATEWAY_BEARER_TOKEN or install PyJWT.")
else:
logger.warning("!!! WARNING !!! NO AUTH HEADER - /rpc calls will 401. Set MCPGATEWAY_BEARER_TOKEN or install PyJWT.")
def _json_rpc_request(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
"""Create a JSON-RPC 2.0 request."""
return {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": method,
"params": params or {},
}
# =============================================================================
# User Classes
# =============================================================================
class BaseUser(FastHttpUser):
"""Base user class with common configuration.
Uses FastHttpUser (gevent-based) for maximum throughput.
Optimized for 4000+ concurrent users.
"""
abstract = True
wait_time = between(0.1, 0.5)
# Connection tuning for high concurrency
connection_timeout = 30.0
network_timeout = 30.0
def __init__(self, *args, **kwargs):
"""Initialize base user with auth headers."""
super().__init__(*args, **kwargs)
self.auth_headers: dict[str, str] = {}
self.admin_headers: dict[str, str] = {}
def on_start(self):
"""Set up authentication for the user."""
self.auth_headers = _get_auth_headers()
self.admin_headers = {
**self.auth_headers,
"Accept": "text/html",
}
def _validate_json_response(self, response, allowed_codes: list[int] | None = None):
"""Validate response is successful and contains valid JSON.
Args:
response: The response object from catch_response=True context
allowed_codes: List of acceptable status codes (default: [200])
"""
allowed = allowed_codes or [200]
if response.status_code not in allowed:
response.failure(f"Expected {allowed}, got {response.status_code}")
return False
try:
data = response.json()
if data is None:
response.failure("Response JSON is null")
return False
except Exception as e:
response.failure(f"Invalid JSON: {e}")
return False
response.success()
return True
def _validate_html_response(self, response, allowed_codes: list[int] | None = None):
"""Validate response is successful HTML.
Args:
response: The response object from catch_response=True context
allowed_codes: List of acceptable status codes (default: [200])
"""
allowed = allowed_codes or [200]
if response.status_code not in allowed:
response.failure(f"Expected {allowed}, got {response.status_code}")
return False
content_type = response.headers.get("content-type", "")
if "text/html" not in content_type:
response.failure(f"Expected HTML, got {content_type}")
return False
response.success()
return True
def _validate_status(self, response, allowed_codes: list[int] | None = None):
"""Validate response status code only.
Args:
response: The response object from catch_response=True context
allowed_codes: List of acceptable status codes (default: [200])
"""
allowed = allowed_codes or [200]
if response.status_code not in allowed:
response.failure(f"Expected {allowed}, got {response.status_code}")
return False
response.success()
return True
def _validate_jsonrpc_response(self, response, allowed_codes: list[int] | None = None):
"""Validate response is successful JSON-RPC (no error field).
JSON-RPC 2.0 errors are returned with HTTP 200 but contain an "error" field.
This method detects such errors and marks them as failures in Locust.
Args:
response: The response object from catch_response=True context
allowed_codes: List of acceptable status codes (default: [200])
Returns:
bool: True if response is valid JSON-RPC success, False otherwise
"""
allowed = allowed_codes or [200]
if response.status_code not in allowed:
response.failure(f"Expected {allowed}, got {response.status_code}")
return False
try:
data = response.json()
if data is None:
response.failure("Response JSON is null")
return False
# Check for JSON-RPC error field
if "error" in data:
error_obj = data["error"]
error_code = error_obj.get("code", "unknown")
error_msg = error_obj.get("message", "Unknown error")
error_data = str(error_obj.get("data", ""))[:100]
response.failure(f"JSON-RPC error [{error_code}]: {error_msg} - {error_data}")
return False
except Exception as e:
response.failure(f"Invalid JSON: {e}")
return False
response.success()
return True
class HealthCheckUser(BaseUser):
"""User that only performs health checks.
Simulates monitoring systems and health probes.
Weight: Low (monitoring traffic)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(10)
@tag("health", "critical")
def health_check(self):
"""Check the health endpoint (no auth required)."""
with self.client.get("/health", name="/health", catch_response=True) as response:
self._validate_status(response)
@task(5)
@tag("health")
def readiness_check(self):
"""Check readiness endpoint (no auth required)."""
with self.client.get("/ready", name="/ready", catch_response=True) as response:
self._validate_status(response)
@task(2)
@tag("health")
def metrics_endpoint(self):
"""Check Prometheus metrics endpoint."""
with self.client.get("/metrics", headers=self.auth_headers, name="/metrics", catch_response=True) as response:
self._validate_status(response)
@task(1)
@tag("health")
def openapi_schema(self):
"""Fetch OpenAPI schema."""
with self.client.get("/openapi.json", headers=self.auth_headers, name="/openapi.json", catch_response=True) as response:
self._validate_json_response(response)
class ReadOnlyAPIUser(BaseUser):
"""User that performs read-only API operations.
Simulates API consumers reading data without modifications.
Weight: High (most common usage pattern)
"""
weight = 5
wait_time = between(0.3, 1.5)
@task(10)
@tag("api", "tools")
def list_tools(self):
"""List all tools."""
with self.client.get("/tools", headers=self.auth_headers, name="/tools", catch_response=True) as response:
self._validate_json_response(response)
@task(8)
@tag("api", "servers")
def list_servers(self):
"""List all servers."""
with self.client.get("/servers", headers=self.auth_headers, name="/servers", catch_response=True) as response:
self._validate_json_response(response)
@task(6)
@tag("api", "gateways")
def list_gateways(self):
"""List all gateways."""
with self.client.get("/gateways", headers=self.auth_headers, name="/gateways", catch_response=True) as response:
self._validate_json_response(response)
@task(5)
@tag("api", "resources")
def list_resources(self):
"""List all resources."""
with self.client.get("/resources", headers=self.auth_headers, name="/resources", catch_response=True) as response:
self._validate_json_response(response)
@task(5)
@tag("api", "prompts")
def list_prompts(self):
"""List all prompts."""
with self.client.get("/prompts", headers=self.auth_headers, name="/prompts", catch_response=True) as response:
self._validate_json_response(response)
@task(4)
@tag("api", "a2a")
def list_a2a_agents(self):
"""List A2A agents."""
with self.client.get("/a2a", headers=self.auth_headers, name="/a2a", catch_response=True) as response:
self._validate_json_response(response)
@task(3)
@tag("api", "tags")
def list_tags(self):
"""List all tags."""
with self.client.get("/tags", headers=self.auth_headers, name="/tags", catch_response=True) as response:
self._validate_json_response(response)
@task(2)
@tag("api", "metrics")
def get_metrics(self):
"""Get application metrics."""
with self.client.get("/metrics", headers=self.auth_headers, name="/metrics [api]", catch_response=True) as response:
self._validate_status(response)
@task(3)
@tag("api", "tools")
def get_single_tool(self):
"""Get a specific tool by ID."""
if TOOL_IDS:
tool_id = random.choice(TOOL_IDS)
with self.client.get(
f"/tools/{tool_id}",
headers=self.auth_headers,
name="/tools/[id]",
catch_response=True,
) as response:
# 200=Success, 404=Not found (acceptable)
self._validate_json_response(response, allowed_codes=[200, 404])
@task(3)
@tag("api", "servers")
def get_single_server(self):
"""Get a specific server by ID."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.get(
f"/servers/{server_id}",
headers=self.auth_headers,
name="/servers/[id]",
catch_response=True,
) as response:
# 200=Success, 404=Not found (acceptable)
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("api", "gateways")
def get_single_gateway(self):
"""Get a specific gateway by ID."""
if GATEWAY_IDS:
gateway_id = random.choice(GATEWAY_IDS)
with self.client.get(
f"/gateways/{gateway_id}",
headers=self.auth_headers,
name="/gateways/[id]",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("api", "roots")
def list_roots(self):
"""List roots."""
with self.client.get(
"/roots",
headers=self.auth_headers,
name="/roots",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(2)
@tag("api", "resources")
def get_single_resource(self):
"""Get a specific resource by ID."""
if RESOURCE_IDS:
resource_id = random.choice(RESOURCE_IDS)
with self.client.get(
f"/resources/{resource_id}",
headers=self.auth_headers,
name="/resources/[id]",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden (read-only), 404=Not found
self._validate_json_response(response, allowed_codes=[200, 403, 404])
@task(2)
@tag("api", "prompts")
def get_single_prompt(self):
"""Get a specific prompt by ID."""
if PROMPT_IDS:
prompt_id = random.choice(PROMPT_IDS)
with self.client.get(
f"/prompts/{prompt_id}",
headers=self.auth_headers,
name="/prompts/[id]",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden (read-only), 404=Not found
self._validate_json_response(response, allowed_codes=[200, 403, 404])
@task(2)
@tag("api", "servers")
def get_server_tools(self):
"""Get tools for a specific server."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.get(f"/servers/{server_id}/tools", headers=self.auth_headers, name="/servers/[id]/tools", catch_response=True) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("api", "servers")
def get_server_resources(self):
"""Get resources for a specific server."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.get(f"/servers/{server_id}/resources", headers=self.auth_headers, name="/servers/[id]/resources", catch_response=True) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(1)
@tag("api", "discovery")
def well_known_robots(self):
"""Check robots.txt (always available)."""
with self.client.get(
"/.well-known/robots.txt",
headers=self.auth_headers,
name="/.well-known/robots.txt",
catch_response=True,
) as response:
# 200=Success, 404=Not configured
self._validate_status(response, allowed_codes=[200, 404])
@task(1)
@tag("api", "discovery")
def well_known_security(self):
"""Check security.txt."""
with self.client.get(
"/.well-known/security.txt",
headers=self.auth_headers,
name="/.well-known/security.txt",
catch_response=True,
) as response:
# 200=Success, 404=Not configured
self._validate_status(response, allowed_codes=[200, 404])
class AdminUIUser(BaseUser):
"""User that browses the Admin UI.
Simulates administrators using the web interface.
Weight: Medium (admin traffic)
"""
weight = 3
wait_time = between(1.0, 3.0)
@task(10)
@tag("admin", "dashboard")
def admin_dashboard(self):
"""Load admin dashboard."""
with self.client.get("/admin/", headers=self.admin_headers, name="/admin/", catch_response=True) as response:
self._validate_html_response(response)
@task(8)
@tag("admin", "tools")
def admin_tools_page(self):
"""Load tools list (JSON API)."""
with self.client.get("/admin/tools", headers=self.admin_headers, name="/admin/tools", catch_response=True) as response:
self._validate_json_response(response)
@task(7)
@tag("admin", "servers")
def admin_servers_page(self):
"""Load servers list (JSON API)."""
with self.client.get("/admin/servers", headers=self.admin_headers, name="/admin/servers", catch_response=True) as response:
self._validate_json_response(response)
@task(6)
@tag("admin", "gateways")
def admin_gateways_page(self):
"""Load gateways list (JSON API)."""
with self.client.get("/admin/gateways", headers=self.admin_headers, name="/admin/gateways", catch_response=True) as response:
self._validate_json_response(response)
@task(5)
@tag("admin", "resources")
def admin_resources_page(self):
"""Load resources list (JSON API)."""
with self.client.get("/admin/resources", headers=self.admin_headers, name="/admin/resources", catch_response=True) as response:
self._validate_json_response(response)
@task(5)
@tag("admin", "prompts")
def admin_prompts_page(self):
"""Load prompts list (JSON API)."""
with self.client.get("/admin/prompts", headers=self.admin_headers, name="/admin/prompts", catch_response=True) as response:
self._validate_json_response(response)
@task(4)
@tag("admin", "a2a")
def admin_a2a_list(self):
"""Load A2A agents list (JSON API)."""
with self.client.get("/admin/a2a", headers=self.auth_headers, name="/admin/a2a", catch_response=True) as response:
self._validate_json_response(response)
@task(3)
@tag("admin", "performance")
def admin_performance(self):
"""Load performance stats (if enabled)."""
with self.client.get(
"/admin/performance/stats",
headers={**self.admin_headers, "HX-Request": "true"},
name="/admin/performance/stats",
catch_response=True,
) as response:
# 404 is acceptable if performance tracking is disabled
self._validate_status(response, allowed_codes=[200, 404])
@task(2)
@tag("admin", "logs")
def admin_logs(self):
"""Load logs (JSON API)."""
with self.client.get("/admin/logs", headers=self.auth_headers, name="/admin/logs", catch_response=True) as response:
self._validate_json_response(response)
@task(2)
@tag("admin", "config")
def admin_config_settings(self):
"""Load config settings (JSON API)."""
with self.client.get("/admin/config/settings", headers=self.auth_headers, name="/admin/config/settings", catch_response=True) as response:
self._validate_json_response(response)
@task(2)
@tag("admin", "metrics")
def admin_metrics(self):
"""Load metrics (JSON API)."""
with self.client.get("/admin/metrics", headers=self.admin_headers, name="/admin/metrics", catch_response=True) as response:
self._validate_json_response(response)
@task(2)
@tag("admin", "teams")
def admin_teams(self):
"""Load teams management page."""
with self.client.get("/admin/teams", headers=self.admin_headers, name="/admin/teams", catch_response=True) as response:
self._validate_html_response(response)
@task(2)
@tag("admin", "users")
def admin_users(self):
"""Load users management page."""
headers = {**self.admin_headers, "HX-Request": "true"}
with self.client.get("/admin/users/partial", headers=headers, name="/admin/users/partial", catch_response=True) as response:
self._validate_html_response(response)
@task(1)
@tag("admin", "export")
def admin_export_config(self):
"""Load export configuration (JSON API)."""
with self.client.get("/admin/export/configuration", headers=self.admin_headers, name="/admin/export/configuration", catch_response=True) as response:
self._validate_json_response(response)
@task(1)
@tag("admin", "htmx", "tools")
def admin_tools_partial(self):
"""Fetch tools partial via HTMX."""
headers = {**self.admin_headers, "HX-Request": "true"}
with self.client.get("/admin/tools/partial", headers=headers, name="/admin/tools/partial", catch_response=True) as response:
self._validate_html_response(response)
@task(1)
@tag("admin", "htmx", "resources")
def admin_resources_partial(self):
"""Fetch resources partial via HTMX."""
headers = {**self.admin_headers, "HX-Request": "true"}
with self.client.get("/admin/resources/partial", headers=headers, name="/admin/resources/partial", catch_response=True) as response:
self._validate_html_response(response)
@task(1)
@tag("admin", "htmx", "prompts")
def admin_prompts_partial(self):
"""Fetch prompts partial via HTMX."""
headers = {**self.admin_headers, "HX-Request": "true"}
with self.client.get("/admin/prompts/partial", headers=headers, name="/admin/prompts/partial", catch_response=True) as response:
self._validate_html_response(response)
@task(1)
@tag("admin", "htmx", "metrics")
def admin_metrics_partial(self):
"""Fetch metrics partial via HTMX."""
headers = {**self.admin_headers, "HX-Request": "true"}
with self.client.get("/admin/metrics/partial", headers=headers, name="/admin/metrics/partial", catch_response=True) as response:
self._validate_html_response(response)
@task(1)
@tag("admin", "htmx")
def admin_htmx_refresh(self):
"""Simulate HTMX partial refresh."""
headers = {**self.admin_headers, "HX-Request": "true"}
endpoint = random.choice(["/admin/tools/partial", "/admin/resources/partial", "/admin/prompts/partial"])
with self.client.get(endpoint, headers=headers, name=f"{endpoint} [htmx]", catch_response=True) as response:
self._validate_html_response(response)
class MCPJsonRpcUser(BaseUser):
"""User that makes MCP JSON-RPC requests.
Simulates MCP clients (Claude Desktop, etc.) making protocol requests.
Weight: High (core MCP traffic)
"""
weight = 4
wait_time = between(0.2, 1.0)
def _rpc_request(self, payload: dict, name: str):
"""Make an RPC request with proper error handling.
Uses JSON-RPC validation to detect errors returned with HTTP 200.
"""
with self.client.post(
"/rpc",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name=name,
catch_response=True,
) as response:
self._validate_jsonrpc_response(response)
@task(10)
@tag("mcp", "rpc", "tools")
def rpc_list_tools(self):
"""JSON-RPC: List tools."""
payload = _json_rpc_request("tools/list")
self._rpc_request(payload, "/rpc tools/list")
@task(8)
@tag("mcp", "rpc", "resources")
def rpc_list_resources(self):
"""JSON-RPC: List resources."""
payload = _json_rpc_request("resources/list")
self._rpc_request(payload, "/rpc resources/list")
@task(8)
@tag("mcp", "rpc", "prompts")
def rpc_list_prompts(self):
"""JSON-RPC: List prompts."""
payload = _json_rpc_request("prompts/list")
self._rpc_request(payload, "/rpc prompts/list")
@task(5)
@tag("mcp", "rpc", "tools")
def rpc_call_tool(self):
"""JSON-RPC: Call a tool with empty arguments.
Note: Tools that require arguments are excluded here and tested
separately in dedicated user classes (e.g., FastTimeUser) with proper arguments.
Virtual tools (test-api-tool-*, loadtest-tool-*) are also excluded as they
have no backing MCP server.
"""
# Filter out tools that require arguments or are virtual (no MCP server)
callable_tools = [
t for t in TOOL_NAMES
if t not in TOOLS_WITH_REQUIRED_ARGS
and not any(t.startswith(prefix) for prefix in VIRTUAL_TOOL_PREFIXES)
]
if callable_tools:
tool_name = random.choice(callable_tools)
payload = _json_rpc_request("tools/call", {"name": tool_name, "arguments": {}})
self._rpc_request(payload, "/rpc tools/call")
@task(4)
@tag("mcp", "rpc", "resources")
def rpc_read_resource(self):
"""JSON-RPC: Read a resource."""
if RESOURCE_URIS:
resource_uri = random.choice(RESOURCE_URIS)
payload = _json_rpc_request("resources/read", {"uri": resource_uri})
self._rpc_request(payload, "/rpc resources/read")
@task(4)
@tag("mcp", "rpc", "prompts")
def rpc_get_prompt(self):
"""JSON-RPC: Get a prompt."""
if PROMPT_NAMES:
prompt_name = random.choice(PROMPT_NAMES)
payload = _json_rpc_request("prompts/get", {"name": prompt_name})
self._rpc_request(payload, "/rpc prompts/get")
@task(3)
@tag("mcp", "rpc", "initialize")
def rpc_initialize(self):
"""JSON-RPC: Initialize session."""
payload = _json_rpc_request(
"initialize",
{
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"clientInfo": {"name": "locust-load-test", "version": "1.0.0"},
},
)
self._rpc_request(payload, "/rpc initialize")
@task(2)
@tag("mcp", "rpc", "ping")
def rpc_ping(self):
"""JSON-RPC: Ping."""
payload = _json_rpc_request("ping")
self._rpc_request(payload, "/rpc ping")
@task(3)
@tag("mcp", "rpc", "resources")
def rpc_list_resource_templates(self):
"""JSON-RPC: List resource templates."""
payload = _json_rpc_request("resources/templates/list")
self._rpc_request(payload, "/rpc resources/templates/list")
@task(2)
@tag("mcp", "protocol")
def protocol_initialize(self):
"""Protocol endpoint: Initialize."""
payload = {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}, "resources": {}, "prompts": {}},
"clientInfo": {"name": "locust-load-test", "version": "1.0.0"},
}
with self.client.post(
"/protocol/initialize",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/protocol/initialize",
catch_response=True,
) as response:
self._validate_status(response)
@task(2)
@tag("mcp", "protocol")
def protocol_ping(self):
"""Protocol endpoint: Ping (JSON-RPC format)."""
payload = _json_rpc_request("ping")
with self.client.post(
"/protocol/ping",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/protocol/ping",
catch_response=True,
) as response:
self._validate_status(response)
class WriteAPIUser(BaseUser):
"""User that performs write operations.
Simulates administrators or automated systems creating/updating entities.
Weight: Low (writes are less common than reads)
"""
weight = 1
wait_time = between(2.0, 5.0)
def __init__(self, *args, **kwargs):
"""Initialize with tracking for cleanup."""
super().__init__(*args, **kwargs)
self.created_tools: list[str] = []
self.created_servers: list[str] = []
def on_stop(self):
"""Clean up created entities."""
# Clean up tools
for tool_id in self.created_tools:
try:
self.client.delete(f"/tools/{tool_id}", headers=self.auth_headers, name="/tools/[id] [cleanup]")
except Exception:
pass
# Clean up servers
for server_id in self.created_servers:
try:
self.client.delete(f"/servers/{server_id}", headers=self.auth_headers, name="/servers/[id] [cleanup]")
except Exception:
pass
@task(5)
@tag("api", "write", "tools")
def create_and_delete_tool(self):
"""Create a tool and then delete it."""
tool_name = f"loadtest-tool-{uuid.uuid4().hex[:8]}"
tool_data = {
"name": tool_name,
"description": "Load test tool - will be deleted",
"integration_type": "MCP",
"input_schema": {"type": "object", "properties": {"input": {"type": "string"}}},
}
# Create
with self.client.post(
"/tools",
json=tool_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/tools [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
tool_id = data.get("id") or data.get("name") or tool_name
# Delete immediately
time.sleep(0.1)
self.client.delete(f"/tools/{tool_id}", headers=self.auth_headers, name="/tools/[id] [delete]")
except Exception:
pass
elif response.status_code in (409, 422):
response.success() # Conflict or validation error is acceptable for load test
@task(3)
@tag("api", "write", "servers")
def create_and_delete_server(self):
"""Create a virtual server and then delete it."""
server_name = f"loadtest-server-{uuid.uuid4().hex[:8]}"
server_data = {
"name": server_name,
"description": "Load test virtual server - will be deleted",
}
# Create
with self.client.post(
"/servers",
json=server_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/servers [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
server_id = data.get("id") or data.get("name") or server_name
# Delete immediately
time.sleep(0.1)
self.client.delete(f"/servers/{server_id}", headers=self.auth_headers, name="/servers/[id] [delete]")
except Exception:
pass
elif response.status_code in (409, 422):
response.success() # Conflict or validation error is acceptable for load test
@task(2)
@tag("api", "write", "state")
def set_server_state(self):
"""Set a server's enabled state."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.post(
f"/servers/{server_id}/state",
headers=self.auth_headers,
name="/servers/[id]/state",
catch_response=True,
) as response:
# 403/404 acceptable - entity may not exist or may be read-only
# 409 acceptable - concurrent state changes due to optimistic locking
self._validate_json_response(response, allowed_codes=[200, 403, 404, 409])
@task(2)
@tag("api", "write", "state")
def set_tool_state(self):
"""Set a tool's enabled state."""
if TOOL_IDS:
tool_id = random.choice(TOOL_IDS)
with self.client.post(
f"/tools/{tool_id}/state",
headers=self.auth_headers,
name="/tools/[id]/state",
catch_response=True,
) as response:
# 403/404 acceptable - entity may not exist or may be read-only
# 409 acceptable - concurrent state changes due to optimistic locking
self._validate_json_response(response, allowed_codes=[200, 403, 404, 409])
@task(2)
@tag("api", "write", "state")
def set_resource_state(self):
"""Set a resource's enabled state."""
if RESOURCE_IDS:
resource_id = random.choice(RESOURCE_IDS)
with self.client.post(
f"/resources/{resource_id}/state",
headers=self.auth_headers,
name="/resources/[id]/state",
catch_response=True,
) as response:
# 403/404 acceptable - entity may not exist or may be read-only
# 409 acceptable - concurrent state changes due to optimistic locking
self._validate_json_response(response, allowed_codes=[200, 403, 404, 409])
@task(2)
@tag("api", "write", "state")
def set_prompt_state(self):
"""Set a prompt's enabled state."""
if PROMPT_IDS:
prompt_id = random.choice(PROMPT_IDS)
with self.client.post(
f"/prompts/{prompt_id}/state",
headers=self.auth_headers,
name="/prompts/[id]/state",
catch_response=True,
) as response:
# 403/404 acceptable - entity may not exist or may be read-only
# 409 acceptable - concurrent state changes due to optimistic locking
self._validate_json_response(response, allowed_codes=[200, 403, 404, 409])
@task(2)
@tag("api", "write", "state")
def set_gateway_state(self):
"""Set a gateway's enabled state."""
if GATEWAY_IDS:
gateway_id = random.choice(GATEWAY_IDS)
with self.client.post(
f"/gateways/{gateway_id}/state",
headers=self.auth_headers,
name="/gateways/[id]/state",
catch_response=True,
) as response:
# 403/404 acceptable - gateway may not exist or may be unreachable
# 409 acceptable - concurrent state changes due to optimistic locking
self._validate_json_response(response, allowed_codes=[200, 403, 404, 409])
@task(2)
@tag("api", "write", "resources")
def create_and_delete_resource(self):
"""Create a resource and then delete it."""
resource_hex = uuid.uuid4().hex[:8]
resource_uri = f"file:///tmp/loadtest-{resource_hex}.txt"
resource_data = {
"uri": resource_uri,
"name": f"loadtest-resource-{resource_hex}",
"description": "Load test resource - will be deleted",
"mime_type": "text/plain",
"content": "Load test resource content",
}
with self.client.post(
"/resources",
json=resource_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/resources [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
res_id = data.get("id") or data.get("uri") or resource_uri
time.sleep(0.1)
self.client.delete(f"/resources/{res_id}", headers=self.auth_headers, name="/resources/[id] [delete]")
except Exception:
pass
elif response.status_code in (409, 422):
response.success() # Conflict or validation error is acceptable for load test
@task(2)
@tag("api", "write", "prompts")
def create_and_delete_prompt(self):
"""Create a prompt and then delete it."""
prompt_name = f"loadtest-prompt-{uuid.uuid4().hex[:8]}"
prompt_data = {
"name": prompt_name,
"description": "Load test prompt - will be deleted",
"template": "This is a load test prompt template with input: {{input}}",
"arguments": [{"name": "input", "description": "Input text", "required": False}],
}
with self.client.post(
"/prompts",
json=prompt_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/prompts [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
prompt_id = data.get("id") or data.get("name") or prompt_name
time.sleep(0.1)
self.client.delete(f"/prompts/{prompt_id}", headers=self.auth_headers, name="/prompts/[id] [delete]")
except Exception:
pass
elif response.status_code in (409, 422):
response.success() # Conflict or validation error is acceptable for load test
@task(1)
@tag("api", "write", "gateways")
def read_and_refresh_gateway(self):
"""Read existing gateway and trigger a refresh."""
# First, get list of gateways
# API returns {"gateways": [...], "nextCursor": ...} or list for legacy
with self.client.get(
"/gateways",
headers=self.auth_headers,
name="/gateways [list for refresh]",
catch_response=True,
) as response:
if response.status_code != 200:
response.failure(f"Failed to list gateways: {response.status_code}")
return
try:
data = response.json()
# Extract gateways list from paginated response
gateways = data if isinstance(data, list) else data.get("gateways", data.get("items", []))
if not gateways:
response.success()
return
response.success()
except Exception as e:
response.failure(f"Invalid JSON: {e}")
return
# Pick a gateway and read its details
gateway = random.choice(gateways)
gateway_id = gateway.get("id")
if gateway_id:
with self.client.get(
f"/gateways/{gateway_id}",
headers=self.auth_headers,
name="/gateways/[id] [read]",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
class StressTestUser(BaseUser):
"""User for stress testing with predictable request rate.
Uses constant_throughput for predictable RPS instead of minimal wait times.
Weight: Very low (only for stress tests)
Target RPS calculation: rps_per_user = target_total_rps / num_users
Example: 8000 RPS target with 4000 users = constant_throughput(2)
"""
weight = 1
# 2 requests/second per user. With 4000 users = 8000 RPS theoretical max.
# Adjust based on server capacity. Start conservative and increase.
wait_time = constant_throughput(2)
@task(10)
@tag("stress", "health")
def rapid_health_check(self):
"""Rapid health checks."""
self.client.get("/health", name="/health [stress]")
@task(8)
@tag("stress", "api")
def rapid_tools_list(self):
"""Rapid tools listing."""
self.client.get("/tools", headers=self.auth_headers, name="/tools [stress]")
@task(5)
@tag("stress", "rpc")
def rapid_rpc_ping(self):
"""Rapid RPC pings."""
payload = _json_rpc_request("ping")
with self.client.post(
"/rpc",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/rpc ping [stress]",
catch_response=True,
) as response:
self._validate_jsonrpc_response(response)
class FastTimeUser(BaseUser):
"""User that calls the fast_time MCP server tools.
Tests the fast-time-get-system-time tool via JSON-RPC.
Weight: High (main MCP tool testing)
NOTE: These tests require the fast_time MCP server to be running.
502 errors are expected if no MCP server is connected.
"""
weight = 5
wait_time = between(0.1, 0.5)
def _rpc_request(self, payload: dict, name: str):
"""Make an RPC request with proper error handling.
Uses JSON-RPC validation to detect errors returned with HTTP 200.
"""
with self.client.post(
"/rpc",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name=name,
catch_response=True,
) as response:
self._validate_jsonrpc_response(response)
@task(10)
@tag("mcp", "fasttime", "tools")
def call_get_system_time(self):
"""Call fast-time-get-system-time with Europe/Dublin timezone."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-time-get-system-time",
"arguments": {"timezone": "Europe/Dublin"},
},
)
self._rpc_request(payload, "/rpc fast-time-get-system-time")
@task(5)
@tag("mcp", "fasttime", "tools")
def call_get_system_time_utc(self):
"""Call fast-time-get-system-time with UTC timezone."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-time-get-system-time",
"arguments": {"timezone": "UTC"},
},
)
self._rpc_request(payload, "/rpc fast-time-get-system-time [UTC]")
@task(3)
@tag("mcp", "fasttime", "tools")
def call_convert_time(self):
"""Call fast-time-convert-time to convert between timezones."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-time-convert-time",
"arguments": {
"time": "2025-01-01T12:00:00",
"source_timezone": "UTC",
"target_timezone": "Europe/Dublin",
},
},
)
self._rpc_request(payload, "/rpc fast-time-convert-time")
@task(2)
@tag("mcp", "fasttime", "list")
def list_tools(self):
"""List tools via JSON-RPC."""
payload = _json_rpc_request("tools/list")
self._rpc_request(payload, "/rpc tools/list [fasttime]")
class FastTestEchoUser(BaseUser):
"""User that calls the fast_test MCP server echo tool.
Tests the fast-test-echo tool via JSON-RPC.
Weight: Medium (echo testing)
NOTE: These tests require the fast_test MCP server to be running.
Start with: make testing-up
502 errors are expected if no MCP server is connected.
"""
weight = 3
wait_time = between(0.5, 1.5)
# Test messages for echo
ECHO_MESSAGES = [
"Hello, World!",
"Testing MCP protocol",
"Load test in progress",
"Performance benchmark",
"Echo echo echo",
"The quick brown fox jumps over the lazy dog",
"Lorem ipsum dolor sit amet",
"MCP Gateway load test message",
]
def _rpc_request(self, payload: dict, name: str):
"""Make an RPC request with proper error handling."""
with self.client.post(
"/rpc",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name=name,
catch_response=True,
) as response:
self._validate_jsonrpc_response(response)
@task(10)
@tag("mcp", "fasttest", "echo")
def call_echo(self):
"""Call fast-test-echo with a random message."""
message = random.choice(self.ECHO_MESSAGES)
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-echo",
"arguments": {"message": message},
},
)
self._rpc_request(payload, "/rpc fast-test-echo")
@task(5)
@tag("mcp", "fasttest", "echo")
def call_echo_short(self):
"""Call fast-test-echo with a short message."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-echo",
"arguments": {"message": "ping"},
},
)
self._rpc_request(payload, "/rpc fast-test-echo [short]")
@task(3)
@tag("mcp", "fasttest", "echo")
def call_echo_long(self):
"""Call fast-test-echo with a longer message."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-echo",
"arguments": {"message": "A" * 1000},
},
)
self._rpc_request(payload, "/rpc fast-test-echo [long]")
@task(2)
@tag("mcp", "fasttest", "list")
def list_tools(self):
"""List tools via JSON-RPC."""
payload = _json_rpc_request("tools/list")
self._rpc_request(payload, "/rpc tools/list [fasttest]")
class FastTestTimeUser(BaseUser):
"""User that calls the fast_test MCP server get_system_time tool.
Tests the fast-test-get-system-time tool via JSON-RPC.
Weight: Medium (time testing)
NOTE: These tests require the fast_test MCP server to be running.
Start with: make testing-up
502 errors are expected if no MCP server is connected.
"""
weight = 3
wait_time = between(0.5, 1.5)
# Test timezones
TIMEZONES = [
"UTC",
"America/New_York",
"America/Los_Angeles",
"Europe/London",
"Europe/Paris",
"Europe/Dublin",
"Asia/Tokyo",
"Asia/Shanghai",
"Australia/Sydney",
]
def _rpc_request(self, payload: dict, name: str):
"""Make an RPC request with proper error handling."""
with self.client.post(
"/rpc",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name=name,
catch_response=True,
) as response:
self._validate_jsonrpc_response(response)
@task(10)
@tag("mcp", "fasttest", "time")
def call_get_system_time(self):
"""Call fast-time-get-system-time with a random timezone."""
timezone = random.choice(self.TIMEZONES)
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-get-system-time",
"arguments": {"timezone": timezone},
},
)
self._rpc_request(payload, "/rpc fast-test-get-system-time")
@task(5)
@tag("mcp", "fasttest", "time")
def call_get_system_time_utc(self):
"""Call fast-test-get-system-time with UTC timezone."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-get-system-time",
"arguments": {"timezone": "UTC"},
},
)
self._rpc_request(payload, "/rpc fast-test-get-system-time [UTC]")
@task(3)
@tag("mcp", "fasttest", "time")
def call_get_system_time_local(self):
"""Call fast-test-get-system-time with America/New_York timezone."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-get-system-time",
"arguments": {"timezone": "America/New_York"},
},
)
self._rpc_request(payload, "/rpc fast-test-get-system-time [NYC]")
@task(2)
@tag("mcp", "fasttest", "stats")
def call_get_stats(self):
"""Call fast-test-get-stats to get server statistics."""
payload = _json_rpc_request(
"tools/call",
{
"name": "fast-test-get-stats",
"arguments": {},
},
)
self._rpc_request(payload, "/rpc fast-test-get-stats")
@task(2)
@tag("mcp", "fasttest", "list")
def list_tools(self):
"""List tools via JSON-RPC."""
payload = _json_rpc_request("tools/list")
self._rpc_request(payload, "/rpc tools/list [fasttest]")
# =============================================================================
# Batch 1: High Priority - Version, Export/Import, A2A CRUD, Gateway CRUD
# =============================================================================
class VersionMetaUser(BaseUser):
"""User that checks version and extended health endpoints.
Tests metadata and diagnostic endpoints that provide system information.
These are typically used by monitoring systems and debugging tools.
Endpoints tested:
- GET /version - Application version and build information
- GET /health/security - Security-focused health check
Weight: Very low (infrequent monitoring checks)
"""
weight = 1
wait_time = between(2.0, 5.0)
@task(5)
@tag("meta", "version")
def get_version(self):
"""GET /version - Get application version and build info."""
with self.client.get(
"/version",
headers=self.auth_headers,
name="/version",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(3)
@tag("health", "security")
def health_security(self):
"""GET /health/security - Security-focused health check."""
with self.client.get(
"/health/security",
headers=self.auth_headers,
name="/health/security",
catch_response=True,
) as response:
# May return 404 if security health not configured
self._validate_json_response(response, allowed_codes=[200, 404])
class ExportImportUser(BaseUser):
"""User that tests configuration export and import functionality.
Tests the backup and restore capabilities of the gateway.
These operations are typically used for:
- Configuration backup before upgrades
- Migrating configurations between environments
- Disaster recovery
Endpoints tested:
- GET /export - Export full configuration
- POST /export/selective - Export selected entity types
- POST /import - Import configuration (with cleanup)
- GET /import/status - Check import job status
- POST /import/cleanup - Clean up old import jobs
Weight: Very low (administrative operations)
"""
weight = 1
wait_time = between(3.0, 8.0)
@task(5)
@tag("export", "backup")
def export_full(self):
"""GET /export - Export full gateway configuration."""
with self.client.get(
"/export",
headers=self.auth_headers,
name="/export",
catch_response=True,
) as response:
self._validate_json_response(response)
# NOTE: /export/selective disabled due to application bug:
# "'Server' object has no attribute 'is_active'" - needs fix in export_service.py
# @task(3)
# @tag("export", "selective")
# def export_selective(self):
# """POST /export/selective - Export selected entities by ID/name."""
# pass
@task(2)
@tag("import", "status")
def import_status_list(self):
"""GET /import/status - List all import job statuses."""
with self.client.get(
"/import/status",
headers=self.auth_headers,
name="/import/status",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(1)
@tag("import", "cleanup")
def import_cleanup(self):
"""POST /import/cleanup - Clean up old import jobs."""
with self.client.post(
"/import/cleanup",
headers=self.auth_headers,
name="/import/cleanup",
catch_response=True,
) as response:
self._validate_json_response(response)
class A2AFullCRUDUser(BaseUser):
"""User that performs full CRUD operations on A2A (Agent-to-Agent) agents.
Tests the complete lifecycle of A2A agents including creation, updates,
state changes, and deletion. A2A agents enable agent-to-agent communication
following the A2A protocol specification.
Endpoints tested:
- GET /a2a/{agent_id} - Get single agent details
- POST /a2a - Create new A2A agent
- PUT /a2a/{agent_id} - Update agent configuration
- POST /a2a/{agent_id}/state - Toggle agent enabled state
- DELETE /a2a/{agent_id} - Remove agent
Weight: Low (administrative CRUD operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
def __init__(self, *args, **kwargs):
"""Initialize with tracking for cleanup."""
super().__init__(*args, **kwargs)
self.created_agents: list[str] = []
def on_stop(self):
"""Clean up created agents on test stop."""
for agent_id in self.created_agents:
try:
self.client.delete(
f"/a2a/{agent_id}",
headers=self.auth_headers,
name="/a2a/[id] [cleanup]",
)
except Exception:
pass
@task(5)
@tag("a2a", "read")
def get_single_agent(self):
"""GET /a2a/{agent_id} - Get details of a specific A2A agent."""
with self.client.get(
"/a2a",
headers=self.auth_headers,
name="/a2a [list for get]",
catch_response=True,
) as response:
if response.status_code != 200:
response.success()
return
try:
data = response.json()
agents = data if isinstance(data, list) else data.get("agents", data.get("items", []))
if agents:
agent_id = random.choice(agents).get("id")
if agent_id:
self.client.get(
f"/a2a/{agent_id}",
headers=self.auth_headers,
name="/a2a/[id]",
)
response.success()
except Exception:
response.success()
@task(3)
@tag("a2a", "write", "create")
def create_and_delete_agent(self):
"""POST /a2a - Create an A2A agent, then DELETE it."""
agent_name = f"loadtest-a2a-{uuid.uuid4().hex[:8]}"
agent_data = {
"name": agent_name,
"description": "Load test A2A agent - will be deleted",
"url": "http://localhost:9999", # Fake URL for testing
}
with self.client.post(
"/a2a",
json=agent_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/a2a [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
agent_id = data.get("id") or data.get("name") or agent_name
time.sleep(0.1)
self.client.delete(
f"/a2a/{agent_id}",
headers=self.auth_headers,
name="/a2a/[id] [delete]",
)
response.success()
except Exception:
response.success()
elif response.status_code in (409, 422):
response.success() # Conflict or validation error acceptable
# NOTE: GatewayFullCRUDUser removed - causes instability under load
# Gateway CRUD operations (create, update, refresh, delete) trigger slow network
# calls to external MCP servers, causing timeouts and cascading failures.
# TODO: Implement proper gateway load testing with mock MCP servers
# =============================================================================
# Batch 2: Extended Resources, Tags, Protocol, Server Endpoints
# =============================================================================
class ResourcesExtendedUser(BaseUser):
"""User that tests extended resource endpoints beyond basic CRUD.
Tests resource template listing and detailed resource info retrieval.
Resource templates define parameterized resources that can be instantiated.
Endpoints tested:
- GET /resources/templates/list - List available resource templates
- GET /resources/{resource_id}/info - Get detailed resource metadata
Weight: Low (supplementary resource operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(5)
@tag("resources", "templates")
def list_resource_templates(self):
"""GET /resources/templates/list - List resource templates via REST."""
with self.client.get(
"/resources/templates/list",
headers=self.auth_headers,
name="/resources/templates/list",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(3)
@tag("resources", "info")
def get_resource_info(self):
"""GET /resources/{resource_id}/info - Get detailed resource info."""
if RESOURCE_IDS:
resource_id = random.choice(RESOURCE_IDS)
with self.client.get(
f"/resources/{resource_id}/info",
headers=self.auth_headers,
name="/resources/[id]/info",
catch_response=True,
) as response:
# 200=Success, 404=Not found
self._validate_json_response(response, allowed_codes=[200, 404])
# NOTE: TagsExtendedUser removed - /tags/{name}/entities has app bug:
# "function json_extract(json, character varying) does not exist"
# SQLite function used with PostgreSQL - needs fix in tag service
# NOTE: AdvancedProtocolUser removed - endpoints have issues:
# - /protocol/notifications - Returns null/empty response
# - /protocol/completion/complete - Requires existing prompt name
# - /protocol/sampling/createMessage - Complex payload validation
# TODO: Re-implement with proper test fixtures
class ServerExtendedUser(BaseUser):
"""User that tests extended virtual server endpoints.
Tests server-specific endpoints for accessing prompts and other
server-scoped resources.
Endpoints tested:
- GET /servers/{server_id}/prompts - Get prompts from a specific server
Weight: Low (server-scoped operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(5)
@tag("servers", "prompts")
def get_server_prompts(self):
"""GET /servers/{server_id}/prompts - Get prompts from a server."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.get(
f"/servers/{server_id}/prompts",
headers=self.auth_headers,
name="/servers/[id]/prompts",
catch_response=True,
) as response:
# 200=Success, 404=Server not found
self._validate_json_response(response, allowed_codes=[200, 404])
# =============================================================================
# Batch 3: Teams, Tokens, RBAC
# =============================================================================
# NOTE: TeamsUser removed - endpoints have app bugs:
# - GET /teams - 500: 'NoneType' object has no attribute 'execute' (db session is None)
# - GET /teams/discover - 401: Requires specific authentication
# TODO: Fix teams router db session handling and re-enable
class TokensUser(BaseUser):
"""User that tests API token management endpoints.
Tests token listing and usage statistics retrieval.
API tokens provide programmatic access to the gateway.
Endpoints tested:
- GET /tokens - List user's tokens
- GET /tokens/{token_id}/usage - Get token usage statistics
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(5)
@tag("tokens", "list")
def list_tokens(self):
"""GET /tokens - List user's API tokens."""
with self.client.get(
"/tokens",
headers=self.auth_headers,
name="/tokens",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
class RBACUser(BaseUser):
"""User that tests Role-Based Access Control endpoints.
Tests role listing, permission discovery, and user permission queries.
RBAC provides fine-grained access control to gateway resources.
Endpoints tested:
- GET /rbac/roles - List all roles
- GET /rbac/my/roles - Get current user's roles
- GET /rbac/my/permissions - Get current user's permissions
- GET /rbac/permissions/available - List all available permissions
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(4)
@tag("rbac", "roles")
def list_roles(self):
"""GET /rbac/roles - List all RBAC roles."""
with self.client.get(
"/rbac/roles",
headers=self.auth_headers,
name="/rbac/roles",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(3)
@tag("rbac", "my")
def get_my_roles(self):
"""GET /rbac/my/roles - Get current user's roles."""
with self.client.get(
"/rbac/my/roles",
headers=self.auth_headers,
name="/rbac/my/roles",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized
self._validate_json_response(response, allowed_codes=[200, 401])
@task(3)
@tag("rbac", "my")
def get_my_permissions(self):
"""GET /rbac/my/permissions - Get current user's permissions."""
with self.client.get(
"/rbac/my/permissions",
headers=self.auth_headers,
name="/rbac/my/permissions",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized
self._validate_json_response(response, allowed_codes=[200, 401])
@task(2)
@tag("rbac", "permissions")
def list_available_permissions(self):
"""GET /rbac/permissions/available - List all available permissions."""
with self.client.get(
"/rbac/permissions/available",
headers=self.auth_headers,
name="/rbac/permissions/available",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
# =============================================================================
# Batch 4: Authentication & OAuth User Classes
# =============================================================================
class AuthUser(BaseUser):
"""User that tests email authentication admin endpoints.
Tests user management and authentication event logging endpoints.
These are read-only admin endpoints for monitoring auth activity.
Endpoints tested:
- GET /auth/email/events - Get current user's auth events
- GET /auth/email/admin/events - Admin view of all auth events
- GET /auth/email/admin/users - Admin list of email users
Skipped endpoints:
- POST /auth/login - Write operation (creates session)
- POST /auth/email/login - Write operation
- POST /auth/email/register - Write operation (creates user)
- GET /auth/email/me - Requires email session, not JWT auth
- SSO endpoints - Not available (404)
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(3)
@tag("auth", "events")
def get_auth_events(self):
"""GET /auth/email/events - Get current user's authentication events."""
with self.client.get(
"/auth/email/events",
headers=self.auth_headers,
name="/auth/email/events",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized
self._validate_json_response(response, allowed_codes=[200, 401])
@task(2)
@tag("auth", "admin", "events")
def get_admin_auth_events(self):
"""GET /auth/email/admin/events - Admin view of all authentication events."""
with self.client.get(
"/auth/email/admin/events",
headers=self.auth_headers,
name="/auth/email/admin/events",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(2)
@tag("auth", "admin", "users")
def list_admin_users(self):
"""GET /auth/email/admin/users - Admin list of registered email users."""
with self.client.get(
"/auth/email/admin/users",
headers=self.auth_headers,
name="/auth/email/admin/users",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
class OAuthUser(BaseUser):
"""User that tests OAuth client management endpoints.
Tests OAuth client registration and authorization status endpoints.
These endpoints support OAuth 2.0 flows for gateway authentication.
Endpoints tested:
- GET /oauth/registered-clients - List registered OAuth clients
Skipped endpoints:
- GET /oauth/authorize/{gateway_id} - Requires valid gateway with OAuth
- GET /oauth/status/{gateway_id} - Requires valid gateway
- GET /oauth/callback - Part of OAuth flow, not directly callable
- DELETE /oauth/registered-clients/{client_id} - Write operation
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(5)
@tag("oauth", "clients")
def list_registered_clients(self):
"""GET /oauth/registered-clients - List registered OAuth clients."""
with self.client.get(
"/oauth/registered-clients",
headers=self.auth_headers,
name="/oauth/registered-clients",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
# =============================================================================
# Batch 5: Logs, Metrics, Observability User Classes
# =============================================================================
class LogSearchUser(BaseUser):
"""User that tests structured log search and audit endpoints.
Tests log search, security events, audit trails, and performance metrics.
These endpoints provide visibility into system activity and security.
Endpoints tested:
- GET /api/logs/security-events - Security event log
- GET /api/logs/audit-trails - Audit trail entries
- GET /api/logs/performance-metrics - Performance metrics log
Skipped endpoints:
- POST /api/logs/search - Complex search payload
- GET /api/logs/trace/{correlation_id} - Requires valid correlation ID
Weight: Low (monitoring operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(3)
@tag("logs", "security")
def get_security_events(self):
"""GET /api/logs/security-events - Get security event log."""
with self.client.get(
"/api/logs/security-events",
headers=self.auth_headers,
name="/api/logs/security-events",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(3)
@tag("logs", "audit")
def get_audit_trails(self):
"""GET /api/logs/audit-trails - Get audit trail entries."""
with self.client.get(
"/api/logs/audit-trails",
headers=self.auth_headers,
name="/api/logs/audit-trails",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(2)
@tag("logs", "performance")
def get_performance_metrics(self):
"""GET /api/logs/performance-metrics - Get performance metrics log."""
with self.client.get(
"/api/logs/performance-metrics",
headers=self.auth_headers,
name="/api/logs/performance-metrics",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
class MetricsUser(BaseUser):
"""User that tests metrics and statistics endpoints.
Tests system metrics, configuration, and Prometheus export endpoints.
These endpoints provide operational visibility and monitoring integration.
Endpoints tested:
- GET /metrics - Aggregated system metrics
- GET /api/metrics/stats - Detailed metrics statistics
- GET /api/metrics/config - Metrics configuration
- GET /metrics/prometheus - Prometheus-format metrics export
Skipped endpoints:
- POST /api/metrics/cleanup - Write operation
- POST /api/metrics/rollup - Write operation
- POST /metrics/reset - Write operation
Weight: Low (monitoring operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(4)
@tag("metrics", "aggregated")
def get_metrics(self):
"""GET /metrics - Get aggregated system metrics."""
with self.client.get(
"/metrics",
headers=self.auth_headers,
name="/metrics",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized
self._validate_json_response(response, allowed_codes=[200, 401])
@task(3)
@tag("metrics", "stats")
def get_metrics_stats(self):
"""GET /api/metrics/stats - Get detailed metrics statistics."""
with self.client.get(
"/api/metrics/stats",
headers=self.auth_headers,
name="/api/metrics/stats",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(2)
@tag("metrics", "config")
def get_metrics_config(self):
"""GET /api/metrics/config - Get metrics configuration."""
with self.client.get(
"/api/metrics/config",
headers=self.auth_headers,
name="/api/metrics/config",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(2)
@tag("metrics", "prometheus")
def get_prometheus_metrics(self):
"""GET /metrics/prometheus - Get Prometheus-format metrics."""
with self.client.get(
"/metrics/prometheus",
headers=self.auth_headers,
name="/metrics/prometheus",
catch_response=True,
) as response:
# 200=Success - Prometheus format is plain text, not JSON
if response.status_code == 200:
response.success()
else:
response.failure(f"Expected [200], got {response.status_code}")
class ObservabilityUser(BaseUser):
"""User that tests admin observability JSON endpoints.
Tests observability endpoints that return JSON data (not HTML templates).
These provide tool usage, performance, and volume analytics.
Endpoints tested:
- GET /admin/observability/tools/usage - Tool usage statistics
- GET /admin/observability/tools/performance - Tool performance data
- GET /admin/observability/metrics/top-volume - Top volume endpoints
Skipped endpoints:
- HTML-returning endpoints (already covered by admin UI tests)
- POST /admin/observability/queries - Write operation
- Endpoints requiring specific IDs
Weight: Low (admin analytics)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(3)
@tag("observability", "tools")
def get_tools_usage(self):
"""GET /admin/observability/tools/usage - Get tool usage statistics."""
with self.client.get(
"/admin/observability/tools/usage",
headers=self.auth_headers,
name="/admin/observability/tools/usage",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(3)
@tag("observability", "performance")
def get_tools_performance(self):
"""GET /admin/observability/tools/performance - Get tool performance data."""
with self.client.get(
"/admin/observability/tools/performance",
headers=self.auth_headers,
name="/admin/observability/tools/performance",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(2)
@tag("observability", "volume")
def get_top_volume(self):
"""GET /admin/observability/metrics/top-volume - Get top volume endpoints."""
with self.client.get(
"/admin/observability/metrics/top-volume",
headers=self.auth_headers,
name="/admin/observability/metrics/top-volume",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
# =============================================================================
# Batch 6: LLM, Reverse Proxy User Classes
# =============================================================================
class LLMUser(BaseUser):
"""User that tests LLM provider and model configuration endpoints.
Tests LLM gateway models and provider configuration endpoints.
These endpoints provide LLM integration capabilities.
Endpoints tested:
- GET /llm/gateway/models - List gateway-available models
- GET /llmchat/gateway/models - List chat gateway models
- GET /admin/llm/provider-configs - LLM provider configurations
- GET /admin/llm/provider-defaults - Default provider settings
Skipped endpoints:
- GET /llm/providers - 500 (requires LLM providers configured)
- GET /llm/models - 500 (requires LLM providers configured)
- POST endpoints - Write operations
- LLMChat status/config - Require specific user ID
Weight: Low (configuration endpoints)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(3)
@tag("llm", "models")
def get_gateway_models(self):
"""GET /llm/gateway/models - List gateway-available LLM models."""
with self.client.get(
"/llm/gateway/models",
headers=self.auth_headers,
name="/llm/gateway/models",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized
self._validate_json_response(response, allowed_codes=[200, 401])
@task(3)
@tag("llm", "chat", "models")
def get_chat_gateway_models(self):
"""GET /llmchat/gateway/models - List chat gateway LLM models."""
with self.client.get(
"/llmchat/gateway/models",
headers=self.auth_headers,
name="/llmchat/gateway/models",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized
self._validate_json_response(response, allowed_codes=[200, 401])
@task(2)
@tag("llm", "admin", "config")
def get_provider_configs(self):
"""GET /admin/llm/provider-configs - Get LLM provider configurations."""
with self.client.get(
"/admin/llm/provider-configs",
headers=self.auth_headers,
name="/admin/llm/provider-configs",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
@task(2)
@tag("llm", "admin", "defaults")
def get_provider_defaults(self):
"""GET /admin/llm/provider-defaults - Get default LLM provider settings."""
with self.client.get(
"/admin/llm/provider-defaults",
headers=self.auth_headers,
name="/admin/llm/provider-defaults",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
class ReverseProxyUser(BaseUser):
"""User that tests reverse proxy session management endpoints.
Tests reverse proxy session listing for managing proxy connections.
Endpoints tested:
- GET /reverse-proxy/sessions - List active proxy sessions
Skipped endpoints:
- DELETE /reverse-proxy/sessions/{session_id} - Write operation
- POST /reverse-proxy/sessions/{session_id}/request - Write operation
- GET /reverse-proxy/sse/{session_id} - SSE streaming
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(1.0, 3.0)
@task(5)
@tag("reverse-proxy", "sessions")
def list_sessions(self):
"""GET /reverse-proxy/sessions - List active reverse proxy sessions."""
with self.client.get(
"/reverse-proxy/sessions",
headers=self.auth_headers,
name="/reverse-proxy/sessions",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 403=Forbidden
self._validate_json_response(response, allowed_codes=[200, 401, 403])
# =============================================================================
# Batch 1 Phase 1: Teams, Tokens, RBAC, Cancellation (Priority 1)
# =============================================================================
# Global pools for team/token IDs (populated at test start)
TEAM_IDS: list[str] = []
ROLE_IDS: list[str] = []
@events.test_start.add_listener
def on_test_start_batch1(environment, **_kwargs):
"""Fetch team and role IDs for batch 1 tests."""
host = environment.host or "http://localhost:8080"
headers = _get_auth_headers()
try:
# Fetch teams
status, data = _fetch_json(f"{host}/teams/", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("teams", data.get("items", []))
TEAM_IDS.extend([str(t.get("id")) for t in items[:20] if t.get("id")])
logger.info(f"Loaded {len(TEAM_IDS)} team IDs")
# Fetch RBAC roles
status, data = _fetch_json(f"{host}/rbac/roles", headers)
if status == 200 and data:
items = data if isinstance(data, list) else data.get("roles", data.get("items", []))
ROLE_IDS.extend([str(r.get("id")) for r in items[:20] if r.get("id")])
logger.info(f"Loaded {len(ROLE_IDS)} role IDs")
except Exception as e:
logger.warning(f"Failed to fetch batch1 IDs: {e}")
@events.test_stop.add_listener
def on_test_stop_batch1(environment, **kwargs):
"""Clean up batch 1 pools."""
TEAM_IDS.clear()
ROLE_IDS.clear()
class TeamsCRUDUser(BaseUser):
"""User that performs CRUD operations on Teams.
Tests the complete Teams API for collaboration features including
team management, membership, invitations, and join requests.
Endpoints tested:
- GET /teams/ - List teams
- POST /teams/ - Create team
- GET /teams/{team_id} - Get team details
- PUT /teams/{team_id} - Update team
- DELETE /teams/{team_id} - Delete team
- GET /teams/discover - Discover public teams
- GET /teams/{team_id}/members - List team members
- GET /teams/{team_id}/invitations - List invitations
- GET /teams/{team_id}/join-requests - List join requests
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
def __init__(self, *args, **kwargs):
"""Initialize with tracking for cleanup."""
super().__init__(*args, **kwargs)
self.created_teams: list[str] = []
def on_stop(self):
"""Clean up created teams on test stop."""
for team_id in self.created_teams:
try:
self.client.delete(
f"/teams/{team_id}",
headers=self.auth_headers,
name="/teams/[id] [cleanup]",
)
except Exception:
pass
@task(5)
@tag("teams", "list")
def list_teams(self):
"""GET /teams/ - List all teams."""
with self.client.get(
"/teams/",
headers=self.auth_headers,
name="/teams/",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden, 500=Server error (teams may not be configured)
self._validate_json_response(response, allowed_codes=[200, 403, 500])
@task(3)
@tag("teams", "discover")
def discover_teams(self):
"""GET /teams/discover - Discover public teams."""
with self.client.get(
"/teams/discover",
headers=self.auth_headers,
name="/teams/discover",
catch_response=True,
) as response:
# 200=Success, 401=Auth issue, 403=Forbidden, 500=Server error
self._validate_json_response(response, allowed_codes=[200, 401, 403, 500])
@task(4)
@tag("teams", "read")
def get_team_details(self):
"""GET /teams/{team_id} - Get team details."""
if TEAM_IDS:
team_id = random.choice(TEAM_IDS)
with self.client.get(
f"/teams/{team_id}",
headers=self.auth_headers,
name="/teams/[id]",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden, 404=Not found, 500=Server error
self._validate_json_response(response, allowed_codes=[200, 403, 404, 500])
@task(3)
@tag("teams", "members")
def list_team_members(self):
"""GET /teams/{team_id}/members - List team members."""
if TEAM_IDS:
team_id = random.choice(TEAM_IDS)
with self.client.get(
f"/teams/{team_id}/members",
headers=self.auth_headers,
name="/teams/[id]/members",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden, 404=Not found, 500=Server error
self._validate_json_response(response, allowed_codes=[200, 403, 404, 500])
@task(2)
@tag("teams", "invitations")
def list_team_invitations(self):
"""GET /teams/{team_id}/invitations - List team invitations."""
if TEAM_IDS:
team_id = random.choice(TEAM_IDS)
with self.client.get(
f"/teams/{team_id}/invitations",
headers=self.auth_headers,
name="/teams/[id]/invitations",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden, 404=Not found, 500=Server error
self._validate_json_response(response, allowed_codes=[200, 403, 404, 500])
@task(2)
@tag("teams", "join-requests")
def list_join_requests(self):
"""GET /teams/{team_id}/join-requests - List join requests."""
if TEAM_IDS:
team_id = random.choice(TEAM_IDS)
with self.client.get(
f"/teams/{team_id}/join-requests",
headers=self.auth_headers,
name="/teams/[id]/join-requests",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden, 404=Not found, 500=Server error
self._validate_json_response(response, allowed_codes=[200, 403, 404, 500])
@task(2)
@tag("teams", "write", "create")
def create_and_delete_team(self):
"""POST /teams/ - Create a team, then DELETE it."""
team_name = f"loadtest-team-{uuid.uuid4().hex[:8]}"
team_data = {
"name": team_name,
"description": "Load test team - will be deleted",
"visibility": "private",
}
with self.client.post(
"/teams/",
json=team_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/teams/ [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
team_id = data.get("id") or data.get("name") or team_name
time.sleep(0.1)
self.client.delete(
f"/teams/{team_id}",
headers=self.auth_headers,
name="/teams/[id] [delete]",
)
response.success()
except Exception:
response.success()
elif response.status_code in (403, 409, 422, 500):
# 403=Forbidden, 409=Conflict, 422=Validation error, 500=Server error
response.success()
class TokenCatalogCRUDUser(BaseUser):
"""User that performs CRUD operations on JWT Token Catalog.
Tests the complete Token Catalog API for managing API tokens including
creation, listing, updates, usage stats, and deletion.
Endpoints tested:
- GET /tokens - List user's tokens
- POST /tokens - Create token
- GET /tokens/{token_id} - Get token details
- PUT /tokens/{token_id} - Update token
- DELETE /tokens/{token_id} - Delete token
- GET /tokens/{token_id}/usage - Get token usage stats
- GET /tokens/admin/all - Admin: list all tokens
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
def __init__(self, *args, **kwargs):
"""Initialize with tracking for cleanup."""
super().__init__(*args, **kwargs)
self.created_tokens: list[str] = []
def on_stop(self):
"""Clean up created tokens on test stop."""
for token_id in self.created_tokens:
try:
self.client.delete(
f"/tokens/{token_id}",
headers=self.auth_headers,
name="/tokens/[id] [cleanup]",
)
except Exception:
pass
@task(5)
@tag("tokens", "list")
def list_tokens(self):
"""GET /tokens - List user's tokens."""
with self.client.get(
"/tokens",
headers=self.auth_headers,
name="/tokens",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(2)
@tag("tokens", "admin", "list")
def list_all_tokens_admin(self):
"""GET /tokens/admin/all - Admin: list all tokens."""
with self.client.get(
"/tokens/admin/all",
headers=self.auth_headers,
name="/tokens/admin/all",
catch_response=True,
) as response:
# 200=Success, 403=Forbidden (non-admin)
self._validate_json_response(response, allowed_codes=[200, 403])
@task(3)
@tag("tokens", "write", "create")
def create_and_manage_token(self):
"""POST /tokens - Create a token, get details, usage, then DELETE."""
token_name = f"loadtest-token-{uuid.uuid4().hex[:8]}"
token_data = {
"name": token_name,
"description": "Load test token - will be deleted",
"expires_in_days": 1,
}
with self.client.post(
"/tokens",
json=token_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/tokens [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
token_id = data.get("id")
if token_id:
# Get token details
time.sleep(0.05)
self.client.get(
f"/tokens/{token_id}",
headers=self.auth_headers,
name="/tokens/[id]",
)
# Get usage stats
time.sleep(0.05)
self.client.get(
f"/tokens/{token_id}/usage",
headers=self.auth_headers,
name="/tokens/[id]/usage",
)
# Delete token
time.sleep(0.05)
self.client.delete(
f"/tokens/{token_id}",
headers=self.auth_headers,
name="/tokens/[id] [delete]",
)
response.success()
except Exception:
response.success()
elif response.status_code in (409, 422):
response.success() # Conflict or validation error acceptable
@task(2)
@tag("tokens", "teams")
def list_team_tokens(self):
"""GET /tokens/teams/{team_id} - List team tokens."""
if TEAM_IDS:
team_id = random.choice(TEAM_IDS)
with self.client.get(
f"/tokens/teams/{team_id}",
headers=self.auth_headers,
name="/tokens/teams/[id]",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 403, 404])
class RBACCRUDUser(BaseUser):
"""User that performs CRUD operations on RBAC (Role-Based Access Control).
Tests the complete RBAC API for managing roles and permissions including
role creation, permission assignment, and user-role mappings.
Endpoints tested:
- GET /rbac/roles - List roles
- POST /rbac/roles - Create role
- GET /rbac/roles/{role_id} - Get role details
- PUT /rbac/roles/{role_id} - Update role
- DELETE /rbac/roles/{role_id} - Delete role
- POST /rbac/permissions/check - Check permission
- GET /rbac/permissions/user/{user_email} - Get user's permissions
- POST /rbac/users/{user_email}/roles - Assign role to user
- GET /rbac/users/{user_email}/roles - List user's roles
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
def __init__(self, *args, **kwargs):
"""Initialize with tracking for cleanup."""
super().__init__(*args, **kwargs)
self.created_roles: list[str] = []
def on_stop(self):
"""Clean up created roles on test stop."""
for role_id in self.created_roles:
try:
self.client.delete(
f"/rbac/roles/{role_id}",
headers=self.auth_headers,
name="/rbac/roles/[id] [cleanup]",
)
except Exception:
pass
@task(5)
@tag("rbac", "roles", "list")
def list_roles(self):
"""GET /rbac/roles - List all roles."""
with self.client.get(
"/rbac/roles",
headers=self.auth_headers,
name="/rbac/roles",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(3)
@tag("rbac", "roles", "read")
def get_role_details(self):
"""GET /rbac/roles/{role_id} - Get role details."""
if ROLE_IDS:
role_id = random.choice(ROLE_IDS)
with self.client.get(
f"/rbac/roles/{role_id}",
headers=self.auth_headers,
name="/rbac/roles/[id]",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(3)
@tag("rbac", "permissions", "check")
def check_permission(self):
"""POST /rbac/permissions/check - Check if user has permission."""
check_data = {
"user_email": "admin@example.com",
"permission": "tools:read",
}
with self.client.post(
"/rbac/permissions/check",
json=check_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/rbac/permissions/check",
catch_response=True,
) as response:
self._validate_json_response(response)
@task(2)
@tag("rbac", "permissions", "user")
def get_user_permissions(self):
"""GET /rbac/permissions/user/{user_email} - Get user's permissions."""
with self.client.get(
"/rbac/permissions/user/admin@example.com",
headers=self.auth_headers,
name="/rbac/permissions/user/[email]",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("rbac", "users", "roles")
def get_user_roles(self):
"""GET /rbac/users/{user_email}/roles - Get user's assigned roles."""
with self.client.get(
"/rbac/users/admin@example.com/roles",
headers=self.auth_headers,
name="/rbac/users/[email]/roles",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(1)
@tag("rbac", "roles", "write", "create")
def create_and_delete_role(self):
"""POST /rbac/roles - Create a role, then DELETE it."""
role_name = f"loadtest-role-{uuid.uuid4().hex[:8]}"
role_data = {
"name": role_name,
"description": "Load test role - will be deleted",
"permissions": ["tools:read"],
}
with self.client.post(
"/rbac/roles",
json=role_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/rbac/roles [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
data = response.json()
role_id = data.get("id") or data.get("name") or role_name
time.sleep(0.1)
self.client.delete(
f"/rbac/roles/{role_id}",
headers=self.auth_headers,
name="/rbac/roles/[id] [delete]",
)
response.success()
except Exception:
response.success()
elif response.status_code in (409, 422):
response.success() # Conflict or validation error acceptable
class CancellationAPIUser(BaseUser):
"""User that tests the Cancellation API for request management.
Tests the ability to cancel in-progress requests and check cancellation status.
Endpoints tested:
- POST /cancellation/cancel - Cancel a request
- GET /cancellation/status/{request_id} - Get cancellation status
Weight: Very low (rarely used in production)
"""
weight = 1
wait_time = between(3.0, 8.0)
@task(3)
@tag("cancellation", "status")
def check_cancellation_status(self):
"""GET /cancellation/status/{request_id} - Check cancellation status."""
# Use a random UUID as request_id (will likely return 404)
request_id = str(uuid.uuid4())
with self.client.get(
f"/cancellation/status/{request_id}",
headers=self.auth_headers,
name="/cancellation/status/[id]",
catch_response=True,
) as response:
# 200=Found, 404=Not found (expected for random ID)
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("cancellation", "cancel")
def cancel_request(self):
"""POST /cancellation/cancel - Attempt to cancel a request."""
# Use a random UUID as request_id (will likely fail gracefully)
cancel_data = {
"request_id": str(uuid.uuid4()),
"reason": "Load test cancellation",
}
with self.client.post(
"/cancellation/cancel",
json=cancel_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/cancellation/cancel",
catch_response=True,
) as response:
# 200=Success, 404=Not found, 422=Invalid request
self._validate_json_response(response, allowed_codes=[200, 404, 422])
# =============================================================================
# Batch 2 Phase 2: LLM Configuration & Integration (REMOVED - requires LLM setup)
# =============================================================================
# The following classes were REMOVED because they fail when LLM is not configured:
#
# - LLMConfigCRUDUser: Tests /llm/providers, /llm/models endpoints
# Endpoints: GET /llm/providers, GET /llm/models, GET /llm/providers/{id},
# POST /llm/providers/{id}/health
#
# - LLMChatUser: Tests /llmchat/* endpoints
# Endpoints: GET /llmchat/gateway/models, GET /llmchat/config/{user_id},
# GET /llmchat/status/{user_id}
#
# - LLMProxyUser: Tests /v1/* OpenAI-compatible endpoints
# Endpoints: GET /v1/models, POST /v1/chat/completions
#
# To re-enable: Configure LLM providers in the gateway and uncomment these classes.
# =============================================================================
# =============================================================================
# Batch 3 Phase 3: Observability, Protocol, & Extended Operations (Priority 3)
# =============================================================================
# ProtocolExtendedUser REMOVED - returns empty/invalid JSON responses
# Endpoints removed:
# - POST /protocol/completion/complete - Returns empty response
# - POST /protocol/notifications - Returns null JSON
# To re-enable: Fix the protocol endpoints to return valid JSON responses
class RootsExtendedUser(BaseUser):
"""User that tests extended Roots API endpoints.
Tests root management operations including creation and deletion.
Endpoints tested:
- GET /roots - List roots (already covered, included for context)
- POST /roots - Create root
- DELETE /roots/{uri} - Delete root
Note: GET /roots/changes was REMOVED - returns SSE stream, not JSON.
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
# GET /roots/changes REMOVED - endpoint returns text/event-stream (SSE), not JSON
# This is a streaming endpoint not suitable for standard load testing
@task(3)
@tag("roots", "list")
def list_roots(self):
"""GET /roots - List all roots."""
with self.client.get(
"/roots",
headers=self.auth_headers,
name="/roots",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200])
@task(2)
@tag("roots", "write", "create")
def create_and_delete_root(self):
"""POST /roots - Create a root, then DELETE it."""
root_uri = f"file:///tmp/loadtest-root-{uuid.uuid4().hex[:8]}"
root_data = {
"uri": root_uri,
"name": f"loadtest-root-{uuid.uuid4().hex[:8]}",
}
with self.client.post(
"/roots",
json=root_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/roots [create]",
catch_response=True,
) as response:
if response.status_code in (200, 201):
try:
time.sleep(0.1)
# URL-encode the URI for deletion
encoded_uri = root_uri.replace("/", "%2F").replace(":", "%3A")
# Delete may return 404 (already deleted) or 500 (server bug)
with self.client.delete(
f"/roots/{encoded_uri}",
headers=self.auth_headers,
name="/roots/[uri] [delete]",
catch_response=True,
) as del_response:
# Accept 200, 204, 404 (not found), 500 (server issues)
if del_response.status_code in (200, 204, 404, 500):
del_response.success()
else:
del_response.failure(f"Unexpected status: {del_response.status_code}")
response.success()
except Exception:
response.success()
elif response.status_code in (409, 422):
response.success() # Conflict or validation error acceptable
class TagsExtendedUser(BaseUser):
"""User that tests extended Tags API endpoints.
Tests tag-based entity discovery.
Endpoints tested:
- GET /tags/{tag_name}/entities - Get entities by tag
Weight: Low (read operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
@task(5)
@tag("tags", "entities")
def get_entities_by_tag(self):
"""GET /tags/{tag_name}/entities - Get entities tagged with a specific tag."""
# Common tag names that might exist
tag_names = ["mcp", "tool", "server", "gateway", "test", "loadtest"]
tag_name = random.choice(tag_names)
with self.client.get(
f"/tags/{tag_name}/entities",
headers=self.auth_headers,
name="/tags/[name]/entities",
catch_response=True,
) as response:
# 200=Success, 404=Tag not found (expected for random tags)
self._validate_json_response(response, allowed_codes=[200, 404])
class LogSearchExtendedUser(BaseUser):
"""User that tests extended Log Search API endpoints.
Tests log search and trace retrieval operations.
Endpoints tested:
- POST /api/logs/search - Search logs
- GET /api/logs/trace/{correlation_id} - Get trace by correlation
Weight: Low (administrative operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
@task(3)
@tag("logs", "search")
def search_logs(self):
"""POST /api/logs/search - Search logs."""
search_data = {
"query": "error",
"level": "INFO",
"limit": 10,
}
with self.client.post(
"/api/logs/search",
json=search_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/api/logs/search",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 400, 422])
@task(2)
@tag("logs", "trace")
def get_trace_by_correlation(self):
"""GET /api/logs/trace/{correlation_id} - Get trace by correlation ID."""
correlation_id = str(uuid.uuid4()) # Random ID (will likely return 404)
with self.client.get(
f"/api/logs/trace/{correlation_id}",
headers=self.auth_headers,
name="/api/logs/trace/[correlation_id]",
catch_response=True,
) as response:
# 200=Found, 404=Not found (expected for random ID)
self._validate_json_response(response, allowed_codes=[200, 404])
class MetricsMaintenanceUser(BaseUser):
"""User that tests Metrics Maintenance API endpoints.
Tests metrics cleanup and rollup operations.
Endpoints tested:
- POST /api/metrics/cleanup - Cleanup old metrics
- POST /api/metrics/rollup - Rollup metrics
Weight: Very low (maintenance operations)
"""
weight = 1
wait_time = between(5.0, 10.0)
@task(2)
@tag("metrics", "cleanup")
def cleanup_metrics(self):
"""POST /api/metrics/cleanup - Cleanup old metrics."""
with self.client.post(
"/api/metrics/cleanup",
headers=self.auth_headers,
name="/api/metrics/cleanup",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 202, 403])
@task(2)
@tag("metrics", "rollup")
def rollup_metrics(self):
"""POST /api/metrics/rollup - Rollup metrics."""
with self.client.post(
"/api/metrics/rollup",
headers=self.auth_headers,
name="/api/metrics/rollup",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 202, 403])
class AuthExtendedUser(BaseUser):
"""User that tests extended Authentication endpoints.
Tests authentication and user management operations.
Endpoints tested:
- POST /auth/login - Main login endpoint
- GET /auth/email/me - Get current user info
- POST /auth/email/change-password - Change password (test validation only)
Weight: Very low (sensitive operations)
"""
weight = 1
wait_time = between(3.0, 8.0)
@task(3)
@tag("auth", "me")
def get_current_user(self):
"""GET /auth/email/me - Get current authenticated user info."""
with self.client.get(
"/auth/email/me",
headers=self.auth_headers,
name="/auth/email/me",
catch_response=True,
) as response:
# 200=Success, 401=Unauthorized, 404=Not found, 422=Validation error
self._validate_json_response(response, allowed_codes=[200, 401, 404, 422])
@task(2)
@tag("auth", "login")
def test_login(self):
"""POST /auth/login - Test main login endpoint."""
login_data = {
"username": "admin@example.com",
"password": "admin", # Default test password
}
with self.client.post(
"/auth/login",
data=login_data, # Form data, not JSON
headers={**self.auth_headers, "Content-Type": "application/x-www-form-urlencoded"},
name="/auth/login",
catch_response=True,
) as response:
# 200=Success, 401=Invalid credentials, 422=Validation error
self._validate_json_response(response, allowed_codes=[200, 401, 422])
class EntityToggleUser(BaseUser):
"""User that tests toggle operations across all entity types.
Tests the toggle endpoints that switch entity enabled state.
Endpoints tested:
- POST /tools/{tool_id}/toggle
- POST /servers/{server_id}/toggle
- POST /gateways/{gateway_id}/toggle
- POST /resources/{resource_id}/toggle
- POST /prompts/{prompt_id}/toggle
- POST /a2a/{agent_id}/toggle
Weight: Low (state operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
@task(3)
@tag("tools", "toggle")
def toggle_tool(self):
"""POST /tools/{tool_id}/toggle - Toggle tool enabled state."""
if TOOL_IDS:
tool_id = random.choice(TOOL_IDS)
with self.client.post(
f"/tools/{tool_id}/toggle",
headers=self.auth_headers,
name="/tools/[id]/toggle",
catch_response=True,
) as response:
# 200=Success, 401=Auth issue, 403=Forbidden, 404=Not found, 409=Conflict
self._validate_json_response(response, allowed_codes=[200, 401, 403, 404, 409])
@task(3)
@tag("servers", "toggle")
def toggle_server(self):
"""POST /servers/{server_id}/toggle - Toggle server enabled state."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.post(
f"/servers/{server_id}/toggle",
headers=self.auth_headers,
name="/servers/[id]/toggle",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 401, 403, 404, 409])
@task(2)
@tag("resources", "toggle")
def toggle_resource(self):
"""POST /resources/{resource_id}/toggle - Toggle resource enabled state."""
if RESOURCE_IDS:
resource_id = random.choice(RESOURCE_IDS)
with self.client.post(
f"/resources/{resource_id}/toggle",
headers=self.auth_headers,
name="/resources/[id]/toggle",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 401, 403, 404, 409])
@task(2)
@tag("prompts", "toggle")
def toggle_prompt(self):
"""POST /prompts/{prompt_id}/toggle - Toggle prompt enabled state."""
if PROMPT_IDS:
prompt_id = random.choice(PROMPT_IDS)
with self.client.post(
f"/prompts/{prompt_id}/toggle",
headers=self.auth_headers,
name="/prompts/[id]/toggle",
catch_response=True,
) as response:
self._validate_json_response(response, allowed_codes=[200, 401, 403, 404, 409])
class EntityUpdateUser(BaseUser):
"""User that tests PUT/UPDATE operations across entity types.
Tests the update endpoints for modifying existing entities.
Endpoints tested:
- PUT /tools/{tool_id}
- PUT /servers/{server_id}
- PUT /resources/{resource_id}
- PUT /prompts/{prompt_id}
- PUT /gateways/{gateway_id}
- PUT /a2a/{agent_id}
Weight: Low (write operations)
"""
weight = 1
wait_time = between(2.0, 5.0)
@task(2)
@tag("tools", "update")
def update_tool(self):
"""PUT /tools/{tool_id} - Update a tool."""
if TOOL_IDS:
tool_id = random.choice(TOOL_IDS)
# First get current tool data
with self.client.get(
f"/tools/{tool_id}",
headers=self.auth_headers,
name="/tools/[id] [for update]",
catch_response=True,
) as response:
if response.status_code == 200:
try:
tool_data = response.json()
# Update description only (safe operation)
tool_data["description"] = f"Updated by load test at {time.time()}"
time.sleep(0.05)
with self.client.put(
f"/tools/{tool_id}",
json=tool_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/tools/[id] [update]",
catch_response=True,
) as put_response:
self._validate_json_response(put_response, allowed_codes=[200, 403, 404, 409, 422])
response.success()
except Exception:
response.success()
else:
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("resources", "update")
def update_resource(self):
"""PUT /resources/{resource_id} - Update a resource."""
if RESOURCE_IDS:
resource_id = random.choice(RESOURCE_IDS)
with self.client.get(
f"/resources/{resource_id}",
headers=self.auth_headers,
name="/resources/[id] [for update]",
catch_response=True,
) as response:
if response.status_code == 200:
try:
resource_data = response.json()
resource_data["description"] = f"Updated by load test at {time.time()}"
time.sleep(0.05)
with self.client.put(
f"/resources/{resource_id}",
json=resource_data,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/resources/[id] [update]",
catch_response=True,
) as put_response:
self._validate_json_response(put_response, allowed_codes=[200, 403, 404, 409, 422])
response.success()
except Exception:
response.success()
else:
self._validate_json_response(response, allowed_codes=[200, 404])
# =============================================================================
# Combined User (Realistic Traffic Pattern)
# =============================================================================
class RealisticUser(BaseUser):
"""User that simulates realistic mixed traffic.
Combines behaviors from all user types with realistic weights.
This is the default user for most load tests.
"""
weight = 10
wait_time = between(0.5, 2.0)
@task(15)
@tag("realistic", "health")
def health_check(self):
"""Health check."""
self.client.get("/health", name="/health")
@task(20)
@tag("realistic", "api")
def list_tools(self):
"""List tools."""
self.client.get("/tools", headers=self.auth_headers, name="/tools")
@task(15)
@tag("realistic", "api")
def list_servers(self):
"""List servers."""
self.client.get("/servers", headers=self.auth_headers, name="/servers")
@task(10)
@tag("realistic", "api")
def list_gateways(self):
"""List gateways."""
self.client.get("/gateways", headers=self.auth_headers, name="/gateways")
@task(10)
@tag("realistic", "api")
def list_resources(self):
"""List resources."""
self.client.get("/resources", headers=self.auth_headers, name="/resources")
@task(10)
@tag("realistic", "rpc")
def rpc_list_tools(self):
"""JSON-RPC list tools."""
payload = _json_rpc_request("tools/list")
with self.client.post(
"/rpc",
json=payload,
headers={**self.auth_headers, "Content-Type": "application/json"},
name="/rpc tools/list",
catch_response=True,
) as response:
self._validate_jsonrpc_response(response)
@task(8)
@tag("realistic", "admin")
def admin_dashboard(self):
"""Load admin dashboard."""
with self.client.get(
"/admin/",
headers=self.admin_headers,
name="/admin/",
catch_response=True,
) as response:
# 200=Success, 502=Bad Gateway (server under high load)
self._validate_status(response)
@task(5)
@tag("realistic", "api")
def get_single_tool(self):
"""Get specific tool."""
if TOOL_IDS:
tool_id = random.choice(TOOL_IDS)
with self.client.get(
f"/tools/{tool_id}",
headers=self.auth_headers,
name="/tools/[id]",
catch_response=True,
) as response:
# 200=Success, 404=Not found, 502=Bad Gateway
self._validate_json_response(response, allowed_codes=[200, 404])
@task(5)
@tag("realistic", "api")
def get_single_server(self):
"""Get specific server."""
if SERVER_IDS:
server_id = random.choice(SERVER_IDS)
with self.client.get(
f"/servers/{server_id}",
headers=self.auth_headers,
name="/servers/[id]",
catch_response=True,
) as response:
# 200=Success, 404=Not found, 502=Bad Gateway
self._validate_json_response(response, allowed_codes=[200, 404])
@task(2)
@tag("realistic", "admin")
def admin_tools_page(self):
"""Admin tools page."""
self.client.get("/admin/tools", headers=self.admin_headers, name="/admin/tools")
# =============================================================================
# Custom Shape (Optional - for advanced load patterns)
# =============================================================================
# Uncomment to use custom load shape instead of fixed user count
#
# from locust import LoadTestShape
#
# class StagesShape(LoadTestShape):
# """Custom load shape with stages: ramp up, sustain, spike, cooldown."""
#
# stages = [
# {"duration": 60, "users": 10, "spawn_rate": 2}, # Warm up
# {"duration": 120, "users": 50, "spawn_rate": 10}, # Ramp up
# {"duration": 180, "users": 50, "spawn_rate": 10}, # Sustain
# {"duration": 200, "users": 100, "spawn_rate": 20}, # Spike
# {"duration": 240, "users": 50, "spawn_rate": 10}, # Recovery
# {"duration": 300, "users": 10, "spawn_rate": 5}, # Cool down
# ]
#
# def tick(self):
# run_time = self.get_run_time()
#
# for stage in self.stages:
# if run_time < stage["duration"]:
# return (stage["users"], stage["spawn_rate"])
#
# return None # Stop test