"""
Performance tests for the Percepta MCP server.
Tests system performance under various load conditions.
"""
import asyncio
import pytest
import time
from unittest.mock import AsyncMock, Mock, patch
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Any
from src.percepta_mcp.server import PerceptaMCPServer
from src.percepta_mcp.config import Settings, AIProviderConfig
@pytest.fixture
def performance_settings():
"""Create settings optimized for performance testing."""
return Settings(
ai_providers=[
AIProviderConfig(
name="fast-provider",
type="openai",
api_key="test-key",
model="gpt-3.5-turbo",
priority=1,
enabled=True
)
],
default_provider="fast-provider"
)
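# NOTE: the fixture below is an async generator fixture and assumes pytest-asyncio
# is available. Under pytest-asyncio's strict mode it would need the
# @pytest_asyncio.fixture decorator (or asyncio_mode = "auto" in the pytest
# configuration) to be set up and torn down correctly.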
@pytest.fixture
async def performance_server(performance_settings):
"""Create server instance for performance testing."""
with patch('src.percepta_mcp.server.get_ai_router') as mock_router:
mock_ai_router = AsyncMock()
mock_router.return_value = mock_ai_router
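        # The server under test receives this AsyncMock in place of a real AI
        # router, so no provider calls leave the test process.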
server = PerceptaMCPServer(performance_settings)
yield server, mock_ai_router
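        # Best-effort teardown: shutdown errors are swallowed so they do not
        # mask the outcome of the test itself.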
try:
await server.stop()
except Exception:
pass
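# Illustrative helper (a sketch, not wired into the tests below): the tests repeat
# the same elapsed-time / throughput bookkeeping around asyncio.gather, which could
# be factored out roughly like this.
async def gather_and_measure(tasks):
    """Run tasks concurrently; return (successful_results, elapsed_seconds, ops_per_second)."""
    start = time.perf_counter()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    elapsed = time.perf_counter() - start
    successes = [r for r in results if not isinstance(r, Exception)]
    ops_per_second = len(successes) / elapsed if elapsed > 0 else float("inf")
    return successes, elapsed, ops_per_second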
class TestPerformance:
"""Performance test cases."""
@pytest.mark.asyncio
async def test_concurrent_browser_operations(self, performance_server):
"""Test performance of concurrent browser operations."""
server, mock_ai_router = performance_server
# Mock browser operations with realistic delays
async def mock_navigate(*args, **kwargs):
await asyncio.sleep(0.1) # Simulate browser startup time
return {"success": True, "url": kwargs.get("url", "https://example.com")}
async def mock_extract_text(*args, **kwargs):
await asyncio.sleep(0.05) # Simulate text extraction time
return "Sample extracted text content"
async def mock_screenshot(*args, **kwargs):
await asyncio.sleep(0.2) # Simulate screenshot capture time
return {"success": True, "screenshot": "base64_image_data"}
with patch.object(server.browser_automation, 'navigate', side_effect=mock_navigate), \
patch.object(server.browser_automation, 'extract_text', side_effect=mock_extract_text), \
patch.object(server.browser_automation, 'screenshot', side_effect=mock_screenshot):
# Create concurrent operations
tasks = []
start_time = time.time()
# Simulate 10 concurrent browser sessions
for i in range(10):
task_group = [
server._execute_tool("browser_navigate", {"url": f"https://example{i}.com"}),
server._execute_tool("browser_extract_text", {"selector": ".content"}),
server._execute_tool("browser_screenshot", {"full_page": True})
]
tasks.extend(task_group)
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
# Performance assertions
execution_time = end_time - start_time
assert execution_time < 5.0, f"Concurrent operations took too long: {execution_time}s"
# Verify all operations completed successfully
successful_results = [r for r in results if not isinstance(r, Exception)]
assert len(successful_results) == 30 # 10 * 3 operations
# Calculate operations per second
ops_per_second = len(successful_results) / execution_time
assert ops_per_second > 10, f"Operations per second too low: {ops_per_second}"
@pytest.mark.asyncio
async def test_ai_router_performance(self, performance_server):
"""Test AI router performance under load."""
server, mock_ai_router = performance_server
from src.percepta_mcp.ai_router import AIResponse
# Mock fast AI responses
async def mock_generate(*args, **kwargs):
await asyncio.sleep(0.1) # Simulate AI processing time
return AIResponse(
content="AI response content",
provider="fast-provider",
model="gpt-3.5-turbo",
tokens_used=50,
cost=0.001,
response_time=0.1,
error=None
)
mock_ai_router.generate.side_effect = mock_generate
# Test concurrent AI requests
start_time = time.time()
tasks = [
server._execute_tool("ai_analyze", {
"content": f"Content {i}",
"task": f"Analyze content {i}"
})
for i in range(20)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
execution_time = end_time - start_time
successful_results = [r for r in results if not isinstance(r, Exception)]
# Performance assertions
assert execution_time < 3.0, f"AI operations took too long: {execution_time}s"
assert len(successful_results) == 20
# Verify response quality
for result in successful_results:
assert result["analysis"] == "AI response content"
assert result["response_time"] == 0.1
@pytest.mark.asyncio
async def test_devtools_monitoring_performance(self, performance_server):
"""Test DevTools monitoring performance."""
server, mock_ai_router = performance_server
# Mock DevTools operations
async def mock_start_monitoring(*args, **kwargs):
await asyncio.sleep(0.05)
return {"success": True, "monitoring_active": True}
async def mock_collect_data(*args, **kwargs):
await asyncio.sleep(0.03)
return {
"success": True,
"data": {
"console_logs": [{"type": "log", "message": "test"}] * 100,
"network_logs": [{"url": "test", "status": 200}] * 50,
"performance_metrics": {"domContentLoaded": 1000}
}
}
async def mock_analyze_anomalies(*args, **kwargs):
await asyncio.sleep(0.02)
return {
"success": True,
"anomalies": [],
"summary": {"total_anomalies": 0}
}
with patch.object(server.devtools_analyzer, 'start_monitoring', side_effect=mock_start_monitoring), \
patch.object(server.devtools_analyzer, 'collect_devtools_data', side_effect=mock_collect_data), \
patch.object(server.devtools_analyzer, 'analyze_anomalies', side_effect=mock_analyze_anomalies):
# Test multiple monitoring sessions
start_time = time.time()
tasks = []
for i in range(5):
# Each monitoring session includes start, collect, and analyze
session_tasks = [
server._execute_tool("start_devtools_monitoring", {"url": f"https://site{i}.com"}),
server._execute_tool("collect_devtools_data", {}),
server._execute_tool("analyze_devtools_anomalies", {})
]
tasks.extend(session_tasks)
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
execution_time = end_time - start_time
successful_results = [r for r in results if not isinstance(r, Exception)]
# Performance assertions
assert execution_time < 2.0, f"DevTools monitoring took too long: {execution_time}s"
assert len(successful_results) == 15 # 5 * 3 operations
@pytest.mark.asyncio
async def test_test_generation_performance(self, performance_server):
"""Test test generation performance."""
server, mock_ai_router = performance_server
from src.percepta_mcp.ai_router import AIResponse
# Mock fast test generation
test_script = """
import pytest
def test_example(): assert True
"""
async def mock_generate(*args, **kwargs):
await asyncio.sleep(0.15) # Simulate AI test generation time
return AIResponse(
content=test_script,
provider="fast-provider",
model="gpt-3.5-turbo",
tokens_used=100,
cost=0.002,
response_time=0.15,
error=None
)
mock_ai_router.generate.side_effect = mock_generate
# Mock browser operations for test generation
with patch.object(server.browser_automation, 'navigate') as mock_navigate, \
patch.object(server.browser_automation, 'get_page_info') as mock_page_info, \
patch.object(server.browser_automation, 'extract_text') as mock_extract:
mock_navigate.return_value = {"success": True}
mock_page_info.return_value = {"title": "Test Page", "url": "https://example.com"}
mock_extract.return_value = "Page content"
# Test concurrent test generation
start_time = time.time()
tasks = [
server._execute_tool("generate_test_case", {
"url": f"https://example{i}.com",
"description": f"Test case {i}",
"test_type": "navigation"
})
for i in range(10)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
execution_time = end_time - start_time
successful_results = [r for r in results if not isinstance(r, Exception)]
# Performance assertions
assert execution_time < 3.0, f"Test generation took too long: {execution_time}s"
assert len(successful_results) == 10
# Verify test quality
for result in successful_results:
assert result["success"] is True
assert "test_case" in result
assert "import pytest" in result["test_case"]["script"]
@pytest.mark.asyncio
async def test_memory_usage_stability(self, performance_server):
"""Test memory usage stability during extended operations."""
server, mock_ai_router = performance_server
        import gc
        import os
        # psutil is a third-party dependency; skip this test gracefully if it is not installed.
        psutil = pytest.importorskip("psutil")
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# Mock lightweight operations
with patch.object(server.browser_automation, 'extract_text') as mock_extract:
mock_extract.return_value = "Sample text content"
# Perform many operations to test memory stability
for batch in range(10):
tasks = [
server._execute_tool("browser_extract_text", {"selector": f".content{i}"})
for i in range(20)
]
await asyncio.gather(*tasks, return_exceptions=True)
# Force garbage collection
gc.collect()
# Check memory usage
current_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = current_memory - initial_memory
# Memory should not increase excessively
assert memory_increase < 100, f"Memory usage increased too much: {memory_increase}MB"
# Final memory check
final_memory = process.memory_info().rss / 1024 / 1024 # MB
total_increase = final_memory - initial_memory
assert total_increase < 50, f"Total memory increase too high: {total_increase}MB"
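        # Note: psutil reports whole-process RSS. If only Python-level allocations
        # matter, the stdlib tracemalloc module is an alternative, roughly:
        #     tracemalloc.start()
        #     ... run the workload ...
        #     current, peak = tracemalloc.get_traced_memory()
        #     tracemalloc.stop()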
@pytest.mark.asyncio
async def test_error_handling_performance(self, performance_server):
"""Test performance when handling errors."""
server, mock_ai_router = performance_server
# Mock operations that fail fast
async def mock_failing_operation(*args, **kwargs):
await asyncio.sleep(0.01) # Fast failure
raise Exception("Simulated failure")
with patch.object(server.browser_automation, 'navigate', side_effect=mock_failing_operation):
start_time = time.time()
# Test many failing operations
tasks = [
server._execute_tool("browser_navigate", {"url": f"https://fail{i}.com"})
for i in range(50)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
execution_time = end_time - start_time
# Error handling should be fast
assert execution_time < 1.0, f"Error handling took too long: {execution_time}s"
# All operations should fail as expected
exceptions = [r for r in results if isinstance(r, Exception)]
assert len(exceptions) == 50
@pytest.mark.asyncio
async def test_scalability_limits(self, performance_server):
"""Test system behavior at scalability limits."""
server, mock_ai_router = performance_server
# Mock very fast operations
async def mock_fast_operation(*args, **kwargs):
await asyncio.sleep(0.001)
return {"success": True, "data": "result"}
with patch.object(server.browser_automation, 'extract_text', side_effect=mock_fast_operation):
# Test with increasing load
load_levels = [10, 50, 100, 200]
performance_results = []
for load in load_levels:
start_time = time.time()
tasks = [
server._execute_tool("browser_extract_text", {"selector": f".content{i}"})
for i in range(load)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
execution_time = end_time - start_time
successful_ops = len([r for r in results if not isinstance(r, Exception)])
ops_per_second = successful_ops / execution_time
performance_results.append({
"load": load,
"execution_time": execution_time,
"ops_per_second": ops_per_second,
"success_rate": successful_ops / load
})
# Verify performance doesn't degrade dramatically
for i in range(1, len(performance_results)):
prev_ops = performance_results[i-1]["ops_per_second"]
curr_ops = performance_results[i]["ops_per_second"]
# Performance shouldn't drop by more than 50% as load doubles
degradation = (prev_ops - curr_ops) / prev_ops
assert degradation < 0.5, f"Performance degraded too much: {degradation*100}%"
# Success rate should remain high
for result in performance_results:
assert result["success_rate"] > 0.95, f"Success rate too low: {result['success_rate']}"
@pytest.mark.asyncio
async def test_resource_cleanup_performance(self, performance_server):
"""Test performance of resource cleanup operations."""
server, mock_ai_router = performance_server
# Mock resource-intensive operations
resources = []
async def mock_create_resource(*args, **kwargs):
resource = {"id": len(resources), "data": "resource_data"}
resources.append(resource)
await asyncio.sleep(0.05)
return {"success": True, "resource_id": resource["id"]}
async def mock_cleanup_resource(*args, **kwargs):
if resources:
                resources.pop()
await asyncio.sleep(0.02)
return {"success": True}
with patch.object(server.browser_automation, 'navigate', side_effect=mock_create_resource), \
patch.object(server.browser_automation, 'close', side_effect=mock_cleanup_resource):
# Create many resources
create_start = time.time()
create_tasks = [
server._execute_tool("browser_navigate", {"url": f"https://resource{i}.com"})
for i in range(20)
]
await asyncio.gather(*create_tasks)
create_time = time.time() - create_start
# Test cleanup performance
cleanup_start = time.time()
# Manually call close multiple times to clean up resources
for _ in range(len(resources)):
await server.browser_automation.close()
cleanup_time = time.time() - cleanup_start
# Cleanup should be reasonably fast
assert cleanup_time < 2.0, f"Cleanup took too long: {cleanup_time}s"
# All resources should be cleaned up
assert len(resources) == 0, f"Resources not properly cleaned up: {len(resources)}"