workflowy_update_node
Modify existing WorkFlowy nodes by updating their name, note content, layout style, or completion status using the node's unique identifier.
Instructions
Update an existing WorkFlowy node
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
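| node_id | Yes | The ID of the node to update | |
| name | No | New text content for the node | |
| note | No | New note/description | |
| layout_mode | No | New layout mode for the node (bullets, todo, h1, h2, h3) | |
| _completed | No | New completion status (not used - use complete_node/uncomplete_node) | |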
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| ch | No | Child nodes | |
| cp | No | Completion status (for tests) | |
| id | Yes | Unique identifier for the node | |
| data | No | Node data including layoutMode | |
| name | No | Text content of the node | |
| note | No | Note content attached to the node | |
| parentId | No | Parent node ID | |
| priority | No | Sort order | |
| createdAt | No | Creation timestamp (Unix timestamp) | |
| modifiedAt | No | Last modification timestamp | |
| completedAt | No | Completion timestamp (null if not completed) | |
Implementation Reference
- src/workflowy_mcp/server.py:1024-1064 (handler) MCP tool handler and registration for workflowy_update_node. Wraps client.update_node with rate limiting.
@mcp.tool(name="workflowy_update_node", description="Update an existing WorkFlowy node")
async def update_node(
    node_id: str,
    name: str | None = None,
    note: str | None = None,
    layout_mode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = None,
    _completed: bool | None = None,
) -> WorkFlowyNode:
    """Update an existing WorkFlowy node.

    Args:
        node_id: The ID of the node to update
        name: New text content for the node (optional)
        note: New note/description (optional)
        layout_mode: New layout mode for the node (bullets, todo, h1, h2, h3) (optional)
        _completed: New completion status (not used - use complete_node/uncomplete_node)

    Returns:
        The updated WorkFlowy node
    """
    client = get_client()

    # `_completed` is deliberately not forwarded: the request model only
    # carries name, note and layoutMode.
    update_request = NodeUpdateRequest(  # type: ignore[call-arg]
        name=name,
        note=note,
        layoutMode=layout_mode,
    )

    # The rate limiter is optional; every interaction with it is guarded.
    if _rate_limiter:
        await _rate_limiter.acquire()

    try:
        updated = await client.update_node(node_id, update_request)
        if _rate_limiter:
            _rate_limiter.on_success()
        return updated
    except Exception as exc:
        # Rate-limit errors are recognised by class name rather than by
        # isinstance; the limiter is told about them before re-raising.
        if _rate_limiter and exc.__class__.__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(exc, "retry_after", None))
        raise
# - Pydantic schema NodeUpdateRequest defining input parameters for node updates.
class NodeUpdateRequest(BaseModel):
    """Request payload for updating an existing node."""

    # Every field is optional; None means "leave this attribute unchanged".
    name: str | None = Field(None, description="New text content")
    note: str | None = Field(None, description="New note content")
    layoutMode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = Field(
        None, description="New display mode (bullets, todo, h1, h2, h3)"
    )

    def has_updates(self) -> bool:
        """Check if at least one field is provided for update."""
        # Read model_fields from the class: accessing it through the instance
        # is deprecated in recent Pydantic v2 releases.
        return any(
            getattr(self, field_name) is not None
            for field_name in type(self).model_fields
        )
# - Core API client implementation of update_node: HTTP POST, validation, escaping, retries, error handling.
async def update_node(self, node_id: str, request: NodeUpdateRequest, max_retries: int = 10) -> WorkFlowyNode:
    """Update an existing node with exponential backoff retry.

    The name and note fields are validated (and possibly escaped) before the
    request is sent. Validation works on a copy, so the caller's ``request``
    object is left untouched and can safely be reused.

    Args:
        node_id: The ID of the node to update
        request: Node update request
        max_retries: Maximum retry attempts (default 10)

    Returns:
        The node as re-fetched after the update, or a node built directly
        from the POST response when it is not a ``{"status": "ok"}`` reply.

    Raises:
        NetworkError: If note validation reports a blocking error, if the
            last attempt fails with a network error, or if the retry loop
            ends without a result.
        RateLimitError: If the last attempt is rate limited.
        TimeoutError: If the last attempt times out.
    """
    import asyncio

    logger = _ClientLogger()

    # Work on a copy so validation/escaping never rewrites the caller's object.
    request = request.model_copy()

    # Validate and escape name field if being updated
    if request.name is not None:
        processed_name, name_warning = self._validate_name_field(request.name)
        if processed_name is not None:
            request.name = processed_name
        if name_warning:
            logger.info(name_warning)

    # Validate and escape note field if being updated
    if request.note is not None:
        # Note: update_node doesn't have _internal_call flag yet, always validates
        processed_note, message = self._validate_note_field(request.note)

        if processed_note is None and message:
            # Blocking error
            raise NetworkError(message)

        # Strip override token if present
        if processed_note:
            processed_note = processed_note.removeprefix("<<<LITERAL_BACKSLASH_N_INTENTIONAL>>>")

        # Use processed (escaped) note
        request.note = processed_note

        # Log warning if escaping occurred
        if message and "AUTO-ESCAPED" in message:
            logger.info(message)

    # The payload does not change between attempts, so build it once.
    payload = request.model_dump(exclude_none=True)

    retry_count = 0
    base_delay = 1.0

    while retry_count < max_retries:
        # Force delay at START of each iteration (rate limit protection)
        await asyncio.sleep(API_RATE_LIMIT_DELAY)

        try:
            response = await self.client.post(f"/nodes/{node_id}", json=payload)
            data = await self._handle_response(response)

            # API returns {"status": "ok"} - fetch updated node
            if isinstance(data, dict) and data.get('status') == 'ok':
                get_response = await self.client.get(f"/nodes/{node_id}")
                node_data = await self._handle_response(get_response)
                node = WorkFlowyNode(**node_data["node"])
            else:
                # Fallback for unexpected format
                node = WorkFlowyNode(**data)

            # Best-effort: mark this node as dirty so that subsequent
            # /nodes-export-based operations touching this subtree can
            # trigger a refresh when needed.
            try:
                self._mark_nodes_export_dirty([node_id])
            except Exception:
                # Cache dirty marking must never affect API behavior
                pass

            return node

        except RateLimitError as e:
            retry_count += 1
            # Prefer the wait reported on the error; otherwise back off exponentially.
            retry_after = getattr(e, 'retry_after', None) or (base_delay * (2 ** retry_count))
            logger.warning(
                f"Rate limited on update_node. Retry after {retry_after}s. "
                f"Attempt {retry_count}/{max_retries}"
            )
            if retry_count < max_retries:
                await asyncio.sleep(retry_after)
            else:
                raise

        except NetworkError as e:
            retry_count += 1
            logger.warning(
                f"Network error on update_node: {e}. Retry {retry_count}/{max_retries}"
            )
            if retry_count < max_retries:
                await asyncio.sleep(base_delay * (2 ** retry_count))
            else:
                raise

        except httpx.TimeoutException as err:
            retry_count += 1
            logger.warning(
                f"Timeout error: {err}. Retry {retry_count}/{max_retries}"
            )
            if retry_count < max_retries:
                await asyncio.sleep(base_delay * (2 ** retry_count))
            else:
                raise TimeoutError("update_node") from err

    # Reached when the loop body never ran (max_retries <= 0).
    raise NetworkError("update_node failed after maximum retries")