Skip to main content
Glama

WorkFlowy MCP Server

by daniel347x
test_response_times.py8.49 kB
"""Performance tests for API operations.""" import asyncio import statistics import time from unittest.mock import AsyncMock, patch import pytest from workflowy_mcp.models import WorkFlowyNode from workflowy_mcp.server import ( create_node, get_node, list_nodes, ) class TestResponseTimes: """Test that API operations meet performance requirements (<500ms).""" @pytest.mark.asyncio async def test_create_node_performance(self): """Test create_node response time.""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client # Mock fast API response mock_client.create_node.return_value = WorkFlowyNode( id="test-123", name="Test Node", createdAt=int(time.time()), modifiedAt=int(time.time()), ) # Measure response time start = time.perf_counter() result = await create_node.fn(name="Test Node") elapsed = (time.perf_counter() - start) * 1000 # Convert to ms assert result.id == "test-123" assert result.name == "Test Node" assert elapsed < 500, f"Response time {elapsed:.2f}ms exceeds 500ms limit" @pytest.mark.asyncio async def test_get_node_performance(self): """Test get_node response time.""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client mock_client.get_node.return_value = WorkFlowyNode( id="test-123", name="Test Node", createdAt=int(time.time()), modifiedAt=int(time.time()), ) start = time.perf_counter() result = await get_node.fn(node_id="test-123") elapsed = (time.perf_counter() - start) * 1000 assert result.id == "test-123" assert elapsed < 500, f"Response time {elapsed:.2f}ms exceeds 500ms limit" @pytest.mark.asyncio async def test_list_nodes_performance(self): """Test list_nodes response time with pagination.""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client # Mock list of nodes mock_nodes = [ WorkFlowyNode( id=f"node-{i}", nm=f"Node {i}", createdAt=int(time.time()), modifiedAt=int(time.time()), ) for i in range(100) ] mock_client.list_nodes.return_value = (mock_nodes[:50], 100) start = time.perf_counter() result = await list_nodes.fn() elapsed = (time.perf_counter() - start) * 1000 assert "nodes" in result assert result["total"] == 100 assert len(result["nodes"]) == 50 assert elapsed < 500, f"Response time {elapsed:.2f}ms exceeds 500ms limit" @pytest.mark.asyncio async def test_concurrent_operations_performance(self): """Test performance under concurrent load.""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client # Mock all operations mock_node = WorkFlowyNode( id="test", name="Node", createdAt=int(time.time()), modifiedAt=int(time.time()) ) mock_client.get_node.return_value = mock_node mock_client.list_nodes.return_value = ([mock_node], 1) # Run concurrent operations async def operation(op_type: str): start = time.perf_counter() if op_type == "get": await get_node.fn(node_id="test-123") elif op_type == "list": await list_nodes.fn() return (time.perf_counter() - start) * 1000 # Run concurrent operations tasks = [] for _ in range(5): tasks.extend( [ operation("get"), operation("list"), ] ) response_times = await asyncio.gather(*tasks) # All should complete within 500ms assert all( t < 500 for t in response_times ), f"Some operations exceeded 500ms: {[t for t in response_times if t >= 500]}" # Average should be well under 500ms avg_time = statistics.mean(response_times) assert avg_time < 300, f"Average response time {avg_time:.2f}ms is too high" @pytest.mark.asyncio async def test_bulk_operation_performance(self): """Test performance of bulk operations.""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client mock_client.create_node.return_value = WorkFlowyNode( id="test", name="Node", createdAt=int(time.time()), modifiedAt=int(time.time()) ) # Create 50 nodes sequentially start = time.perf_counter() for i in range(50): await create_node.fn(name=f"Node {i}") total_elapsed = (time.perf_counter() - start) * 1000 # Average per operation avg_per_op = total_elapsed / 50 # Each operation should average under 100ms for bulk assert avg_per_op < 100, f"Average per operation {avg_per_op:.2f}ms is too high" @pytest.mark.asyncio async def test_response_time_percentiles(self): """Test response time percentiles (p50, p95, p99).""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client mock_client.get_node.return_value = WorkFlowyNode( id="test", name="Node", createdAt=int(time.time()), modifiedAt=int(time.time()) ) # Collect response times response_times = [] for _ in range(100): start = time.perf_counter() await get_node.fn(node_id="test-123") elapsed = (time.perf_counter() - start) * 1000 response_times.append(elapsed) # Calculate percentiles sorted_times = sorted(response_times) p50 = sorted_times[50] # Median p95 = sorted_times[95] p99 = sorted_times[99] # Performance requirements assert p50 < 200, f"p50 {p50:.2f}ms exceeds 200ms" assert p95 < 400, f"p95 {p95:.2f}ms exceeds 400ms" assert p99 < 500, f"p99 {p99:.2f}ms exceeds 500ms" @pytest.mark.asyncio async def test_cache_performance(self): """Test that caching improves performance.""" with patch("workflowy_mcp.server.get_client") as mock_get_client: mock_client = AsyncMock() mock_get_client.return_value = mock_client # Simulate cache behavior - first call slower call_count = 0 async def mock_get_node(*_args, **_kwargs): nonlocal call_count call_count += 1 if call_count == 1: await asyncio.sleep(0.1) # Simulate network delay return WorkFlowyNode( id="test", name="Node", createdAt=int(time.time()), modifiedAt=int(time.time()) ) mock_client.get_node = mock_get_node # First call (cache miss) start1 = time.perf_counter() await get_node.fn(node_id="test-123") time1 = (time.perf_counter() - start1) * 1000 # Second call (should be cached) start2 = time.perf_counter() await get_node.fn(node_id="test-123") time2 = (time.perf_counter() - start2) * 1000 # Second call should be significantly faster assert ( time2 < time1 / 2 ), f"Cache not improving performance: {time1:.2f}ms vs {time2:.2f}ms"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server