workflowy_export_node

Export a WorkFlowy node and its children for backup, sharing, or integration with other applications.

Instructions

Export a WorkFlowy node with all its children

Input Schema

Name     Required  Description                                           Default
node_id  No        ID of the node to export; omit to export all nodes.  -
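
For illustration, a client can call this tool through the MCP Python SDK's ClientSession.call_tool. The session setup is assumed here and the node ID is a placeholder, so treat this as a sketch rather than this server's documented usage:

    from mcp import ClientSession  # official MCP Python SDK client session

    async def export_examples(session: ClientSession) -> None:
        # Full export: omit node_id entirely (rate-limited to 1 request/minute).
        full = await session.call_tool("workflowy_export_node", arguments={})
        # Subtree export: pass the subtree root's ID (placeholder value below).
        subtree = await session.call_tool(
            "workflowy_export_node",
            arguments={"node_id": "00000000-0000-0000-0000-000000000000"},
        )
        print(full, subtree)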

Implementation Reference

  • MCP tool registration decorator for the 'workflowy_export_node' tool.
    @mcp.tool(name="workflowy_export_node", description="Export a WorkFlowy node with all its children")
    async def export_node(
  • Primary handler function for the MCP tool. Applies rate limiting and delegates to WorkFlowyClient.export_nodes. (A sketch of a compatible rate-limiter interface follows this list.)
        node_id: str | None = None,
    ) -> dict:
        """Export all nodes or filter to specific node's subtree.

        Args:
            node_id: ID of the node to export (omit to export all nodes).
                If provided, exports only that node and all its descendants.

        Returns:
            Dictionary containing 'nodes' list with exported node data.

        Rate limit: 1 request per minute for full export.
        """
        client = get_client()
        if _rate_limiter:
            await _rate_limiter.acquire()
        try:
            data = await client.export_nodes(node_id)
            if _rate_limiter:
                _rate_limiter.on_success()
            return data
        except Exception as e:
            if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
                _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
            raise
  • WorkFlowyClient.export_nodes method - thin wrapper delegating to export_nodes_impl. (A usage sketch of its cache-control flags follows this list.)
        self,
        node_id: str | None = None,
        max_retries: int = 10,
        use_cache: bool = True,
        force_refresh: bool = False,
    ) -> dict[str, Any]:
        """Export all nodes or filter to specific node's subtree.

        Delegates to module-level export_nodes_impl to avoid circular imports.
        """
        return await export_nodes_impl(
            self,
            node_id=node_id,
            max_retries=max_retries,
            use_cache=use_cache,
            force_refresh=force_refresh,
        )
  • Core implementation: handles /nodes-export API calls with caching, dirty ID tracking, retries, dewhitening names/notes, and subtree filtering. (A note on the subtree walk follows this list.)
    async def export_nodes_impl(
        client: WorkFlowyClientCore,
        node_id: str | None = None,
        max_retries: int = 10,
        use_cache: bool = True,
        force_refresh: bool = False,
    ) -> dict[str, Any]:
        """Export all nodes or filter to specific node's subtree (implementation).

        This is extracted as a module-level function to avoid circular imports.
        """
        import asyncio

        logger = _ClientLogger()

        async def fetch_and_cache() -> dict[str, Any]:
            """Call /nodes-export with retries and update cache."""
            retry_count = 0
            base_delay = 1.0
            while retry_count < max_retries:
                await asyncio.sleep(API_RATE_LIMIT_DELAY)
                try:
                    response = await client.client.get("/nodes-export")
                    data = await client._handle_response(response)
                    all_nodes = data.get("nodes", []) or []
                    # Dewhiten names/notes
                    for node in all_nodes:
                        for key in ("name", "nm", "note", "no"):
                            if key in node:
                                node[key] = client._dewhiten_text(node.get(key))
                    total_before_filter = len(all_nodes)
                    data["_total_fetched_from_api"] = total_before_filter
                    # Update cache
                    client._nodes_export_cache = data
                    client._nodes_export_cache_timestamp = datetime.now()
                    client._nodes_export_dirty_ids.clear()
                    if retry_count > 0:
                        success_msg = f"export_nodes succeeded after {retry_count + 1}/{max_retries} attempts"
                        logger.info(success_msg)
                        _log_to_file_helper(success_msg, "reconcile")
                    return data
                except Exception as e:
                    retry_count += 1
                    logger.warning(f"Export error: {e}. Retry {retry_count}/{max_retries}")
                    if retry_count < max_retries:
                        await asyncio.sleep(base_delay * (2 ** retry_count))
                    else:
                        raise
            raise NetworkError("export_nodes failed after maximum retries")

        # Decide cache vs fetch
        if (not use_cache) or force_refresh or client._nodes_export_cache is None:
            data = await fetch_and_cache()
        else:
            data = client._nodes_export_cache

        # Check dirty IDs for subtree requests
        if node_id is not None and client._nodes_export_dirty_ids:
            all_nodes = data.get("nodes", []) or []
            nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}
            if node_id not in nodes_by_id:
                logger.info(f"Node {node_id} not in cache; refreshing")
                data = await fetch_and_cache()
            else:
                dirty = client._nodes_export_dirty_ids
                path_hits_dirty = False
                cur = node_id
                visited: set[str] = set()
                while cur and cur not in visited:
                    visited.add(cur)
                    if cur in dirty or "*" in dirty:
                        path_hits_dirty = True
                        break
                    parent_id = (
                        nodes_by_id[cur].get("parent_id")
                        or nodes_by_id[cur].get("parentId")
                    )
                    cur = parent_id
                if path_hits_dirty:
                    logger.info(f"Path from {node_id} hits dirty; refreshing")
                    data = await fetch_and_cache()
                else:
                    logger.info(f"Using cached export for {node_id}")

        # Filter if needed
        all_nodes = data.get("nodes", []) or []
        total_before_filter = len(all_nodes)
        if node_id is None:
            if "_total_fetched_from_api" not in data:
                data["_total_fetched_from_api"] = total_before_filter
            return data

        # Filter to subtree
        included_ids = {node_id}
        nodes_by_id = {node["id"]: node for node in all_nodes if node.get("id")}

        def add_descendants(parent_id: str) -> None:
            for node in all_nodes:
                if node.get("parent_id") == parent_id and node["id"] not in included_ids:
                    included_ids.add(node["id"])
                    add_descendants(node["id"])

        if node_id in nodes_by_id:
            add_descendants(node_id)

        filtered_nodes = [node for node in all_nodes if node["id"] in included_ids]
        return {
            "nodes": filtered_nodes,
            "_total_fetched_from_api": data.get("_total_fetched_from_api", total_before_filter),
            "_filtered_count": len(filtered_nodes),
        }
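
The _rate_limiter object referenced by the handler above is not shown on this page; from the call sites it is only known to expose acquire(), on_success(), and on_rate_limit(retry_after). A minimal sketch of a compatible object, with the interval and backoff policy entirely assumed, might look like:

    import asyncio
    import time

    class AdaptiveRateLimiter:
        """Hypothetical limiter matching the acquire/on_success/on_rate_limit
        calls in the handler; the project's real implementation may differ."""

        def __init__(self, min_interval: float = 60.0) -> None:
            self._interval = min_interval  # seconds between permitted requests
            self._next_allowed = 0.0
            self._lock = asyncio.Lock()

        async def acquire(self) -> None:
            # Sleep until the next request slot opens, then claim it.
            async with self._lock:
                now = time.monotonic()
                if now < self._next_allowed:
                    await asyncio.sleep(self._next_allowed - now)
                self._next_allowed = time.monotonic() + self._interval

        def on_success(self) -> None:
            # Hook for relaxing the interval after a success; a no-op here.
            pass

        def on_rate_limit(self, retry_after: float | None) -> None:
            # Push the next slot out by the server-suggested delay, if given.
            delay = retry_after if retry_after is not None else self._interval
            self._next_allowed = time.monotonic() + delay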
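
To illustrate how the wrapper's cache-control flags combine (get_client is taken from the handler above; the node ID is a placeholder):

    async def demo() -> None:
        client = get_client()
        cached = await client.export_nodes()                      # reuse cache if valid
        fresh = await client.export_nodes(force_refresh=True)     # force a /nodes-export call
        subtree = await client.export_nodes(node_id="a-node-id")  # placeholder ID
        print(len(cached["nodes"]), len(fresh["nodes"]), len(subtree["nodes"]))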
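
One note on the subtree walk: add_descendants rescans the full node list at each recursion level, which is O(n*d) for n nodes and tree depth d. Building a parent-to-children index first makes the same walk linear in n. The sketch below uses my own names and is not the project's code:

    from collections import defaultdict, deque
    from typing import Any

    def collect_subtree_ids(all_nodes: list[dict[str, Any]], root_id: str) -> set[str]:
        """Return root_id plus every descendant ID via one indexing pass and an
        iterative breadth-first walk (same output as the recursive version)."""
        children: dict[str, list[str]] = defaultdict(list)
        for node in all_nodes:
            if node.get("id") and node.get("parent_id"):
                children[node["parent_id"]].append(node["id"])

        included = {root_id}
        queue = deque([root_id])
        while queue:
            for child_id in children[queue.popleft()]:
                if child_id not in included:
                    included.add(child_id)
                    queue.append(child_id)
        return included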

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.