# locustfile.py (10.8 kB)
"""
Load testing script for MCP KYC Server using Locust.
Usage:
locust -f tests/load/locustfile.py --host=http://localhost:8000
# With specific users and spawn rate:
locust -f tests/load/locustfile.py --host=http://localhost:8000 --users 100 --spawn-rate 10
# Headless mode with duration:
locust -f tests/load/locustfile.py --host=http://localhost:8000 --users 50 --spawn-rate 5 --run-time 5m --headless
"""
import json
import random
import time
from locust import HttpUser, task, between, events
from locust.exception import StopUser
class MCPKYCUser(HttpUser):
    """
    Simulated user for MCP KYC Server load testing.

    Four tasks are registered with weights 3:2:1:1 (7 in total), so the
    tasks are picked roughly 43% / 29% / 14% / 14% of the time.
    """

    # Wait time between tasks (1-3 seconds)
    wait_time = between(1, 3)

    # Test data
    valid_pans = [
        "ABCDE1234F",
        "XYZPQ5678R",
        "LMNOP9012S",
        "DEFGH3456K",
        "PQRST7890U",
    ]
    valid_names = [
        "John Doe",
        "Jane Smith",
        "Rajesh Kumar",
        "Priya Sharma",
        "Michael Brown",
    ]
    valid_dobs = [
        "01/01/1990",
        "15/06/1985",
        "31/12/2000",
        "10/03/1995",
        "25/08/1988",
    ]
    valid_aadhaar = [
        "123456789012",
        "987654321098",
        "555555555555",
        "111111111111",
        "999999999999",
    ]

    def on_start(self):
        """Called when a simulated user starts; resets the per-user counters."""
        self.request_count = 0
        self.error_count = 0

    @staticmethod
    def _tool_failure_reason(response, expected_keys):
        """
        Classify a /tools/execute response.

        Returns None when the response is acceptable (HTTP 200 with a JSON
        object containing at least one of ``expected_keys``), otherwise the
        failure message to report to Locust.
        """
        status = response.status_code
        if status == 429:
            return "Rate limit exceeded"
        if status != 200:
            return f"HTTP {status}"
        try:
            data = response.json()
        except ValueError:
            # Every JSON decode error raised by response.json() is a
            # ValueError subclass, including the simplejson variant that
            # json.JSONDecodeError would not match.
            return "Invalid JSON response"
        # A non-object body (null, number, ...) is reported as a bad format
        # instead of raising TypeError on the membership test.
        if isinstance(data, dict) and any(key in data for key in expected_keys):
            return None
        return "Invalid response format"

    def _execute_tool(self, payload, name, expected_keys):
        """
        POST ``payload`` to /tools/execute and record the outcome.

        ``name`` is the label used in the Locust statistics and
        ``expected_keys`` are the top-level keys of which at least one must
        be present in a successful response. Updates ``request_count`` and
        ``error_count``.
        """
        with self.client.post(
            "/tools/execute",
            json=payload,
            catch_response=True,
            name=name,
        ) as response:
            self.request_count += 1
            reason = self._tool_failure_reason(response, expected_keys)
            if reason is None:
                response.success()
            else:
                response.failure(reason)
                self.error_count += 1

    @task(3)
    def verify_pan(self):
        """
        Test PAN verification tool.
        Weight: 3 of 7 (~43% of requests)
        """
        payload = {
            "tool": "verify_pan",
            "params": {
                "pan": random.choice(self.valid_pans),
                "name_as_per_pan": random.choice(self.valid_names),
                "date_of_birth": random.choice(self.valid_dobs),
                "consent": "Y",
                "reason": "Load testing",
            },
        }
        self._execute_tool(payload, "PAN Verification", ("result", "pan"))

    @task(2)
    def check_pan_aadhaar_link(self):
        """
        Test PAN-Aadhaar link check tool.
        Weight: 2 of 7 (~29% of requests)
        """
        # Generate individual PAN (4th character must be 'P'):
        # three letters, 'P', one letter, four digits, and a fixed final 'A'.
        letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        pan_prefix = ''.join(random.choices(letters, k=3))
        pan_suffix = ''.join(random.choices(letters, k=1))
        pan_number = ''.join(random.choices('0123456789', k=4))
        pan = f"{pan_prefix}P{pan_suffix}{pan_number}A"
        payload = {
            "tool": "check_pan_aadhaar_link",
            "params": {
                "pan": pan,
                "aadhaar_number": random.choice(self.valid_aadhaar),
                "consent": "Y",
                "reason": "Load testing",
            },
        }
        self._execute_tool(payload, "PAN-Aadhaar Link Check", ("linked", "status"))

    @task(1)
    def list_tools(self):
        """
        Test listing available tools.
        Weight: 1 of 7 (~14% of requests)
        """
        with self.client.get(
            "/tools",
            catch_response=True,
            name="List Tools",
        ) as response:
            if response.status_code != 200:
                response.failure(f"HTTP {response.status_code}")
                return
            try:
                data = response.json()
            except ValueError:
                # ValueError covers every JSON decode error variant.
                response.failure("Invalid JSON response")
                return
            # Only a non-empty JSON array counts as a valid tool listing.
            if isinstance(data, list) and data:
                response.success()
            else:
                response.failure("No tools returned")

    @task(1)
    def health_check(self):
        """
        Test health check endpoint.
        Weight: 1 of 7 (~14% of requests)
        """
        with self.client.get(
            "/health",
            catch_response=True,
            name="Health Check",
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"HTTP {response.status_code}")
class BurstTrafficUser(HttpUser):
    """
    Simulated user that generates burst traffic.

    Sends the same PAN verification request in quick succession so that the
    server's rate limiting gets exercised.
    """

    wait_time = between(0.1, 0.5)  # Very short wait time

    @task
    def burst_requests(self):
        """Fire a rapid request; both 200 and 429 count as success."""
        params = {
            "pan": "ABCDE1234F",
            "name_as_per_pan": "Test User",
            "date_of_birth": "01/01/1990",
            "consent": "Y",
            "reason": "Burst test",
        }
        body = {"tool": "verify_pan", "params": params}
        with self.client.post(
            "/tools/execute",
            json=body,
            catch_response=True,
            name="Burst Request",
        ) as response:
            # 429 means the rate limiter is working, so it is accepted
            # alongside a normal 200.
            if response.status_code in (429, 200):
                response.success()
            else:
                response.failure(f"HTTP {response.status_code}")
class CacheTestUser(HttpUser):
    """
    Simulated user for cache testing.
    Makes repeated identical requests to test caching.

    Successful responses are tallied per user in ``cache_hits`` /
    ``cache_misses`` based on ``cache_hit_threshold_ms``; the tally is a
    latency heuristic only and never turns a 200 response into a failure.
    """

    wait_time = between(0.5, 1)

    # Successful responses faster than this many milliseconds are counted as
    # probable cache hits.
    cache_hit_threshold_ms = 100

    def on_start(self):
        """Set up fixed test data and reset the cache hit/miss counters."""
        self.test_pan = "ABCDE1234F"
        self.test_name = "Cache Test User"
        self.test_dob = "01/01/1990"
        self.cache_hits = 0
        self.cache_misses = 0

    @task
    def cached_request(self):
        """Make repeated identical requests to test caching."""
        payload = {
            "tool": "verify_pan",
            "params": {
                "pan": self.test_pan,
                "name_as_per_pan": self.test_name,
                "date_of_birth": self.test_dob,
                "consent": "Y",
                "reason": "Cache test",
            },
        }
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        with self.client.post(
            "/tools/execute",
            json=payload,
            catch_response=True,
            name="Cached Request",
        ) as response:
            response_time = (time.perf_counter() - start_time) * 1000  # Convert to ms
            if response.status_code != 200:
                response.failure(f"HTTP {response.status_code}")
                return
            # Cached responses should be faster; slow ones are still a
            # success, just counted as not cached.
            if response_time < self.cache_hit_threshold_ms:
                self.cache_hits += 1
            else:
                self.cache_misses += 1
            response.success()
# Event handlers for custom metrics
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """Announce the start of the load test and the host being targeted."""
    print("Load test starting...")
    target_host = environment.host
    print(f"Target host: {target_host}")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    """Print a summary of the aggregated request statistics when the test stops."""
    totals = environment.stats.total
    summary_lines = (
        "\nLoad test completed!",
        f"Total requests: {totals.num_requests}",
        f"Total failures: {totals.num_failures}",
        f"Average response time: {totals.avg_response_time:.2f}ms",
        f"Requests per second: {totals.total_rps:.2f}",
    )
    for line in summary_lines:
        print(line)
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
    """
    Called for each request.

    Prints a line for requests slower than 5 seconds and for requests that
    were reported with an exception.
    """
    # Log slow requests. A request may be reported without a timing
    # (response_time=None); comparing None with an int would raise TypeError.
    if response_time is not None and response_time > 5000:  # 5 seconds
        print(f"SLOW REQUEST: {name} took {response_time:.2f}ms")
    # Log errors
    if exception:
        print(f"ERROR: {name} - {exception}")
# Custom load test shapes
from locust import LoadTestShape
class StepLoadShape(LoadTestShape):
    """
    Step load pattern: gradually increase users in steps.

    Every ``step_time`` seconds the target user count grows by ``step_load``
    until ``time_limit`` seconds have elapsed, at which point the test stops.

    Steps with the default settings:
    - 0-60s: 10 users
    - 60-120s: 20 users
    - 120-180s: 30 users
    - 180-240s: 40 users
    - 240-300s: 50 users
    """

    step_time = 60    # seconds per step
    step_load = 10    # users added at each step
    spawn_rate = 5    # users started per second while ramping
    time_limit = 300  # total duration in seconds

    def tick(self):
        """
        Return ``(user_count, spawn_rate)`` for the current run time, or
        None to stop the test once ``time_limit`` has been reached.
        """
        run_time = self.get_run_time()
        # ">=" so that no extra step beyond the last documented one is
        # produced at exactly time_limit.
        if run_time >= self.time_limit:
            return None
        # Floor division on a float run time yields a float; convert so the
        # returned user count is an integer.
        current_step = int(run_time // self.step_time)
        return (self.step_load * (current_step + 1), self.spawn_rate)
class SpikeLoadShape(LoadTestShape):
    """
    Spike load pattern: alternate a low baseline with sudden bursts.

    Pattern:
    - 0-30s: 10 users
    - 30-60s: 100 users (spike)
    - 60-90s: 10 users
    - 90-120s: 100 users (spike)
    """

    def tick(self):
        """Return ``(user_count, spawn_rate)`` for now, or None after 120s."""
        elapsed = self.get_run_time()
        if elapsed > 120:
            return None
        in_first_spike = 30 <= elapsed < 60
        in_second_spike = 90 <= elapsed < 120
        if not (in_first_spike or in_second_spike):
            return (10, 5)  # Normal
        return (100, 20)  # Spike