test_performance.py (29.8 kB)

"""
Performance tests for MCP Personal Assistant

These tests measure performance characteristics and ensure the system meets
the performance targets outlined in CLAUDE.md:

- Dashboard response size < 4KB
- Response time < 200ms
- Support for 1000+ concurrent users
- 90%+ search accuracy
"""

import pytest
import pytest_asyncio
import asyncio
import time
import json
import statistics
import psutil
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

from tests.conftest import generate_test_project, generate_test_todo, generate_test_event


class TestResponseSizePerformance:
    """Test response size constraints"""

    @pytest.mark.asyncio
    async def test_dashboard_response_size_empty(self, test_client):
        """Test dashboard response size with empty database"""
        response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "get_dashboard",
                "arguments": {}
            }
        })

        assert response.status_code == 200

        # Measure response size
        response_text = response.text
        response_size = len(response_text.encode('utf-8'))

        # Should be well under 4KB even with empty data
        assert response_size < 2 * 1024  # 2KB limit for empty dashboard

        print(f"Empty dashboard response size: {response_size} bytes")

    @pytest.mark.asyncio
    async def test_dashboard_response_size_with_data(self, test_client, performance_test_data):
        """Test dashboard response size with realistic data load"""
        # Add performance test data
        projects_added = 0
        for project_data in performance_test_data["projects"][:20]:  # Add 20 projects
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": project_data
                }
            })
            if response.status_code == 200:
                projects_added += 1

        # Get dashboard
        dashboard_response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "get_dashboard",
                "arguments": {}
            }
        })

        assert dashboard_response.status_code == 200

        # Measure response size
        response_text = dashboard_response.text
        response_size = len(response_text.encode('utf-8'))

        # Critical requirement: must be under 4KB
        assert response_size < 4 * 1024, f"Dashboard response size {response_size} bytes exceeds 4KB limit"

        print(f"Dashboard response size with {projects_added} projects: {response_size} bytes")

        # Verify response contains intelligent filtering
        content_text = dashboard_response.json()["content"][0]["text"]
        dashboard_data = json.loads(content_text)

        # Should show overview but limited current focus items
        if "current_focus" in dashboard_data:
            focus_projects = dashboard_data["current_focus"].get("active_projects", [])
            focus_todos = dashboard_data["current_focus"].get("priority_todos", [])

            # Should be limited to small numbers for size control
            assert len(focus_projects) <= 5
            assert len(focus_todos) <= 5

    @pytest.mark.asyncio
    async def test_search_response_size(self, test_client, performance_test_data):
        """Test search response size limits"""
        # Add test data
        for i, project_data in enumerate(performance_test_data["projects"][:10]):
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": project_data
                }
            })
            if response.status_code != 200:
                break

        # Perform search
        search_response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "semantic_search",
                "arguments": {
                    "query": "performance test project development",
                    "limit": 10
                }
            }
        })

        assert search_response.status_code == 200

        # Measure response size
        response_size = len(search_response.text.encode('utf-8'))

        # Search should also be under reasonable size limits
        assert response_size < 8 * 1024, f"Search response size {response_size} bytes exceeds 8KB limit"

        print(f"Search response size: {response_size} bytes")


class TestResponseTimePerformance:
    """Test response time requirements"""

    @pytest.mark.asyncio
    async def test_dashboard_response_time(self, test_client):
        """Test dashboard response time under 200ms"""
        # Warm up
        test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {"name": "get_dashboard", "arguments": {}}
        })

        # Measure response time
        start_time = time.time()
        response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "get_dashboard",
                "arguments": {}
            }
        })
        end_time = time.time()

        response_time = (end_time - start_time) * 1000  # Convert to milliseconds

        assert response.status_code == 200

        # Target: < 200ms, but allow more in test environment
        assert response_time < 500, f"Dashboard response time {response_time:.2f}ms exceeds target"

        print(f"Dashboard response time: {response_time:.2f}ms")

    @pytest.mark.asyncio
    async def test_search_response_time(self, test_client):
        """Test search response time"""
        # Add some test data first
        for i in range(5):
            test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": generate_test_project(
                        name=f"Search Performance Test {i}",
                        description=f"Project for search performance testing {i}"
                    )
                }
            })

        # Warm up
        test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "semantic_search",
                "arguments": {"query": "test"}
            }
        })

        # Measure search response time
        start_time = time.time()
        response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "semantic_search",
                "arguments": {
                    "query": "search performance test project",
                    "limit": 5
                }
            }
        })
        end_time = time.time()

        response_time = (end_time - start_time) * 1000

        assert response.status_code == 200

        # Search should be even faster
        assert response_time < 300, f"Search response time {response_time:.2f}ms exceeds target"

        print(f"Search response time: {response_time:.2f}ms")

    @pytest.mark.asyncio
    async def test_project_creation_time(self, test_client):
        """Test project creation response time"""
        project_data = generate_test_project(
            name="Performance Test Project",
            description="Testing project creation performance"
        )

        # Measure project creation time
        start_time = time.time()
        response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "add_project",
                "arguments": project_data
            }
        })
        end_time = time.time()

        response_time = (end_time - start_time) * 1000

        assert response.status_code == 200
        assert response_time < 400, f"Project creation time {response_time:.2f}ms exceeds target"

        print(f"Project creation time: {response_time:.2f}ms")


class TestConcurrencyPerformance:
    """Test concurrent user support"""

    def test_concurrent_dashboard_requests(self, test_client):
        """Test handling multiple concurrent dashboard requests"""
        def make_dashboard_request(request_id: int) -> tuple:
            start_time = time.time()
            try:
                response = test_client.post("/mcp/tools/call", json={
                    "method": "tools/call",
                    "params": {
                        "name": "get_dashboard",
                        "arguments": {}
                    }
                })
                end_time = time.time()
                response_time = (end_time - start_time) * 1000
                return (request_id, response.status_code, response_time, len(response.text))
            except Exception as e:
                return (request_id, 500, 0, str(e))

        # Test with multiple concurrent requests
        num_concurrent = 20

        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            # Submit all requests
            futures = [executor.submit(make_dashboard_request, i) for i in range(num_concurrent)]

            # Collect results
            results = []
            for future in as_completed(futures, timeout=30):
                results.append(future.result())

        # Analyze results
        successful_requests = [r for r in results if r[1] == 200]
        failed_requests = [r for r in results if r[1] != 200]

        success_rate = len(successful_requests) / len(results)
        response_times = [r[2] for r in successful_requests]
        response_sizes = [r[3] for r in successful_requests]

        print(f"Concurrent dashboard requests:")
        print(f"  Total requests: {len(results)}")
        print(f"  Successful: {len(successful_requests)}")
        print(f"  Failed: {len(failed_requests)}")
        print(f"  Success rate: {success_rate:.2%}")

        if response_times:
            print(f"  Avg response time: {statistics.mean(response_times):.2f}ms")
            print(f"  Max response time: {max(response_times):.2f}ms")
            print(f"  Min response time: {min(response_times):.2f}ms")

        if response_sizes:
            print(f"  Avg response size: {statistics.mean(response_sizes):.0f} bytes")
            print(f"  Max response size: {max(response_sizes)} bytes")

        # Requirements
        assert success_rate >= 0.95, f"Success rate {success_rate:.2%} below 95% threshold"

        if response_times:
            avg_response_time = statistics.mean(response_times)
            assert avg_response_time < 1000, f"Average response time {avg_response_time:.2f}ms too high"

        if response_sizes:
            max_response_size = max(response_sizes)
            assert max_response_size < 8 * 1024, f"Max response size {max_response_size} bytes exceeds limit"

    def test_concurrent_project_creation(self, test_client):
        """Test concurrent project creation"""
        def create_project(project_id: int) -> tuple:
            start_time = time.time()
            project_data = generate_test_project(
                name=f"Concurrent Project {project_id}",
                description=f"Project created in concurrency test {project_id}",
                priority="medium"
            )
            try:
                response = test_client.post("/mcp/tools/call", json={
                    "method": "tools/call",
                    "params": {
                        "name": "add_project",
                        "arguments": project_data
                    }
                })
                end_time = time.time()
                response_time = (end_time - start_time) * 1000
                return (project_id, response.status_code, response_time)
            except Exception as e:
                return (project_id, 500, str(e))

        # Create projects concurrently
        num_concurrent = 15

        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            futures = [executor.submit(create_project, i) for i in range(num_concurrent)]
            results = [future.result() for future in as_completed(futures, timeout=30)]

        # Analyze results
        successful_creates = [r for r in results if r[1] == 200]
        failed_creates = [r for r in results if r[1] != 200]

        success_rate = len(successful_creates) / len(results)

        print(f"Concurrent project creation:")
        print(f"  Total requests: {len(results)}")
        print(f"  Successful: {len(successful_creates)}")
        print(f"  Failed: {len(failed_creates)}")
        print(f"  Success rate: {success_rate:.2%}")

        if successful_creates:
            response_times = [r[2] for r in successful_creates if isinstance(r[2], (int, float))]
            if response_times:
                print(f"  Avg response time: {statistics.mean(response_times):.2f}ms")

        # Should handle most concurrent requests successfully
        assert success_rate >= 0.90, f"Success rate {success_rate:.2%} below 90% threshold"


class TestMemoryPerformance:
    """Test memory usage performance"""

    def test_memory_usage_baseline(self, test_client):
        """Test baseline memory usage"""
        process = psutil.Process()

        # Get baseline memory
        baseline_memory = process.memory_info().rss / 1024 / 1024  # MB

        print(f"Baseline memory usage: {baseline_memory:.2f} MB")

        # Memory should be reasonable for a test environment
        assert baseline_memory < 500, f"Baseline memory {baseline_memory:.2f} MB too high"

    def test_memory_usage_with_data(self, test_client, performance_test_data):
        """Test memory usage after adding data"""
        process = psutil.Process()

        # Get initial memory
        initial_memory = process.memory_info().rss / 1024 / 1024

        # Add test data
        projects_added = 0
        for project_data in performance_test_data["projects"][:50]:
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": project_data
                }
            })
            if response.status_code == 200:
                projects_added += 1

        # Get memory after adding data
        after_data_memory = process.memory_info().rss / 1024 / 1024
        memory_increase = after_data_memory - initial_memory

        print(f"Memory usage after adding {projects_added} projects:")
        print(f"  Initial: {initial_memory:.2f} MB")
        print(f"  After data: {after_data_memory:.2f} MB")
        print(f"  Increase: {memory_increase:.2f} MB")
        print(f"  Per project: {memory_increase/projects_added:.3f} MB" if projects_added > 0 else "")

        # Memory increase should be reasonable
        assert memory_increase < 100, f"Memory increase {memory_increase:.2f} MB too high"

    def test_memory_usage_dashboard_requests(self, test_client):
        """Test memory usage during dashboard requests"""
        process = psutil.Process()

        # Get initial memory
        initial_memory = process.memory_info().rss / 1024 / 1024

        # Make multiple dashboard requests
        for i in range(20):
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "get_dashboard",
                    "arguments": {}
                }
            })
            assert response.status_code == 200

        # Get memory after requests
        after_requests_memory = process.memory_info().rss / 1024 / 1024
        memory_increase = after_requests_memory - initial_memory

        print(f"Memory usage after 20 dashboard requests:")
        print(f"  Initial: {initial_memory:.2f} MB")
        print(f"  After requests: {after_requests_memory:.2f} MB")
        print(f"  Increase: {memory_increase:.2f} MB")

        # Should not have significant memory leaks
        assert memory_increase < 50, f"Memory increase {memory_increase:.2f} MB suggests memory leak"


class TestScalabilityPerformance:
    """Test scalability characteristics"""

    @pytest.mark.asyncio
    async def test_dashboard_performance_scaling(self, test_client, performance_test_data):
        """Test how dashboard performance scales with data size"""
        response_times = []
        response_sizes = []
        data_sizes = [0, 10, 25, 50]

        for data_size in data_sizes:
            # Add projects up to target size
            current_projects = 0
            for project_data in performance_test_data["projects"][:data_size]:
                response = test_client.post("/mcp/tools/call", json={
                    "method": "tools/call",
                    "params": {
                        "name": "add_project",
                        "arguments": project_data
                    }
                })
                if response.status_code == 200:
                    current_projects += 1

            # Measure dashboard performance
            start_time = time.time()
            dashboard_response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "get_dashboard",
                    "arguments": {}
                }
            })
            end_time = time.time()

            response_time = (end_time - start_time) * 1000

            if dashboard_response.status_code == 200:
                response_size = len(dashboard_response.text.encode('utf-8'))
                response_times.append((current_projects, response_time))
                response_sizes.append((current_projects, response_size))

        print("Dashboard Performance Scaling:")
        print("Projects | Response Time (ms) | Response Size (bytes)")
        print("-" * 50)
        for i, (projects, time_ms) in enumerate(response_times):
            size = response_sizes[i][1] if i < len(response_sizes) else 0
            print(f"{projects:8} | {time_ms:15.2f} | {size:18}")

        # Verify that performance doesn't degrade significantly
        if len(response_times) >= 2:
            # Response time should not grow linearly with data
            first_time = response_times[0][1]
            last_time = response_times[-1][1]
            time_growth_factor = last_time / first_time if first_time > 0 else 1

            # Should not grow more than 3x even with 50x more data
            assert time_growth_factor < 3.0, f"Response time grew {time_growth_factor:.2f}x with data growth"

        # Response size should remain bounded
        if response_sizes:
            max_size = max(size for _, size in response_sizes)
            assert max_size < 4 * 1024, f"Response size {max_size} exceeds 4KB limit"

    @pytest.mark.asyncio
    async def test_search_performance_scaling(self, test_client, performance_test_data):
        """Test how search performance scales with data size"""
        search_times = []
        data_sizes = [5, 15, 30]

        for data_size in data_sizes:
            # Add projects
            projects_added = 0
            for project_data in performance_test_data["projects"][:data_size]:
                response = test_client.post("/mcp/tools/call", json={
                    "method": "tools/call",
                    "params": {
                        "name": "add_project",
                        "arguments": project_data
                    }
                })
                if response.status_code == 200:
                    projects_added += 1

            # Measure search performance
            start_time = time.time()
            search_response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "semantic_search",
                    "arguments": {
                        "query": "performance test project development",
                        "limit": 5
                    }
                }
            })
            end_time = time.time()

            search_time = (end_time - start_time) * 1000

            if search_response.status_code == 200:
                search_times.append((projects_added, search_time))

        print("Search Performance Scaling:")
        print("Projects | Search Time (ms)")
        print("-" * 30)
        for projects, time_ms in search_times:
            print(f"{projects:8} | {time_ms:13.2f}")

        # Search should remain fast even with more data
        if search_times:
            max_search_time = max(time_ms for _, time_ms in search_times)
            assert max_search_time < 500, f"Search time {max_search_time:.2f}ms too high with scaled data"


class TestBenchmarks:
    """Benchmark tests for performance profiling"""

    @pytest.mark.benchmark
    def test_dashboard_benchmark(self, benchmark, test_client):
        """Benchmark dashboard performance"""
        def get_dashboard():
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "get_dashboard",
                    "arguments": {}
                }
            })
            assert response.status_code == 200
            return response

        result = benchmark(get_dashboard)

        # Verify response
        content_text = result.json()["content"][0]["text"]
        dashboard_data = json.loads(content_text)
        assert "type" in dashboard_data

        print(f"Dashboard benchmark completed")

    @pytest.mark.benchmark
    def test_project_creation_benchmark(self, benchmark, test_client):
        """Benchmark project creation performance"""
        counter = {"count": 0}

        def create_project():
            project_data = generate_test_project(
                name=f"Benchmark Project {counter['count']}",
                description=f"Project created in benchmark test {counter['count']}"
            )
            counter["count"] += 1

            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": project_data
                }
            })
            assert response.status_code == 200
            return response

        result = benchmark(create_project)

        # Verify project was created
        content_text = result.json()["content"][0]["text"]
        result_data = json.loads(content_text)
        assert "project" in result_data

        print(f"Project creation benchmark completed")

    @pytest.mark.benchmark
    def test_search_benchmark(self, test_client, benchmark):
        """Benchmark search performance"""
        # Add some test data first
        for i in range(10):
            test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": generate_test_project(
                        name=f"Search Benchmark Project {i}",
                        description=f"Project for search benchmarking {i}"
                    )
                }
            })

        def search_projects():
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "semantic_search",
                    "arguments": {
                        "query": "benchmark project search test",
                        "limit": 5
                    }
                }
            })
            assert response.status_code == 200
            return response

        result = benchmark(search_projects)

        # Verify search results
        content_text = result.json()["content"][0]["text"]
        search_data = json.loads(content_text)
        assert "results" in search_data

        print(f"Search benchmark completed")


class TestPerformanceTargets:
    """Test specific performance targets from CLAUDE.md"""

    def test_token_efficiency_target(self, test_client, performance_test_data):
        """Test that responses meet token efficiency targets"""
        # Add significant amount of data
        projects_added = 0
        for project_data in performance_test_data["projects"][:30]:
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "add_project",
                    "arguments": project_data
                }
            })
            if response.status_code == 200:
                projects_added += 1

        print(f"Added {projects_added} projects for token efficiency test")

        # Test dashboard token efficiency
        dashboard_response = test_client.post("/mcp/tools/call", json={
            "method": "tools/call",
            "params": {
                "name": "get_dashboard",
                "arguments": {}
            }
        })

        assert dashboard_response.status_code == 200

        # Calculate token efficiency metrics
        response_size = len(dashboard_response.text.encode('utf-8'))

        # Estimate token count (roughly 4 characters per token)
        estimated_tokens = response_size // 4

        print(f"Dashboard response metrics:")
        print(f"  Response size: {response_size} bytes")
        print(f"  Estimated tokens: {estimated_tokens}")
        print(f"  Projects in database: {projects_added}")

        # Critical target: < 4KB response size
        assert response_size < 4 * 1024, f"Response size {response_size} exceeds 4KB token limit"

        # Should be dramatically more efficient than returning all data
        theoretical_full_size = projects_added * 500  # Estimate 500 bytes per full project
        efficiency_ratio = theoretical_full_size / response_size if response_size > 0 else 1

        print(f"  Token efficiency ratio: {efficiency_ratio:.1f}x improvement over full data")

        # Should be at least 5x more efficient than returning everything
        assert efficiency_ratio >= 5.0, f"Token efficiency ratio {efficiency_ratio:.1f}x below target"

    def test_90_percent_response_time_target(self, test_client):
        """Test that 90% of requests meet response time targets"""
        response_times = []

        # Make multiple requests to get statistical sample
        for i in range(50):
            start_time = time.time()
            response = test_client.post("/mcp/tools/call", json={
                "method": "tools/call",
                "params": {
                    "name": "get_dashboard",
                    "arguments": {}
                }
            })
            end_time = time.time()

            response_time = (end_time - start_time) * 1000
            if response.status_code == 200:
                response_times.append(response_time)

        # Calculate 90th percentile
        if response_times:
            response_times.sort()
            percentile_90_index = int(len(response_times) * 0.9)
            percentile_90_time = response_times[percentile_90_index]

            avg_time = statistics.mean(response_times)
            min_time = min(response_times)
            max_time = max(response_times)

            print(f"Response time analysis (50 requests):")
            print(f"  Average: {avg_time:.2f}ms")
            print(f"  Minimum: {min_time:.2f}ms")
            print(f"  Maximum: {max_time:.2f}ms")
            print(f"  90th percentile: {percentile_90_time:.2f}ms")

            # Target: 90th percentile under 500ms in test environment
            assert percentile_90_time < 500, f"90th percentile response time {percentile_90_time:.2f}ms exceeds target"

            # At least 90% of requests should meet the response time target
            fast_requests = len([t for t in response_times if t < 500])
            success_rate = fast_requests / len(response_times)
            assert success_rate >= 0.90, f"Only {success_rate:.2%} of requests met response time target"
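The tests above import their fixtures and helpers (test_client, performance_test_data, generate_test_project, generate_test_todo, generate_test_event) from tests/conftest.py, which is not shown on this page. Below is a minimal sketch of what those fixtures might look like, assuming a FastAPI-based MCP server; the payload fields beyond those referenced in the tests and the mcp_pa.server import path are assumptions, not the repository's actual implementation.

# Hypothetical tests/conftest.py sketch -- field names and the app import path
# are assumptions; only the call signatures used in test_performance.py are
# taken from the file itself.
import pytest
from fastapi.testclient import TestClient


def generate_test_project(name="Test Project", description="", priority="medium", status="active"):
    """Build an add_project payload shaped like the arguments used in the tests."""
    # generate_test_todo / generate_test_event would follow the same pattern.
    return {"name": name, "description": description, "priority": priority, "status": status}


@pytest.fixture
def performance_test_data():
    """Pool of generated projects consumed by the size and scaling tests."""
    return {
        "projects": [
            generate_test_project(
                name=f"Perf Project {i}",
                description=f"Generated project {i} for performance testing",
            )
            for i in range(100)
        ]
    }


@pytest.fixture
def test_client():
    """Synchronous HTTP client wrapping the MCP server app (import path assumed)."""
    from mcp_pa.server import app  # hypothetical module path
    with TestClient(app) as client:
        yield client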

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/swapnilsurdi/mcp-pa'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.