Skip to main content
Glama

workflowy_export_node

Export a WorkFlowy node and its children for backup, sharing, or integration with other applications.

Instructions

Export a WorkFlowy node with all its children

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
node_idNo

Implementation Reference

  • MCP tool registration decorator for 'workflowy_export_node' tool.
    @mcp.tool(name="workflowy_export_node", description="Export a WorkFlowy node with all its children")
    async def export_node(
  • Primary handler function for the MCP tool. Applies rate limiting and delegates to WorkFlowyClient.export_nodes.
        node_id: str | None = None,
    ) -> dict:
        """Export all nodes or filter to specific node's subtree.
    
        Args:
            node_id: ID of the node to export (omit to export all nodes).
                     If provided, exports only that node and all its descendants.
    
        Returns:
            Dictionary containing 'nodes' list with exported node data.
            Rate limit: 1 request per minute for full export.
        """
        client = get_client()
    
        if _rate_limiter:
            await _rate_limiter.acquire()
    
        try:
            data = await client.export_nodes(node_id)
            if _rate_limiter:
                _rate_limiter.on_success()
            return data
        except Exception as e:
            if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError":
                _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
            raise
  • WorkFlowyClient.export_nodes method - thin wrapper delegating to export_nodes_impl.
        self,
        node_id: str | None = None,
        max_retries: int = 10,
        use_cache: bool = True,
        force_refresh: bool = False,
    ) -> dict[str, Any]:
        """Export all nodes or filter to specific node's subtree.
        
        Delegates to module-level export_nodes_impl to avoid circular imports.
        """
        return await export_nodes_impl(
            self,
            node_id=node_id,
            max_retries=max_retries,
            use_cache=use_cache,
            force_refresh=force_refresh,
        )
  • Core implementation: handles /nodes-export API calls with caching, dirty ID tracking, retries, dewhitening names/notes, and subtree filtering.
    async def export_nodes_impl(
        client: WorkFlowyClientCore,
        node_id: str | None = None,
        max_retries: int = 10,
        use_cache: bool = True,
        force_refresh: bool = False,
    ) -> dict[str, Any]:
        """Export all nodes or filter to specific node's subtree (implementation).
        
        This is extracted as a module-level function to avoid circular imports.
        """
        import asyncio
    
        logger = _ClientLogger()
    
        async def fetch_and_cache() -> dict[str, Any]:
            """Call /nodes-export with retries and update cache."""
            retry_count = 0
            base_delay = 1.0
    
            while retry_count < max_retries:
                await asyncio.sleep(API_RATE_LIMIT_DELAY)
    
                try:
                    response = await client.client.get("/nodes-export")
                    data = await client._handle_response(response)
    
                    all_nodes = data.get("nodes", []) or []
    
                    # Dewhiten names/notes
                    for node in all_nodes:
                        for key in ("name", "nm", "note", "no"):
                            if key in node:
                                node[key] = client._dewhiten_text(node.get(key))
    
                    total_before_filter = len(all_nodes)
                    data["_total_fetched_from_api"] = total_before_filter
    
                    # Update cache
                    client._nodes_export_cache = data
                    client._nodes_export_cache_timestamp = datetime.now()
                    client._nodes_export_dirty_ids.clear()
    
                    if retry_count > 0:
                        success_msg = f"export_nodes succeeded after {retry_count + 1}/{max_retries} attempts"
                        logger.info(success_msg)
                        _log_to_file_helper(success_msg, "reconcile")
    
                    return data
    
                except Exception as e:
                    retry_count += 1
                    logger.warning(f"Export error: {e}. Retry {retry_count}/{max_retries}")
                    if retry_count < max_retries:
                        await asyncio.sleep(base_delay * (2 ** retry_count))
                    else:
                        raise
    
            raise NetworkError("export_nodes failed after maximum retries")
    
        # Decide cache vs fetch
        if (not use_cache) or force_refresh or client._nodes_export_cache is None:
            data = await fetch_and_cache()
        else:
            data = client._nodes_export_cache
    
            # Check dirty IDs for subtree requests
            if node_id is not None and client._nodes_export_dirty_ids:
                all_nodes = data.get("nodes", []) or []
                nodes_by_id = {n.get("id"): n for n in all_nodes if n.get("id")}
    
                if node_id not in nodes_by_id:
                    logger.info(f"Node {node_id} not in cache; refreshing")
                    data = await fetch_and_cache()
                else:
                    dirty = client._nodes_export_dirty_ids
                    path_hits_dirty = False
                    cur = node_id
                    visited: set[str] = set()
    
                    while cur and cur not in visited:
                        visited.add(cur)
                        if cur in dirty or "*" in dirty:
                            path_hits_dirty = True
                            break
                        parent_id = (
                            nodes_by_id[cur].get("parent_id")
                            or nodes_by_id[cur].get("parentId")
                        )
                        cur = parent_id
    
                    if path_hits_dirty:
                        logger.info(f"Path from {node_id} hits dirty; refreshing")
                        data = await fetch_and_cache()
                    else:
                        logger.info(f"Using cached export for {node_id}")
    
        # Filter if needed
        all_nodes = data.get("nodes", []) or []
        total_before_filter = len(all_nodes)
    
        if node_id is None:
            if "_total_fetched_from_api" not in data:
                data["_total_fetched_from_api"] = total_before_filter
            return data
    
        # Filter to subtree
        included_ids = {node_id}
        nodes_by_id = {node["id"]: node for node in all_nodes if node.get("id")}
    
        def add_descendants(parent_id: str) -> None:
            for node in all_nodes:
                if node.get("parent_id") == parent_id and node["id"] not in included_ids:
                    included_ids.add(node["id"])
                    add_descendants(node["id"])
    
        if node_id in nodes_by_id:
            add_descendants(node_id)
    
        filtered_nodes = [node for node in all_nodes if node["id"] in included_ids]
    
        return {
            "nodes": filtered_nodes,
            "_total_fetched_from_api": data.get("_total_fetched_from_api", total_before_filter),
            "_filtered_count": len(filtered_nodes),
        }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server