# workflowy_refresh_nodes_export_cache
Force a fresh WorkFlowy /nodes-export snapshot and update the local cache used by NEXUS and the UUID Navigator, ensuring access to current data.
## Instructions
Force a fresh /nodes-export snapshot and update the local cache used by NEXUS and the UUID Navigator.
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| *(no arguments)* | — | This tool takes no input parameters. | — |
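Since the tool takes no arguments, a client passes an empty argument object. A minimal invocation sketch using the official MCP Python SDK over stdio; the `python -m workflowy_mcp.server` launch command is a placeholder for however this server is actually started:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Placeholder launch command; adjust to how the server is really run.
    params = StdioServerParameters(command="python", args=["-m", "workflowy_mcp.server"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # The tool takes no arguments, so pass an empty dict.
            result = await session.call_tool("workflowy_refresh_nodes_export_cache", {})
            print(result.content)


asyncio.run(main())
```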
## Implementation Reference
- `src/workflowy_mcp/server.py:1366-1391` (registration): MCP tool registration and a thin wrapper handler that acquires the rate limiter and calls `client.refresh_nodes_export_cache()`. The rate-limiter contract the wrapper assumes is sketched after the snippet.

```python
@mcp.tool(  # decorator head truncated in the source; FastMCP-style registration assumed
    name="workflowy_refresh_nodes_export_cache",
    description=(
        "Force a fresh /nodes-export snapshot and update the local cache used "
        "by NEXUS and the UUID Navigator."
    ),
)
async def workflowy_refresh_nodes_export_cache() -> dict:
    """Explicitly refresh the cached /nodes-export snapshot.

    This is primarily useful after large out-of-band edits in Workflowy
    desktop, or when you want to be certain the cache reflects the latest
    ETHER state before running NEXUS or UUID Navigator operations.
    """
    client = get_client()
    if _rate_limiter:
        await _rate_limiter.acquire()
    try:
        result = await client.refresh_nodes_export_cache()
        if _rate_limiter:
            _rate_limiter.on_success()
        return result
    except Exception as e:  # noqa: BLE001
        return {"success": False, "error": str(e)}
```
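The wrapper only assumes two methods on `_rate_limiter`: an awaitable `acquire()` called before the API request and a synchronous `on_success()` afterwards. A hypothetical minimal implementation satisfying that contract (not taken from the source):

```python
import asyncio


class SimpleRateLimiter:
    """Hypothetical limiter: spaces calls by a fixed minimum interval."""

    def __init__(self, min_interval: float = 0.5) -> None:
        self._min_interval = min_interval
        self._last_call = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        # Serialize callers and enforce min_interval seconds between API calls.
        async with self._lock:
            now = asyncio.get_running_loop().time()
            wait = self._min_interval - (now - self._last_call)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_call = asyncio.get_running_loop().time()

    def on_success(self) -> None:
        # No-op here; an adaptive limiter could relax its interval on success.
        pass
```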
- Core handler logic: clears the cache and dirty markers, calls `export_nodes_impl` with `use_cache=False` to fetch fresh data, and returns success stats including the node count. A brief usage sketch follows the snippet.

```python
# Method excerpt; assumes module-level imports of datetime and typing.Any.
async def refresh_nodes_export_cache(self, max_retries: int = 10) -> dict[str, Any]:
    """Force a fresh /nodes-export call and update the in-memory cache.

    This is exposed via an MCP tool so Dan (or an agent) can explicitly
    refresh the snapshot used by UUID Navigator and NEXUS without waiting
    for an auto-refresh trigger.
    """
    # Import here to avoid circular dependency
    from .api_client_etch import export_nodes_impl

    # Clear any previous cache and dirty markers first.
    self._nodes_export_cache = None
    self._nodes_export_cache_timestamp = None
    self._nodes_export_dirty_ids.clear()

    # Delegate to export_nodes with caching disabled for this call.
    data = await export_nodes_impl(
        self, node_id=None, max_retries=max_retries, use_cache=False, force_refresh=True
    )
    nodes = data.get("nodes", []) or []
    return {
        "success": True,
        "node_count": len(nodes),
        "timestamp": datetime.now().isoformat(),
    }
```
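A hedged usage sketch, run inside any coroutine that already holds the client instance (`client` stands in for whatever `get_client()` returns):

```python
# Refresh explicitly after large out-of-band edits, then verify the rebuild.
# Failures propagate as exceptions here; the MCP wrapper above is what
# converts them into {"success": False, "error": ...} payloads.
result = await client.refresh_nodes_export_cache()
print(f"Cache rebuilt: {result['node_count']} nodes at {result['timestamp']}")
```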
- Cache fields: `_nodes_export_cache`, `_nodes_export_cache_timestamp`, and `_nodes_export_dirty_ids` track and manage the cached /nodes-export data. A hypothetical read-path check these fields could gate is sketched after the snippet.

```python
# _nodes_export_cache stores the last /nodes-export payload (flat nodes list).
# _nodes_export_dirty_ids holds UUIDs whose subtrees/ancestors have been
# mutated since the last refresh. A "*" entry means "treat everything as dirty".
self._nodes_export_cache: dict[str, Any] | None = None
self._nodes_export_cache_timestamp = None
self._nodes_export_dirty_ids: set[str] = set()
```
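On the read path, `export_nodes_impl` can consult these fields to decide whether the snapshot is reusable. The helper below is hypothetical and simplified (membership check only), not the source's actual logic:

```python
def _cache_is_usable_for(self, node_id: str | None) -> bool:
    """Illustrative sketch of how the dirty set could gate cache reuse."""
    if self._nodes_export_cache is None:
        return False
    # "*" is the everything-is-dirty sentinel: always re-fetch.
    if "*" in self._nodes_export_dirty_ids:
        return False
    # A full-tree export (node_id=None) is only safe if nothing is dirty.
    if node_id is None:
        return not self._nodes_export_dirty_ids
    return node_id not in self._nodes_export_dirty_ids
```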
- Helper that marks the cache dirty after mutations; used by CRUD operations to invalidate the relevant parts of the cache. An illustrative caller follows the snippet.

```python
def _mark_nodes_export_dirty(self, node_ids: list[str] | None = None) -> None:
    """Mark parts of the cached /nodes-export snapshot as dirty.

    When the cache is populated, this is used by mutating operations to
    record which UUIDs (or entire regions via "*") have changed since the
    last refresh. Subsequent export_nodes(...) calls can decide whether
    they can safely reuse the cached snapshot for a given subtree or must
    re-fetch from the API.
    """
    # If there is no cache, there's nothing to mark.
    if self._nodes_export_cache is None:
        return

    # node_ids=None is the conservative "everything is dirty" sentinel.
    if node_ids is None:
        self._nodes_export_dirty_ids.add("*")
        return

    for nid in node_ids:
        if nid:
            self._nodes_export_dirty_ids.add(nid)
```
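A hypothetical mutation path showing the intended call pattern; the `create_node` method and `self._post` helper are illustrative, not the source's actual CRUD code:

```python
async def create_node(self, parent_id: str, name: str) -> dict[str, Any]:
    """Create a node, then invalidate the affected region of the cache."""
    result = await self._post("/nodes", {"parent_id": parent_id, "name": name})
    # Mark both the new node and its parent dirty so cached /nodes-export
    # data is re-fetched for this subtree on the next export.
    new_id = result.get("id")
    self._mark_nodes_export_dirty([parent_id, new_id] if new_id else [parent_id])
    return result
```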