nexus_ignite_shards
Analyze and extract deeper insights from selected WorkFlowy outline nodes, creating structured summaries of hierarchical content layers.
Instructions
IGNITE selected SHARDS so the ETHER glows more deeply around them, revealing deeper layers (but not necessarily to FULL depth). The deeper revelation is captured as a PHANTOM GEM (S0), an unrefracted witness of those subtrees.
Input Schema
JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| nexus_tag | Yes | Tag identifying the nexus run whose `coarse_terrain.json` is ignited. | |
| root_ids | Yes | IDs of terrain nodes to ignite; each must exist in the coarse terrain and no root may descend from another. | |
| max_depth | No | Depth limit applied to each ignited subtree (unlimited when omitted). | None |
| child_limit | No | Maximum children retained per node before truncation (unlimited when omitted). | None |
| per_root_limits | No | Per-root overrides mapping a root ID to `{"max_depth": ..., "child_limit": ...}`. | None |
Implementation Reference
- Core handler: orchestrates IGNITE SHARDS logic - indexes coarse terrain, performs targeted deep API exports on root_ids, constructs phantom_gem.json by splicing full subtrees into terrain skeleton while preserving truncation metadataasync def nexus_ignite_shards( self, nexus_tag: str, root_ids: list[str], max_depth: int | None = None, child_limit: int | None = None, per_root_limits: dict[str, dict[str, int]] | None = None, ) -> dict[str, Any]: """IGNITE SHARDS → phantom_gem.json.""" logger = _ClientLogger() run_dir = self._get_nexus_dir(nexus_tag) coarse_path = os.path.join(run_dir, "coarse_terrain.json") phantom_path = os.path.join(run_dir, "phantom_gem.json") if not os.path.exists(coarse_path): raise NetworkError("coarse_terrain.json not found; run nexus_scry first") if not root_ids: empty = {"nexus_tag": nexus_tag, "roots": [], "nodes": []} with open(phantom_path, "w", encoding="utf-8") as f: json.dump(empty, f, indent=2, ensure_ascii=False) return { "success": True, "nexus_tag": nexus_tag, "phantom_gem": phantom_path, "roots": [], "node_count": 0, } # Read coarse terrain with open(coarse_path, "r", encoding="utf-8") as f: coarse_data = json.load(f) terrain_nodes = coarse_data.get("nodes", []) export_root_id = coarse_data.get("export_root_id") # Build ledger original_ids_seen: set[str] = set() if isinstance(coarse_data.get("original_ids_seen"), list): original_ids_seen.update(str(nid) for nid in coarse_data.get("original_ids_seen", []) if nid) else: def _collect(nodes: list[dict[str, Any]]) -> None: for node in nodes or []: if isinstance(node, dict): nid = node.get("id") if nid: original_ids_seen.add(str(nid)) _collect(node.get("children") or []) _collect(terrain_nodes) if export_root_id: original_ids_seen.add(str(export_root_id)) # Build indexes parent_by_id: dict[str, str | None] = {} node_by_id: dict[str, dict[str, Any]] = {} def _index(nodes: list[dict[str, Any]], parent_id: str | None) -> None: for node in nodes: nid = node.get("id") if nid: 
parent_by_id[nid] = parent_id node_by_id[nid] = node children = node.get("children") or [] if children: _index(children, nid) _index(terrain_nodes, export_root_id) # Normalize roots unique_root_ids = [] for rid in root_ids: if rid not in unique_root_ids: unique_root_ids.append(rid) roots_set = set(unique_root_ids) # Check existence missing = [rid for rid in roots_set if rid not in parent_by_id] if missing: raise NetworkError(f"Roots not in coarse_terrain: {missing}") # Enforce disjointness for rid in roots_set: parent = parent_by_id.get(rid) while parent is not None: if parent in roots_set: raise NetworkError( f"Roots not disjoint: '{rid}' descends from '{parent}'" ) parent = parent_by_id.get(parent) # Deep SCRY for each root deep_subtrees: dict[str, dict[str, Any]] = {} roots_resolved = [] total_nodes = 0 per_root_limits = per_root_limits or {} for root_id in unique_root_ids: limits = per_root_limits.get(root_id, {}) root_max_depth = limits.get("max_depth", max_depth) root_child_limit = limits.get("child_limit", child_limit) try: raw = await export_nodes_impl(self, node_id=root_id) except Exception as e: logger.error(f"Export failed for {root_id}: {e}") continue flat = raw.get("nodes", []) if not flat: continue total_nodes += raw.get("_total_fetched_from_api", len(flat)) tree = self._build_hierarchy(flat, True) if not tree: continue root_subtree = next((c for c in tree if c.get("id") == root_id), tree[0]) self._annotate_child_counts_and_truncate( [root_subtree], max_depth=root_max_depth, child_count_limit=root_child_limit, current_depth=1, ) deep_subtrees[root_id] = root_subtree roots_resolved.append(root_id) # Extend ledger def _collect_subtree(node: dict[str, Any]) -> None: if isinstance(node, dict): nid = node.get("id") if nid: original_ids_seen.add(str(nid)) for child in node.get("children") or []: _collect_subtree(child) _collect_subtree(root_subtree) # Build GEM skeleton if not roots_resolved: phantom_payload = { "nexus_tag": nexus_tag, "roots": [], 
"export_root_id": export_root_id, "export_root_name": coarse_data.get("export_root_name", "Root"), "original_ids_seen": sorted(original_ids_seen), "nodes": [], } else: skeleton_by_id: dict[str, dict[str, Any]] = {} top_level_ids: list[str] = [] top_level_nodes: list[dict[str, Any]] = [] def ensure_skeleton(nid: str) -> dict[str, Any]: source = node_by_id.get(nid) if not source: raise NetworkError(f"Node {nid} not in coarse_terrain") if nid in skeleton_by_id: return skeleton_by_id[nid] skel = {k: v for k, v in source.items() if k != "children"} skel["children"] = [] skeleton_by_id[nid] = skel return skel for root_id in roots_resolved: path: list[str] = [] cur = root_id while cur is not None and cur != export_root_id: path.append(cur) cur = parent_by_id.get(cur) if cur != export_root_id: raise NetworkError(f"Root {root_id} doesn't descend from export_root") path.reverse() parent_id_iter = export_root_id for nid in path: node_skel = ensure_skeleton(nid) if parent_id_iter == export_root_id: if nid not in top_level_ids: top_level_ids.append(nid) top_level_nodes.append(node_skel) else: parent_skel = skeleton_by_id[parent_id_iter] children_list = parent_skel.get("children") or [] if not any(isinstance(c, dict) and c.get("id") == nid for c in children_list): children_list.append(node_skel) parent_skel["children"] = children_list parent_id_iter = nid # Replace skeletons with deep subtrees for root_id, subtree in deep_subtrees.items(): parent_id = parent_by_id.get(root_id) if parent_id is None or parent_id == export_root_id: for idx, nid in enumerate(top_level_ids): if nid == root_id: top_level_nodes[idx] = subtree break else: parent_skel = skeleton_by_id.get(parent_id) if parent_skel: children_list = parent_skel.get("children") or [] for idx, child in enumerate(children_list): if isinstance(child, dict) and child.get("id") == root_id: children_list[idx] = subtree break skeleton_by_id[root_id] = subtree # Compute preview phantom_preview = None try: phantom_preview = 
self._annotate_preview_ids_and_build_tree(top_level_nodes, "PG") except Exception: pass phantom_payload = { "nexus_tag": nexus_tag, "__preview_tree__": phantom_preview, "export_root_id": export_root_id, "export_root_name": coarse_data.get("export_root_name", "Root"), "nodes": top_level_nodes, "roots": roots_resolved, "original_ids_seen": sorted(original_ids_seen), } # Update coarse terrain ledger try: coarse_data["original_ids_seen"] = sorted(original_ids_seen) with open(coarse_path, "w", encoding="utf-8") as f: json.dump(coarse_data, f, indent=2, ensure_ascii=False) except Exception: logger.warning("Failed to update coarse_terrain ledger") _log_to_file_helper( f"nexus_ignite_shards[{nexus_tag}]: roots={roots_resolved}, node_count={total_nodes}", "nexus", ) with open(phantom_path, "w", encoding="utf-8") as f: json.dump(phantom_payload, f, indent=2, ensure_ascii=False) return { "success": True, "nexus_tag": nexus_tag, "phantom_gem": phantom_path, "roots": roots_resolved, "node_count": total_nodes, }
- src/workflowy_mcp/server.py:1450-1490 (registration)MCP tool registration: @mcp.tool decorator binds 'nexus_ignite_shards' name to handler that delegates to WorkFlowyClient.nexus_ignite_shardsname="nexus_ignite_shards", description=( "IGNITE selected SHARDS so the ETHER glows more deeply around them, revealing " "deeper layers (but not necessarily to FULL depth). The deeper revelation is " "captured as a PHANTOM GEM (S0), an unrefracted witness of those subtrees." ), ) async def nexus_ignite_shards( nexus_tag: str, root_ids: list[str], max_depth: int | None = None, child_limit: int | None = None, per_root_limits: dict[str, dict[str, int]] | None = None, ) -> dict: """IGNITE SHARDS in the TERRAIN so the ETHER glows more deeply around them. From an existing TERRAIN, mark specific nodes as SHARDS and IGNITE them. The ETHER glows around these SHARDS, revealing deeper layers (but not necessarily to full depth). The revealed structure is condensed into a PHANTOM GEM (S0). """ client = get_client() if _rate_limiter: await _rate_limiter.acquire() try: result = await client.nexus_ignite_shards( nexus_tag=nexus_tag, root_ids=root_ids, max_depth=max_depth, child_limit=child_limit, per_root_limits=per_root_limits, ) if _rate_limiter: _rate_limiter.on_success() return result except Exception as e: # noqa: BLE001 if _rate_limiter and hasattr(e, "__class__") and e.__class__.__name__ == "RateLimitError": _rate_limiter.on_rate_limit(getattr(e, "retry_after", None)) raise