workflowy_update_node
Modify existing WorkFlowy nodes by updating their name, note content, layout style, or completion status using the node's unique identifier.
Instructions
Update an existing WorkFlowy node
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| node_id | Yes | ||
| name | No | ||
| note | No | ||
| layout_mode | No | ||
| _completed | No |
Implementation Reference
- src/workflowy_mcp/server.py:1024-1064 (handler)MCP tool handler and registration for workflowy_update_node. Wraps client.update_node with rate limiting.@mcp.tool(name="workflowy_update_node", description="Update an existing WorkFlowy node") async def update_node( node_id: str, name: str | None = None, note: str | None = None, layout_mode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = None, _completed: bool | None = None, ) -> WorkFlowyNode: """Update an existing WorkFlowy node. Args: node_id: The ID of the node to update name: New text content for the node (optional) note: New note/description (optional) layout_mode: New layout mode for the node (bullets, todo, h1, h2, h3) (optional) _completed: New completion status (not used - use complete_node/uncomplete_node) Returns: The updated WorkFlowy node """ client = get_client() request = NodeUpdateRequest( # type: ignore[call-arg] name=name, note=note, layoutMode=layout_mode, ) if _rate_limiter: await _rate_limiter.acquire() try: node = await client.update_node(node_id, request) if _rate_limiter: _rate_limiter.on_success() return node except Exception as e: if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise
- Pydantic schema NodeUpdateRequest defining input parameters for node updates.class NodeUpdateRequest(BaseModel): """Request payload for updating an existing node.""" name: str | None = Field(None, description="New text content") note: str | None = Field(None, description="New note content") layoutMode: Literal["bullets", "todo", "h1", "h2", "h3"] | None = Field( None, description="New display mode (bullets, todo, h1, h2, h3)" ) def has_updates(self) -> bool: """Check if at least one field is provided for update.""" return any(getattr(self, field) is not None for field in self.model_fields)
- Core API client implementation of update_node: HTTP POST, validation, escaping, retries, error handling.async def update_node(self, node_id: str, request: NodeUpdateRequest, max_retries: int = 10) -> WorkFlowyNode: """Update an existing node with exponential backoff retry. Args: node_id: The ID of the node to update request: Node update request max_retries: Maximum retry attempts (default 10) """ import asyncio logger = _ClientLogger() # Validate and escape name field if being updated if request.name is not None: processed_name, name_warning = self._validate_name_field(request.name) if processed_name is not None: request.name = processed_name if name_warning: logger.info(name_warning) # Validate and escape note field if being updated if request.note is not None: # Note: update_node doesn't have _internal_call flag yet, always validates processed_note, message = self._validate_note_field(request.note) if processed_note is None and message: # Blocking error raise NetworkError(message) # Strip override token if present if processed_note and processed_note.startswith("<<<LITERAL_BACKSLASH_N_INTENTIONAL>>>"): processed_note = processed_note.replace("<<<LITERAL_BACKSLASH_N_INTENTIONAL>>>", "", 1) # Use processed (escaped) note request.note = processed_note # Log warning if escaping occurred if message and "AUTO-ESCAPED" in message: logger.info(message) retry_count = 0 base_delay = 1.0 while retry_count < max_retries: # Force delay at START of each iteration (rate limit protection) await asyncio.sleep(API_RATE_LIMIT_DELAY) try: response = await self.client.post( f"/nodes/{node_id}", json=request.model_dump(exclude_none=True) ) data = await self._handle_response(response) # API returns {"status": "ok"} - fetch updated node if isinstance(data, dict) and data.get('status') == 'ok': get_response = await self.client.get(f"/nodes/{node_id}") node_data = await self._handle_response(get_response) node = WorkFlowyNode(**node_data["node"]) else: # Fallback for unexpected format node = WorkFlowyNode(**data) # Best-effort: mark this node as dirty so that subsequent # /nodes-export-based operations touching this subtree can # trigger a refresh when needed. try: self._mark_nodes_export_dirty([node_id]) except Exception: # Cache dirty marking must never affect API behavior pass return node except RateLimitError as e: retry_count += 1 retry_after = getattr(e, 'retry_after', None) or (base_delay * (2 ** retry_count)) logger.warning( f"Rate limited on update_node. Retry after {retry_after}s. " f"Attempt {retry_count}/{max_retries}" ) if retry_count < max_retries: await asyncio.sleep(retry_after) else: raise except NetworkError as e: retry_count += 1 logger.warning( f"Network error on update_node: {e}. Retry {retry_count}/{max_retries}" ) if retry_count < max_retries: await asyncio.sleep(base_delay * (2 ** retry_count)) else: raise except httpx.TimeoutException as err: retry_count += 1 logger.warning( f"Timeout error: {err}. Retry {retry_count}/{max_retries}" ) if retry_count < max_retries: await asyncio.sleep(base_delay * (2 ** retry_count)) else: raise TimeoutError("update_node") from err raise NetworkError("update_node failed after maximum retries")