Skip to main content
Glama

WorkFlowy MCP Server

by daniel347x
conftest.py (6.07 kB)
"""Configuration for integration tests.""" from unittest.mock import AsyncMock, patch import pytest from workflowy_mcp.models import WorkFlowyNode @pytest.fixture def mock_workflowy_client(): """Create a properly configured mock WorkFlowy client for integration tests.""" from workflowy_mcp.client import WorkFlowyClient client = AsyncMock(spec=WorkFlowyClient) # Storage for created nodes to simulate stateful behavior # IMPORTANT: These are reset for each test to avoid cross-test contamination created_nodes = {} deleted_nodes = set() # Track deleted nodes node_counter = [0] # Using list to avoid closure issues parent_child_map = {} # Track parent-child relationships async def mock_create_node(request): """Mock create_node with unique IDs for each call.""" node_counter[0] += 1 node_id = f"node-{node_counter[0]:03d}" node = WorkFlowyNode( id=node_id, name=getattr(request, "name", None) or f"Node {node_counter[0]}", note=getattr(request, "note", ""), completedAt=None, createdAt=1704067200, modifiedAt=1704067200, ) created_nodes[node_id] = node # Track parent-child relationship if hasattr(request, "parent_id") and request.parent_id: if request.parent_id not in parent_child_map: parent_child_map[request.parent_id] = [] parent_child_map[request.parent_id].append(node_id) return node async def mock_get_node(node_id): """Mock get_node that returns the requested node.""" from workflowy_mcp.models import NodeNotFoundError if node_id in deleted_nodes: raise NodeNotFoundError(f"Node {node_id} not found") if node_id in created_nodes: return created_nodes[node_id] # Return a default node if not found return WorkFlowyNode( id=node_id, name="Test Node", note="Test note", completedAt=None, createdAt=1704067200, modifiedAt=1704067200, ) async def mock_update_node(node_id, request): """Mock update_node that updates the node.""" if node_id in created_nodes: node = created_nodes[node_id] if hasattr(request, "name") and request.name is not None: node.name = request.name if 
hasattr(request, "note") and request.note is not None: node.note = request.note return node # Return updated node even if not in storage return WorkFlowyNode( id=node_id, name=getattr(request, "name", "Updated Node"), note=getattr(request, "note", "Updated note"), completedAt=None, createdAt=1704067200, modifiedAt=1704067200, ) async def mock_complete_node(node_id): """Mock complete_node that marks node as completed.""" if node_id in created_nodes: node = created_nodes[node_id] node.completedAt = 1704067200 return node return WorkFlowyNode( id=node_id, name="Test Node", note="Test note", completedAt=1704067200, createdAt=1704067200, modifiedAt=1704067200, ) async def mock_uncomplete_node(node_id): """Mock uncomplete_node that marks node as uncompleted.""" if node_id in created_nodes: node = created_nodes[node_id] node.completedAt = None return node return WorkFlowyNode( id=node_id, name="Test Node", note="Test note", completedAt=None, createdAt=1704067200, modifiedAt=1704067200, ) async def mock_list_nodes(request): """Mock list_nodes that returns appropriate nodes.""" # Start with all nodes or empty list if not created_nodes: # Return empty for tests that haven't created anything return ([], 0) nodes = list(created_nodes.values()) # Filter by parent_id if provided (request has parentId field) if hasattr(request, "parentId") and request.parentId: # Only return children of the specified parent child_ids = parent_child_map.get(request.parentId, []) nodes = [n for n in nodes if n.id in child_ids] total = len(nodes) return (nodes, total) async def mock_delete_node(node_id): """Mock delete_node that marks node as deleted.""" deleted_nodes.add(node_id) if node_id in created_nodes: del created_nodes[node_id] # Clean up parent-child relationships for parent_id in list(parent_child_map.keys()): if node_id in parent_child_map[parent_id]: parent_child_map[parent_id].remove(node_id) if not parent_child_map[parent_id]: del parent_child_map[parent_id] return True # Set up mock 
methods client.create_node.side_effect = mock_create_node client.get_node.side_effect = mock_get_node client.update_node.side_effect = mock_update_node client.delete_node.side_effect = mock_delete_node client.list_nodes.side_effect = mock_list_nodes client.complete_node.side_effect = mock_complete_node client.uncomplete_node.side_effect = mock_uncomplete_node return client @pytest.fixture(autouse=True) def initialize_server(mock_workflowy_client): """Initialize the server with a mock client for all integration tests.""" import workflowy_mcp.server as server # Set the global client server._client = mock_workflowy_client server._rate_limiter = None # Ensure get_client returns our mock with patch.object(server, "get_client", return_value=mock_workflowy_client): yield mock_workflowy_client # Clean up server._client = None server._rate_limiter = None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.