nexus_anchor_gems
Reveal hidden structure in WorkFlowy outlines by marking nodes with nexus tags to illuminate deeper organizational patterns.
Instructions
Let the PHANTOM GEM ILLUMINATE the TRUE GEMS that were ALWAYS PRESENT in the TERRAIN but not yet revealed: where SHARDS were marked, the TERRAIN now shimmers with deeper revealed structure (FIRST IMBUE—NOTHING CHANGES in the ETHER). The PHANTOM GEM remains a REFLECTION: an untouched witness.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| nexus_tag | Yes | Tag identifying the nexus run directory that holds coarse_terrain.json and phantom_gem.json | |
Implementation Reference
# Core handler: Loads coarse_terrain.json and phantom_gem.json, indexes phantom
# subtrees by ID, recursively replaces matching subtrees in terrain nodes, merges
# original_ids_seen and explicitly_preserved_ids ledgers, optionally attaches
# preview_tree, writes shimmering_terrain.json with updated structure.
async def nexus_anchor_gems(self, nexus_tag: str) -> dict[str, Any]:
    """ANCHOR GEMS → shimmering_terrain.json.

    Read ``coarse_terrain.json`` and ``phantom_gem.json`` from the run
    directory of *nexus_tag*, swap every terrain node whose ``id`` matches a
    phantom node for that phantom subtree, merge the two ID ledgers, and write
    the result to ``shimmering_terrain.json``. Both input files are only read.

    Args:
        nexus_tag: Tag handed to ``self._get_nexus_dir`` to locate the run
            directory.

    Returns:
        A dict with ``success``, ``nexus_tag``, ``shimmering_terrain`` (path of
        the written file) and ``roots`` (the phantom roots, or ``[]`` when the
        phantom gem had nothing to anchor).

    Raises:
        NetworkError: If ``coarse_terrain.json`` or ``phantom_gem.json`` does
            not exist in the run directory.
    """
    # Local import: the file's top-level import block is not visible from here.
    import logging

    run_dir = self._get_nexus_dir(nexus_tag)
    coarse_path = os.path.join(run_dir, "coarse_terrain.json")
    phantom_path = os.path.join(run_dir, "phantom_gem.json")
    shimmering_path = os.path.join(run_dir, "shimmering_terrain.json")

    if not os.path.exists(coarse_path):
        raise NetworkError("coarse_terrain not found")
    if not os.path.exists(phantom_path):
        raise NetworkError("phantom_gem not found")

    with open(coarse_path, "r", encoding="utf-8") as f:
        coarse_data = json.load(f)
    with open(phantom_path, "r", encoding="utf-8") as f:
        phantom_data = json.load(f)

    phantom_roots = phantom_data.get("roots", [])
    phantom_nodes = phantom_data.get("nodes", [])

    if not phantom_roots or not phantom_nodes:
        # Nothing to anchor: the shimmering terrain is the coarse terrain as-is.
        with open(shimmering_path, "w", encoding="utf-8") as f:
            json.dump(coarse_data, f, indent=2, ensure_ascii=False)
        return {
            "success": True,
            "nexus_tag": nexus_tag,
            "shimmering_terrain": shimmering_path,
            "roots": [],
        }

    # Build phantom subtree index: every phantom node (at any depth) by its id.
    subtree_by_id: dict[str, dict[str, Any]] = {}

    def _index_phantom(nodes: list[dict[str, Any]]) -> None:
        for node in nodes or []:
            if isinstance(node, dict):
                nid = node.get("id")
                if nid:
                    subtree_by_id[nid] = node
                children = node.get("children") or []
                if children:
                    _index_phantom(children)

    _index_phantom(phantom_nodes)

    # ``or []`` also covers an explicit ``"nodes": null`` in the terrain file.
    terrain_nodes = coarse_data.get("nodes") or []

    def replace_subtree(nodes: list[dict[str, Any]]) -> None:
        for idx, node in enumerate(nodes):
            # Same guard as _index_phantom: leave malformed (non-dict) entries
            # untouched instead of failing on ``.get``.
            if not isinstance(node, dict):
                continue
            nid = node.get("id")
            if nid in subtree_by_id:
                # The whole phantom subtree wins; its descendants are not
                # visited again.
                nodes[idx] = subtree_by_id[nid]
            else:
                children = node.get("children") or []
                if children:
                    replace_subtree(children)

    replace_subtree(terrain_nodes)

    # Merge ledgers: union of both files, written back sorted. A ledger key is
    # only (re)written when the union is non-empty.
    original_ids: set[str] = set(coarse_data.get("original_ids_seen", []) or [])
    original_ids.update(phantom_data.get("original_ids_seen", []) or [])
    if original_ids:
        coarse_data["original_ids_seen"] = sorted(original_ids)

    explicitly_preserved: set[str] = set(
        coarse_data.get("explicitly_preserved_ids", []) or []
    )
    explicitly_preserved.update(phantom_data.get("explicitly_preserved_ids", []) or [])
    if explicitly_preserved:
        coarse_data["explicitly_preserved_ids"] = sorted(explicitly_preserved)

    coarse_data["nodes"] = terrain_nodes

    # Attach shimmering preview (best-effort: a preview failure must not block
    # writing the terrain, but it is logged rather than silently dropped).
    shimmering_preview = None
    try:
        shimmering_preview = self._annotate_preview_ids_and_build_tree(
            coarse_data.get("nodes") or [], "ST"
        )
    except Exception:  # noqa: BLE001
        logging.getLogger(__name__).warning(
            "nexus_anchor_gems: preview tree generation failed for %r; "
            "writing shimmering terrain without it",
            nexus_tag,
            exc_info=True,
        )

    if shimmering_preview:
        # Rebuild the payload with an explicit key order. Only the keys listed
        # here are carried over; absent ones are written as null.
        coarse_data = {
            "export_timestamp": coarse_data.get("export_timestamp"),
            "export_root_children_status": coarse_data.get("export_root_children_status"),
            "__preview_tree__": shimmering_preview,
            "export_root_id": coarse_data.get("export_root_id"),
            "export_root_name": coarse_data.get("export_root_name"),
            "nodes": coarse_data.get("nodes"),
            "original_ids_seen": coarse_data.get("original_ids_seen"),
            "explicitly_preserved_ids": coarse_data.get("explicitly_preserved_ids"),
        }

    with open(shimmering_path, "w", encoding="utf-8") as f:
        json.dump(coarse_data, f, indent=2, ensure_ascii=False)

    return {
        "success": True,
        "nexus_tag": nexus_tag,
        "shimmering_terrain": shimmering_path,
        "roots": phantom_roots,
    }
# src/workflowy_mcp/server.py:1493-1525 (registration)
# MCP tool registration via @mcp.tool decorator. Thin wrapper that acquires rate
# limiter, delegates to WorkFlowyClient.nexus_anchor_gems(nexus_tag), handles
# rate limiting and exceptions.
@mcp.tool(
    name="nexus_anchor_gems",
    description=(
        "Let the PHANTOM GEM ILLUMINATE the TRUE GEMS that were ALWAYS PRESENT in "
        "the TERRAIN but not yet revealed: where SHARDS were marked, the TERRAIN "
        "now shimmers with deeper revealed structure (FIRST IMBUE—NOTHING CHANGES "
        "in the ETHER). The PHANTOM GEM remains a REFLECTION: an untouched witness."
    ),
)
async def nexus_anchor_gems(
    nexus_tag: str,
) -> dict:
    """ANCHOR the PHANTOM GEM into the TERRAIN to create SHIMMERING TERRAIN.

    The PHANTOM GEM now illuminates the TRUE GEMS that were always present
    in the TERRAIN but not yet revealed. Where SHARDS were marked, the TERRAIN
    now shimmers with deeper revealed structure (FIRST IMBUE—Workflowy remains
    untouched). The PHANTOM GEM stays as an unrefracted witness.

    Args:
        nexus_tag: Tag forwarded unchanged to ``client.nexus_anchor_gems``.

    Returns:
        Whatever dict ``client.nexus_anchor_gems`` returns.

    Raises:
        Exception: Any exception from the client call is re-raised after the
            rate limiter (if configured) has been notified of a rate limit.
    """
    client = get_client()

    # The rate limiter is optional; every use below is guarded the same way.
    if _rate_limiter:
        await _rate_limiter.acquire()

    try:
        result = await client.nexus_anchor_gems(nexus_tag=nexus_tag)
        if _rate_limiter:
            _rate_limiter.on_success()
        return result
    except Exception as e:  # noqa: BLE001
        # Matched by class name rather than isinstance — presumably so this
        # module need not import the exception type; confirm against the
        # client's exception module.
        if _rate_limiter and type(e).__name__ == "RateLimitError":
            _rate_limiter.on_rate_limit(getattr(e, "retry_after", None))
        raise