Skip to main content
Glama
nexus_json_tools.py71.3 kB
"""nexus_json_tools.py Utility CLI for manipulating NEXUS SCRY JSON files (Workflowy exports). This provides a small, Workflowy-API-like set of commands that operate on the SCRY JSON *offline*: - rename-node (change the `name` of a node by id) - set-note (change the `note` of a node by id) - delete-node (remove a node and its subtree by id) - move-node (reparent within the SCRY JSON) - extract-shard (BURN THE LENS – extract a shard subtree from Territory) - burn-gem (REHYDRATE GEM – splice full-depth subtrees into Territory and extract a GEM shard) - fuse-shard (2-way replace of roots – legacy/simple mode) - fuse-shard-3way (T/S0/S1 SHARD FUSE into Territory – new mode) Designed to be used on QUILLSTRIKE working stones for SCRY files, before running `nexus_weave` to write changes back into Workflowy. Example (simple rename): python nexus_json_tools.py \ E:\...\\temp\\qm-XXXX-mcp-deploy-docs--nexus_mcp_deploy_47ec.json \ rename-node --id 577dcdf0-... --name "New step title" Example (3-way shard fuse): python nexus_json_tools.py dummy \ fuse-shard-3way \ --witness-shard E:\...\\shard-original.json \ --morphed-shard E:\...\\shard-working.json \ --target-scry E:\...\\territory.json Example (burn GEM from full SCRY into Territory): python nexus_json_tools.py territory.json \ burn-gem \ --full-scry E:\...\\full_scry_for_roots.json \ --root-ids R1 R2 R3 \ --output-shard E:\...\\gem_shard_S0.json After edits, run: - QUILLMORPH on the working stone (to update the SCRY JSON) - generate_markdown_from_json() - nexus_weave() to apply to Workflowy. """ from __future__ import annotations import argparse import copy import json import os import sys from datetime import datetime from typing import Any, Dict, List, Optional, Tuple, Set JsonDict = Dict[str, Any] def log_jewel(message: str) -> None: """Log JEWELSTORM operations to a persistent logfile (best-effort). This mirrors the DATETIME+prefix style used by the NEXUS WEAVE reconcile_debug.log, but writes to jewelstorm_debug.log so JEWELSTORM activity can be inspected independently of WEAVE. """ try: log_path = ( r"E:\\__daniel347x\\__Obsidian\\__Inking into Mind\\--TypingMind\\Projects - All\\Projects - Individual\\TODO\\temp\\jewelstorm_debug.log" ) ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] with open(log_path, "a", encoding="utf-8") as f: f.write(f"[{ts}] {message}\n") except Exception: # Logging must never affect CLI behavior pass def transform_jewel( jewel_file: str, operations: List[Dict[str, Any]], dry_run: bool = False, stop_on_error: bool = True, ) -> Dict[str, Any]: """Apply JEWELSTORM semantic operations to a NEXUS working_gem JSON file. This is the semantic analogue of pattern-based edit_file() for PHANTOM GEM / working_gem JSON: - Operates purely offline (no Workflowy API calls) - Works on a local JSON file produced by JEWELSTRIKE (typically phantom_gem.json copied to a working stone) - Uses jewel_id as the stable identity handle inside JEWELSTORM - Never invents real Workflowy IDs: new nodes are written without an "id" field so NEXUS/WEAVE can treat them as CREATE operations Args: jewel_file: Path to working_gem JSON file operations: List of operation dictionaries, e.g.: { "op": "MOVE_NODE", "jewel_id": "J-001", "new_parent_jewel_id": "J-010", "position": "LAST" | "FIRST" | "BEFORE" | "AFTER", "relative_to_jewel_id": "J-999" # for BEFORE/AFTER } dry_run: If True, simulate only (no file write) stop_on_error: If True, abort on first error (no write) Returns: Dict with success flag, counts, and error details. Example: { "success": True, "applied_count": 3, "dry_run": False, "nodes_created": 1, "nodes_deleted": 0, "nodes_moved": 1, "nodes_renamed": 1, "notes_updated": 0, "attrs_updated": 0, "errors": [] } """ import uuid # ------- Load JSON safely (convert die/SystemExit into structured error) ------- try: data = load_json(jewel_file) except SystemExit as e: # die() inside load_json uses sys.exit return { "success": False, "applied_count": 0, "dry_run": dry_run, "nodes_created": 0, "nodes_deleted": 0, "nodes_moved": 0, "nodes_renamed": 0, "notes_updated": 0, "attrs_updated": 0, "errors": [ { "index": -1, "op": None, "code": "LOAD_ERROR", "message": f"Failed to load JSON from {jewel_file}: {e}", } ], } # Determine editable roots list (supports both export-package dict and bare list) if isinstance(data, dict) and isinstance(data.get("nodes"), list): original_roots = data["nodes"] elif isinstance(data, list): original_roots = data else: return { "success": False, "applied_count": 0, "dry_run": dry_run, "nodes_created": 0, "nodes_deleted": 0, "nodes_moved": 0, "nodes_renamed": 0, "notes_updated": 0, "attrs_updated": 0, "errors": [ { "index": -1, "op": None, "code": "FORMAT_ERROR", "message": "JSON must be a dict with 'nodes' or a bare list of nodes for transform_jewel.", } ], } # Work on a deep copy so we never mutate the original unless we succeed roots: List[JsonDict] = copy.deepcopy(original_roots) # ------- Indexing: jewel_id-based identity ------- by_jewel_id: Dict[str, JsonDict] = {} parent_by_jewel_id: Dict[str, Optional[str]] = {} existing_ids: Set[str] = set() def _ensure_children_list(node: JsonDict) -> List[JsonDict]: children = node.get("children") if not isinstance(children, list): children = [] node["children"] = children return children def _register_node(node: JsonDict, parent_jid: Optional[str]) -> None: """Register node and its subtree in jewel_id index. Identity: - Prefer "jewel_id" if present - Fallback to "id" for older files (pre-JEWELSTRIKE) """ jewel_id = node.get("jewel_id") or node.get("id") if jewel_id: if jewel_id in by_jewel_id: raise ValueError(f"Duplicate jewel_id/id {jewel_id!r} in JSON tree") by_jewel_id[jewel_id] = node parent_by_jewel_id[jewel_id] = parent_jid existing_ids.add(jewel_id) children = node.get("children") or [] if not isinstance(children, list): children = [] node["children"] = children for child in children: if isinstance(child, dict): _register_node(child, jewel_id) for root in roots: if isinstance(root, dict): _register_node(root, None) def _new_jewel_id() -> str: """Generate a fresh, human-friendly jewel_id that cannot collide.""" while True: candidate = "J-" + uuid.uuid4().hex[:8] if candidate not in existing_ids: existing_ids.add(candidate) return candidate def _get_node_and_parent_list(jewel_id: str) -> Tuple[JsonDict, Optional[str], List[JsonDict]]: node = by_jewel_id.get(jewel_id) if node is None: raise ValueError(f"Node with jewel_id/id {jewel_id!r} not found") parent_jid = parent_by_jewel_id.get(jewel_id) if parent_jid is None: siblings = roots else: parent_node = by_jewel_id.get(parent_jid) if parent_node is None: raise ValueError(f"Parent with jewel_id/id {parent_jid!r} not found for node {jewel_id!r}") siblings = _ensure_children_list(parent_node) return node, parent_jid, siblings def _assert_no_cycle(node_jewel_id: str, new_parent_jewel_id: Optional[str]) -> None: cur = new_parent_jewel_id while cur is not None: if cur == node_jewel_id: raise ValueError( f"Cannot move node {node_jewel_id!r} under its own descendant {new_parent_jewel_id!r}" ) cur = parent_by_jewel_id.get(cur) def _remove_subtree_from_index(node: JsonDict) -> None: jid = node.get("jewel_id") or node.get("id") if jid: by_jewel_id.pop(jid, None) parent_by_jewel_id.pop(jid, None) existing_ids.discard(jid) for child in node.get("children") or []: if isinstance(child, dict): _remove_subtree_from_index(child) def _build_subtree_from_spec(spec: Dict[str, Any]) -> JsonDict: """Build a new subtree from a CREATE_NODE spec (Pattern A). Spec keys used: - name (required) - note (optional) - attrs or data (optional, mapped to node['data']) - jewel_id (optional; if omitted, auto-generated) - children (optional list of nested specs) - parent_id (optional, ignored - will be set during insertion) """ if "name" not in spec: raise ValueError("CREATE_NODE spec requires 'name' field in node object") node: JsonDict = {"name": spec["name"]} if "note" in spec: node["note"] = spec["note"] attrs = spec.get("attrs") or spec.get("data") if isinstance(attrs, dict): node["data"] = copy.deepcopy(attrs) # Assign jewel_id (never assign real Workflowy id here) jewel_id = spec.get("jewel_id") if jewel_id is None: jewel_id = _new_jewel_id() else: if jewel_id in existing_ids: raise ValueError(f"CREATE_NODE requested duplicate jewel_id {jewel_id!r}") existing_ids.add(jewel_id) node["jewel_id"] = jewel_id # Recursively build children children_specs = spec.get("children") or [] children_nodes: List[JsonDict] = [] for child_spec in children_specs: if isinstance(child_spec, dict): child_node = _build_subtree_from_spec(child_spec) children_nodes.append(child_node) node["children"] = children_nodes return node def _classify_children_for_destructive_ops(node: JsonDict) -> Dict[str, Any]: """Classify immediate children for destructive JEWELSTORM ops. Returns a dict with: children: list of child dict nodes children_status: str truncated: bool has_loaded_ether_children: bool # child has a real Workflowy id has_new_children: bool # child has no id yet (JEWEL-only) category: "EMPTY" | "JEWEL_ONLY" | "ETHER_ONLY" | "MIXED" """ children_raw = node.get("children") or [] children = [c for c in children_raw if isinstance(c, dict)] children_status = node.get("children_status") or "complete" truncated = children_status in {"truncated_by_depth", "truncated_by_count"} has_loaded_ether_children = any(c.get("id") is not None for c in children) has_new_children = any(c.get("id") is None for c in children) if not children: category = "EMPTY" elif (not has_loaded_ether_children) and has_new_children and not truncated: category = "JEWEL_ONLY" elif has_loaded_ether_children and not has_new_children: category = "ETHER_ONLY" else: category = "MIXED" return { "children": children, "children_status": children_status, "truncated": truncated, "has_loaded_ether_children": has_loaded_ether_children, "has_new_children": has_new_children, "category": category, } # ------- Apply operations ------- applied_count = 0 nodes_created = 0 nodes_deleted = 0 nodes_moved = 0 nodes_renamed = 0 notes_updated = 0 attrs_updated = 0 errors: List[Dict[str, Any]] = [] for idx, op in enumerate(operations or []): op_type_raw = op.get("op") or op.get("operation") if not op_type_raw: err = { "index": idx, "op": op, "code": "MISSING_OP", "message": "Operation missing 'op' field", } errors.append(err) if stop_on_error: break continue op_type = str(op_type_raw).upper() try: # MOVE_NODE if op_type == "MOVE_NODE": src_jid = op.get("jewel_id") if not src_jid: raise ValueError("MOVE_NODE requires 'jewel_id'") position = str(op.get("position", "LAST")).upper() # Normalize synonyms if position == "BOTTOM": position = "LAST" if position == "TOP": position = "FIRST" rel_jid = op.get("relative_to_jewel_id") new_parent_jid = op.get("new_parent_jewel_id") if position in {"BEFORE", "AFTER"}: if not rel_jid: raise ValueError( "MOVE_NODE with position BEFORE/AFTER requires 'relative_to_jewel_id'" ) # Target parent/siblings come from relative node _, rel_parent_jid, rel_siblings = _get_node_and_parent_list(rel_jid) target_parent_jid = rel_parent_jid target_list = rel_siblings else: # FIRST/LAST under explicit parent (or root when parent None) target_parent_jid = new_parent_jid if new_parent_jid is None: target_list = roots else: parent_node = by_jewel_id.get(new_parent_jid) if parent_node is None: raise ValueError(f"New parent jewel_id/id {new_parent_jid!r} not found") target_list = _ensure_children_list(parent_node) # Cycle check _assert_no_cycle(src_jid, target_parent_jid) node, old_parent_jid, old_siblings = _get_node_and_parent_list(src_jid) # Remove from old siblings if node in old_siblings: old_siblings.remove(node) # Insert into new location if position == "FIRST": target_list.insert(0, node) elif position == "LAST": target_list.append(node) elif position in {"BEFORE", "AFTER"}: rel_node, _, _ = _get_node_and_parent_list(rel_jid) # type: ignore[arg-type] try: rel_index = target_list.index(rel_node) except ValueError as e: raise ValueError( f"Relative node {rel_jid!r} not found under chosen parent for MOVE_NODE" ) from e insert_index = rel_index if position == "BEFORE" else rel_index + 1 target_list.insert(insert_index, node) else: raise ValueError(f"Unsupported MOVE_NODE position {position!r}") # Update parent mapping parent_by_jewel_id[src_jid] = target_parent_jid nodes_moved += 1 # DELETE_NODE elif op_type == "DELETE_NODE": jid = op.get("jewel_id") if not jid: raise ValueError("DELETE_NODE requires 'jewel_id'") delete_from_ether = bool(op.get("delete_from_ether")) mode_raw = op.get("mode") mode = str(mode_raw).upper() if mode_raw is not None else "SMART" node, parent_jid, siblings = _get_node_and_parent_list(jid) info = _classify_children_for_destructive_ops(node) children = info["children"] category = info["category"] # Legacy strict mode: preserve original FAIL_IF_HAS_CHILDREN semantics if mode == "FAIL_IF_HAS_CHILDREN": if children: raise ValueError( f"DELETE_NODE {jid!r} refused: node has children and mode=FAIL_IF_HAS_CHILDREN" ) else: # SMART semantics (default when mode not provided) if category == "MIXED": raise ValueError( f"DELETE_NODE {jid!r} refused: mixed new/ETHER/truncated children not supported; " "delete or move children individually first" ) if category == "ETHER_ONLY" and not delete_from_ether: raise ValueError( f"DELETE_NODE {jid!r} refused: node has ETHER-backed children; " "set delete_from_ether=True to delete subtree in Workflowy" ) # EMPTY and JEWEL_ONLY are always allowed. # Remove from siblings and index if node in siblings: siblings.remove(node) _remove_subtree_from_index(node) nodes_deleted += 1 # DELETE_ALL_CHILDREN elif op_type == "DELETE_ALL_CHILDREN": jid = op.get("jewel_id") if not jid: raise ValueError("DELETE_ALL_CHILDREN requires 'jewel_id'") delete_from_ether = bool(op.get("delete_from_ether")) mode_raw = op.get("mode") mode = str(mode_raw).upper() if mode_raw is not None else "SMART" node = by_jewel_id.get(jid) if node is None: raise ValueError(f"Node with jewel_id/id {jid!r} not found") info = _classify_children_for_destructive_ops(node) children = info["children"] category = info["category"] truncated = info["truncated"] # Nothing to do if not children and not truncated: pass else: # For DELETE_ALL_CHILDREN we do not support truncated children sets in v1 if truncated: raise ValueError( f"DELETE_ALL_CHILDREN {jid!r} refused: children set is truncated; " "re-GLIMPSE with full children or delete the node instead" ) if mode == "FAIL_IF_HAS_CHILDREN": if children: raise ValueError( f"DELETE_ALL_CHILDREN {jid!r} refused: node has children and " "mode=FAIL_IF_HAS_CHILDREN" ) else: if category == "MIXED": raise ValueError( f"DELETE_ALL_CHILDREN {jid!r} refused: mixed new/ETHER children not supported; " "delete or move children individually first" ) if category == "ETHER_ONLY" and not delete_from_ether: raise ValueError( f"DELETE_ALL_CHILDREN {jid!r} refused: children are ETHER-backed; " "set delete_from_ether=True to delete them in Workflowy" ) # EMPTY and JEWEL_ONLY are always allowed here. # Allowed path: remove all current children from index and node for child in list(children): _remove_subtree_from_index(child) node["children"] = [] nodes_deleted += len(children) # RENAME_NODE elif op_type == "RENAME_NODE": jid = op.get("jewel_id") if not jid: raise ValueError("RENAME_NODE requires 'jewel_id'") new_name = op.get("new_name") if new_name is None: raise ValueError("RENAME_NODE requires 'new_name'") node = by_jewel_id.get(jid) if node is None: raise ValueError(f"Node with jewel_id/id {jid!r} not found") node["name"] = new_name nodes_renamed += 1 # SET_NOTE elif op_type == "SET_NOTE": jid = op.get("jewel_id") if not jid: raise ValueError("SET_NOTE requires 'jewel_id'") new_note = op.get("new_note") # Allow empty string / None to clear node = by_jewel_id.get(jid) if node is None: raise ValueError(f"Node with jewel_id/id {jid!r} not found") node["note"] = new_note notes_updated += 1 # SET_ATTRS elif op_type == "SET_ATTRS": jid = op.get("jewel_id") if not jid: raise ValueError("SET_ATTRS requires 'jewel_id'") attrs = op.get("attrs") or {} if not isinstance(attrs, dict): raise ValueError("SET_ATTRS 'attrs' must be a dict") node = by_jewel_id.get(jid) if node is None: raise ValueError(f"Node with jewel_id/id {jid!r} not found") data = node.get("data") if not isinstance(data, dict): data = {} allowed_keys = {"completed", "layoutMode", "priority", "tags"} for key, value in attrs.items(): if key not in allowed_keys: raise ValueError(f"Unsupported attr key {key!r} in SET_ATTRS") if value is None: data.pop(key, None) else: data[key] = value if data: node["data"] = data elif "data" in node: del node["data"] attrs_updated += 1 # CREATE_NODE (Pattern A with optional jewel_id) elif op_type == "CREATE_NODE": parent_jid = op.get("parent_jewel_id") position = str(op.get("position", "LAST")).upper() # Normalize synonyms if position == "BOTTOM": position = "LAST" if position == "TOP": position = "FIRST" rel_jid = op.get("relative_to_jewel_id") if parent_jid is None: parent_children = roots target_parent_jid = None else: parent_node = by_jewel_id.get(parent_jid) if parent_node is None: raise ValueError(f"CREATE_NODE parent_jewel_id {parent_jid!r} not found") parent_children = _ensure_children_list(parent_node) target_parent_jid = parent_jid # Build node spec for subtree # Two formats supported: # 1. Compact: node fields at operation level (name, note, children, etc.) # 2. Wrapped: node fields inside "node" key if "node" in op: spec = op["node"] else: # Remove op-specific keys to get node spec spec = {k: v for k, v in op.items() if k not in { "op", "operation", "parent_jewel_id", "position", "relative_to_jewel_id", }} new_node = _build_subtree_from_spec(spec) # Register subtree in indexes with correct parent mapping _register_node(new_node, target_parent_jid) # Insert relative to siblings if position == "FIRST": parent_children.insert(0, new_node) elif position == "LAST": parent_children.append(new_node) elif position in {"BEFORE", "AFTER"}: if not rel_jid: raise ValueError( "CREATE_NODE with position BEFORE/AFTER requires 'relative_to_jewel_id'" ) rel_node, _, rel_siblings = _get_node_and_parent_list(rel_jid) # type: ignore[arg-type] # Force siblings to be the same list as parent_children if rel_siblings is not parent_children: raise ValueError( "CREATE_NODE BEFORE/AFTER relative_to_jewel_id must share the same parent" ) try: rel_index = parent_children.index(rel_node) except ValueError as e: raise ValueError( f"Relative node {rel_jid!r} not found under chosen parent for CREATE_NODE" ) from e insert_index = rel_index if position == "BEFORE" else rel_index + 1 parent_children.insert(insert_index, new_node) else: raise ValueError(f"Unsupported CREATE_NODE position {position!r}") nodes_created += 1 # SET_ATTRS_BY_PATH (path-based attribute update, used for JEWEL UUID injection) elif op_type == "SET_ATTRS_BY_PATH": path = op.get("path") attrs = op.get("attrs") or {} if not isinstance(path, list) or not path: raise ValueError("SET_ATTRS_BY_PATH requires non-empty 'path' list") if not isinstance(attrs, dict): raise ValueError("SET_ATTRS_BY_PATH 'attrs' must be a dict") # Navigate by index path from roots current_list: List[JsonDict] = roots target_node: Optional[JsonDict] = None for level, idx in enumerate(path): if not isinstance(idx, int): raise ValueError( f"SET_ATTRS_BY_PATH path index at position {level} must be int, got {type(idx).__name__}" ) if idx < 0 or idx >= len(current_list): raise ValueError( f"SET_ATTRS_BY_PATH path index {idx} out of range at position {level}" ) target_node = current_list[idx] if level < len(path) - 1: children = target_node.get("children") if not isinstance(children, list): raise ValueError( f"SET_ATTRS_BY_PATH path descends into non-list children at position {level}" ) current_list = children if target_node is None: raise ValueError("SET_ATTRS_BY_PATH could not resolve target node from path") for key, value in attrs.items(): if key == "id": if value is None: target_node.pop("id", None) else: if not isinstance(value, str): raise ValueError( "SET_ATTRS_BY_PATH 'id' value must be a string when not None" ) target_node["id"] = value else: raise ValueError( f"Unsupported attr key {key!r} in SET_ATTRS_BY_PATH (only 'id' is currently allowed)" ) attrs_updated += 1 else: raise ValueError(f"Unknown operation type {op_type!r}") applied_count += 1 except Exception as e: # noqa: BLE001 errors.append( { "index": idx, "op": op, "code": "OP_ERROR", "message": str(e), } ) if stop_on_error: break # ------- Persist changes (if not dry-run and no stop-on-error failure) ------- if not dry_run and (not stop_on_error or not errors): # Attach modified roots back to original structure if isinstance(data, dict) and isinstance(data.get("nodes"), list): data["nodes"] = roots # Recalculate counts for readability in the JEWEL working_gem, # but DO NOT touch children_status here. Truncation semantics belong # to the NEXUS/TERRAIN layer (shimmering/enchanted terrain). wrapper = {"nodes": roots} recalc_all_counts_gem(wrapper) elif isinstance(data, list): data = roots # type: ignore[assignment] wrapper = {"nodes": roots} recalc_all_counts_gem(wrapper) save_json(jewel_file, data) # type: ignore[arg-type] return { "success": len(errors) == 0, "applied_count": applied_count, "dry_run": dry_run, "nodes_created": nodes_created, "nodes_deleted": nodes_deleted, "nodes_moved": nodes_moved, "nodes_renamed": nodes_renamed, "notes_updated": notes_updated, "attrs_updated": attrs_updated, "errors": errors, } def die(msg: str) -> None: print(f"[nexus_json_tools] ERROR: {msg}", file=sys.stderr) sys.exit(1) def load_json(path: str) -> JsonDict: if not os.path.isfile(path): die(f"File not found: {path}") with open(path, "r", encoding="utf-8") as f: try: return json.load(f) except json.JSONDecodeError as e: die(f"Failed to parse JSON from {path}: {e}") def save_json(path: str, data: JsonDict) -> None: tmp = path + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) f.write("\n") os.replace(tmp, path) def _walk_nodes(nodes: List[JsonDict], parent: Optional[JsonDict]) -> Tuple[JsonDict, Optional[JsonDict]]: for node in nodes: yield node, parent children = node.get("children") or [] if isinstance(children, list): for child, p in _walk_nodes(children, node): yield child, p def find_node_and_parent(data: JsonDict, target_id: str) -> Tuple[Optional[JsonDict], Optional[JsonDict], Optional[List[JsonDict]]]: """Return (node, parent, siblings_list) for a given id, or (None, None, None). siblings_list is the list object that directly contains `node`. For root-level nodes, siblings_list is data["nodes"]. """ nodes = data.get("nodes") or [] if not isinstance(nodes, list): die("JSON root has no 'nodes' list – unexpected SCRY structure") # Root-level: parent=None, siblings=nodes for node in nodes: if node.get("id") == target_id: return node, None, nodes # Nested children for node, parent in _walk_nodes(nodes, None): if node.get("id") == target_id: if parent is None: # Should have been caught above, but guard anyway return node, None, nodes siblings = parent.get("children") or [] if not isinstance(siblings, list): die("Parent has non-list 'children' field – unexpected SCRY structure") return node, parent, siblings return None, None, None def recalc_counts_for_node(node: JsonDict) -> int: """Recalculate child/descendant counts for TERRAIN files (human-focused). Writes *_human_readable_only counters alongside children_status. These counts are intended for human inspection and debugging, not for core WEAVE semantics. Returns this node's total_descendant_count__human_readable_only. """ children = node.get("children") or [] if not isinstance(children, list): children = [] node["children"] = children immediate = len(children) total_desc = 0 for child in children: child_total = recalc_counts_for_node(child) total_desc += 1 + child_total node["immediate_child_count__human_readable_only"] = immediate node["total_descendant_count__human_readable_only"] = total_desc node["children_status"] = "complete" return total_desc def recalc_all_counts(data: JsonDict) -> None: nodes = data.get("nodes") or [] if not isinstance(nodes, list): return for node in nodes: recalc_counts_for_node(node) def recalc_counts_for_node_gem(node: JsonDict) -> int: """Recalculate counts for JEWEL working_gem files without changing children_status. This keeps truncation semantics (children_status) intact for GEM/JEWEL trees, while still giving humans up-to-date counts inside the working_gem. """ children = node.get("children") or [] if not isinstance(children, list): children = [] node["children"] = children immediate = len(children) total_desc = 0 for child in children: if isinstance(child, dict): child_total = recalc_counts_for_node_gem(child) total_desc += 1 + child_total node["immediate_child_count__human_readable_only"] = immediate node["total_descendant_count__human_readable_only"] = total_desc return total_desc def recalc_all_counts_gem(data: JsonDict) -> None: """Best-effort count recalculation for JEWEL working_gem files. Unlike recalc_all_counts, this helper does NOT modify children_status, so GEM files can continue to carry TERRAIN-derived truncation state (truncated_by_depth / truncated_by_count) untouched. """ nodes = data.get("nodes") or [] if not isinstance(nodes, list): return for node in nodes: if isinstance(node, dict): recalc_counts_for_node_gem(node) def cmd_rename_node(args: argparse.Namespace) -> None: data = load_json(args.file) node, parent, siblings = find_node_and_parent(data, args.id) if node is None: die(f"Node id not found: {args.id}") old_name = node.get("name") node["name"] = args.name recalc_all_counts(data) save_json(args.file, data) print(f"[nexus_json_tools] Renamed node {args.id!r}:\n old: {old_name!r}\n new: {args.name!r}") def cmd_set_note(args: argparse.Namespace) -> None: data = load_json(args.file) node, parent, siblings = find_node_and_parent(data, args.id) if node is None: die(f"Node id not found: {args.id}") old_note = node.get("note") node["note"] = args.note recalc_all_counts(data) save_json(args.file, data) print(f"[nexus_json_tools] Updated note for node {args.id!r}.") if args.show_old: print(" OLD note:") print(" " + (old_note or "<None>").replace("\n", "\n ")) if args.show_new: print(" NEW note:") print(" " + args.note.replace("\n", "\n ")) def cmd_move_node(args: argparse.Namespace) -> None: data = load_json(args.file) # 1. Find node and remove from old location node, old_parent, old_siblings = find_node_and_parent(data, args.id) if node is None: die(f"Node id not found: {args.id}") # Remove from old siblings old_siblings[:] = [n for n in old_siblings if n is not node] # 2. Find new parent # Special case: if parent-id is "root", we move to top-level nodes # But typically SCRY has a single export_root. We'll assume args.parent_id matches a node ID. # We need a way to find the new parent node object. # find_node_and_parent returns (node, parent, siblings). # If we search for the parent_id, the "node" returned is the parent we want. new_parent_node, _, _ = find_node_and_parent(data, args.parent_id) if new_parent_node is None: # Check if it matches the export_root_id (the top-level container) if data.get("export_root_id") == args.parent_id: # Moving to root level of the export target_children_list = data["nodes"] else: die(f"New parent node not found: {args.parent_id}") else: # It's a normal node target_children_list = new_parent_node.get("children") if target_children_list is None: target_children_list = [] new_parent_node["children"] = target_children_list # 3. Insert into new location if args.position == "top": target_children_list.insert(0, node) else: target_children_list.append(node) # 4. Recalc counts (globally is safest) recalc_all_counts(data) save_json(args.file, data) print(f"[nexus_json_tools] Moved node {args.id!r} to parent {args.parent_id!r} ({args.position}).") def cmd_extract_shard(args: argparse.Namespace) -> None: """Extract one or more roots (and all their descendants) into a shard JSON. NOTE: This is currently a 2-way shard: it contains only a single "nodes" list. For full 3-way T/S0/S1 semantics, use QUILLSTRIKE on this shard file to obtain a working stone + witness stone pair, then fuse via fuse-shard-3way. """ source_data = load_json(args.file) shard_roots: List[JsonDict] = [] for root_id in args.root_ids: node, _, _ = find_node_and_parent(source_data, root_id) if node is None: die(f"Shard root node not found: {root_id}") # Deep copy to ensure shard is independent shard_roots.append(copy.deepcopy(node)) shard_data: JsonDict = { "shard_source_file": os.path.basename(args.file), "shard_root_ids": args.root_ids, "nodes": shard_roots, } save_json(args.output, shard_data) print(f"[nexus_json_tools] Extracted {len(shard_roots)} roots to shard: {args.output}") def cmd_fuse_shard(args: argparse.Namespace) -> None: """Legacy 2-way fuse: replace root subtrees in target with shard roots. This is effectively a "shard is authoritative for its roots" operation: for each shard root id present in target, the entire subtree is replaced with the shard version. Hidden descendants in the Territory that are not present in the shard will be dropped. For safer, 3-way semantics that preserve hidden children unless explicitly removed/moved relative to a witness shard, use fuse-shard-3way instead. """ shard_data = load_json(args.shard_file) target_data = load_json(args.target_scry) updates = 0 # For each root in the shard, replace the corresponding node in target shard_nodes = shard_data.get("nodes", []) for shard_node in shard_nodes: s_id = shard_node.get("id") if not s_id: continue # Find matching node in target target_node, _, _ = find_node_and_parent(target_data, s_id) if target_node is None: print( f"[nexus_json_tools] Warning: Shard root {s_id} not found in target. " "Skipping (creation of new roots via fuse not yet supported)." ) continue # UPDATE IN PLACE: replace the contents of target_node with shard_node, # preserving the parent/children list references that hold it. target_node.clear() target_node.update(shard_node) updates += 1 recalc_all_counts(target_data) save_json(args.target_scry, target_data) print(f"[nexus_json_tools] Fused shard into target. Updated {updates} root subtrees.") def _index_shard_roots(roots: List[JsonDict]) -> Tuple[Dict[str, JsonDict], Dict[str, Optional[str]], Dict[str, List[str]]]: """Build id -> node, id -> parent_id, and id -> child_ids for a shard tree.""" by_id: Dict[str, JsonDict] = {} parent: Dict[str, Optional[str]] = {} children_ids: Dict[str, List[str]] = {} def walk(node: JsonDict, parent_id: Optional[str]) -> None: nid = node.get("id") if nid: by_id[nid] = node parent[nid] = parent_id kids = node.get("children") or [] if isinstance(kids, list) and nid: children_ids[nid] = [c.get("id") for c in kids if isinstance(c, dict) and c.get("id")] for c in kids or []: if isinstance(c, dict): walk(c, nid) for root in roots or []: if isinstance(root, dict): walk(root, None) return by_id, parent, children_ids def _index_territory(data: JsonDict) -> Tuple[Dict[str, JsonDict], Dict[str, Optional[str]], Dict[str, List[JsonDict]]]: """Build id -> node, id -> parent_id, id -> children_list for the Territory.""" by_id: Dict[str, JsonDict] = {} parent: Dict[str, Optional[str]] = {} children_lists: Dict[str, List[JsonDict]] = {} nodes = data.get("nodes") or [] def walk(node: JsonDict, parent_id: Optional[str]) -> None: nid = node.get("id") if nid: by_id[nid] = node parent[nid] = parent_id kids = node.get("children") or [] if isinstance(kids, list) and nid: children_lists[nid] = kids for c in kids or []: if isinstance(c, dict): walk(c, nid) for root in nodes: if isinstance(root, dict): walk(root, None) return by_id, parent, children_lists def _register_subtree_in_indexes( node: JsonDict, parent_id: Optional[str], t_by_id: Dict[str, JsonDict], t_parent: Dict[str, Optional[str]], t_children_lists: Dict[str, List[JsonDict]], ) -> None: if not isinstance(node, dict): return nid = node.get("id") if nid: t_by_id[nid] = node t_parent[nid] = parent_id children = node.get("children") or [] if nid and isinstance(children, list): t_children_lists[nid] = children for child in children: if isinstance(child, dict): _register_subtree_in_indexes(child, nid, t_by_id, t_parent, t_children_lists) def _remove_subtree_from_indexes( node: JsonDict, t_by_id: Dict[str, JsonDict], t_parent: Dict[str, Optional[str]], t_children_lists: Dict[str, List[JsonDict]], ) -> None: if not isinstance(node, dict): return for child in node.get("children") or []: if isinstance(child, dict): _remove_subtree_from_indexes(child, t_by_id, t_parent, t_children_lists) nid = node.get("id") if nid: t_by_id.pop(nid, None) t_parent.pop(nid, None) t_children_lists.pop(nid, None) def _expand_with_ancestors(ids: Set[str], parent_map: Dict[str, Optional[str]]) -> None: queue = list(ids) while queue: nid = queue.pop() parent_id = parent_map.get(nid) if parent_id and parent_id not in ids: ids.add(parent_id) queue.append(parent_id) def _collect_subtree_ids(root_id: str, children_ids: Dict[str, List[str]]) -> List[str]: seen: List[str] = [] stack: List[str] = [root_id] while stack: nid = stack.pop() if nid in seen: continue seen.append(nid) for cid in children_ids.get(nid, []): stack.append(cid) return seen def _compute_delete_roots(delete_ids: List[str], parent_map: Dict[str, Optional[str]]) -> List[str]: """Given a set of ids to delete and a parent map from S0, compute branch roots. This mirrors the logic used in workflowy_move_reconcile: we only need to explicitly delete the highest ancestors; their descendants go with them. """ to_delete_set = set(delete_ids) roots: set[str] = set() for d in to_delete_set: cur = parent_map.get(d) is_root = True while cur is not None: if cur in to_delete_set: is_root = False break cur = parent_map.get(cur) if is_root: roots.add(d) return list(roots) def _recalc_counts_preserving_truncation( territory: JsonDict, affected_ids: Set[str], original_status: Dict[str, Optional[str]], original_truncated: Dict[str, bool], ) -> None: """Recalculate *_human_readable_only counts while preserving truncation. This helper is used after destructive operations (e.g. 3-way shard fuse) to refresh the human-facing counters while keeping children_status aligned with the original TERRAIN semantics. No functional behavior depends on the *_human_readable_only fields; they are for inspection only. """ if not affected_ids: return def helper(node: JsonDict) -> int: children = node.get("children") or [] total = 0 for child in children: if isinstance(child, dict): total += 1 + helper(child) nid = node.get("id") if nid and nid in affected_ids: node["immediate_child_count__human_readable_only"] = len(children) node["total_descendant_count__human_readable_only"] = total if original_truncated.get(nid, False): prev = original_status.get(nid) if prev and prev != "complete": node["children_status"] = prev else: node["children_status"] = "incomplete" else: node["children_status"] = "complete" return total for root in territory.get("nodes") or []: if isinstance(root, dict): helper(root) def cmd_fuse_shard_3way(args: argparse.Namespace) -> None: """3-way SHARD FUSE: merge S1 into Territory using S0 as witness. Inputs: - Territory T: args.target_scry (full SCRY JSON) - Witness S0: args.witness_shard (original shard after BURN THE LENS) - Morphed S1: args.morphed_shard (edited shard after REFRACTION) Semantics (per root): - Nodes never present in S0 are treated as *hidden* Territory nodes and are always preserved (not deleted or moved). - Nodes present in S0 but not in S1 are treated as explicit deletions: the corresponding subtrees in Territory are removed. - Nodes present in S1 but not in S0 are treated as creations: new subtrees are added under the parents indicated by S1. - Nodes present in both S0 and S1 are updated and possibly reparented to match S1's structure. Sibling order for these "visible" children follows S1, while hidden children keep their original relative order and remain grouped ahead of the visible block for that parent. This guarantees: - No unintended orphaning of hidden descendants. - Explicit delete/move operations are detected as differences between S0 and S1, not inferred from absence in a shallow shard alone. """ log_jewel( f"fuse-shard-3way: target={args.target_scry}, " f"witness={args.witness_shard}, morphed={args.morphed_shard}" ) territory = load_json(args.target_scry) witness = load_json(args.witness_shard) morphed = load_json(args.morphed_shard) witness_roots = witness.get("nodes") or [] morphed_roots = morphed.get("nodes") or [] if not isinstance(witness_roots, list) or not isinstance(morphed_roots, list): die("Both witness and morphed shards must have a 'nodes' list") # Index S0, S1, and Territory s0_by_id, s0_parent, s0_children_ids = _index_shard_roots(witness_roots) s1_by_id, s1_parent, s1_children_ids = _index_shard_roots(morphed_roots) t_by_id, t_parent, t_children_lists = _index_territory(territory) # --- Invariant checks: enforce correct NEXUS pipeline before fusing --- ids_T1: Set[str] = set(t_by_id.keys()) ids_S0: Set[str] = set(s0_by_id.keys()) ids_S1: Set[str] = set(s1_by_id.keys()) # (A) GEM must be fully embedded into TERRAIN: every GEM id should be # present in shimmering_terrain.json. If not, it likely means IGNITE SHARDS # was run after ATTACH GEMS (or ATTACH GEMS was skipped entirely). missing_in_T1 = ids_S0 - ids_T1 if missing_in_T1: msg = ( "nexus_anchor_jewels / fuse-shard-3way invariant violation: " "phantom_gem (S0) contains Workflowy ids that are not present in " "shimmering_terrain (T1). This usually means you ran IGNITE SHARDS " "after ATTACH GEMS or never called ATTACH GEMS for this tag." ) log_jewel(msg + f" Offending ids (sample): {sorted(list(missing_in_T1))[:5]}") die(msg) # (B) JEWEL must not introduce new Workflowy ids that were never in the GEM. # New JEWEL nodes should be id-less; WEAVE will create real ids in ETHER. extra_S1_vs_S0 = ids_S1 - ids_S0 if extra_S1_vs_S0: msg = ( "nexus_anchor_jewels / fuse-shard-3way invariant violation: " "phantom_jewel (S1) contains Workflowy ids that do not appear in " "phantom_gem (S0). JEWELSTORM must not introduce new Workflowy ids; " "only id-less nodes are allowed as new children. Did you run a new " "SCRY/IGNITE or copy ids from another tree after capturing the GEM?" ) log_jewel(msg + f" Offending ids (sample): {sorted(list(extra_S1_vs_S0))[:5]}") die(msg) original_status: Dict[str, Optional[str]] = {} original_truncated: Dict[str, bool] = {} for nid, node in t_by_id.items(): if not nid: continue status = node.get("children_status") # Truncation semantics belong to the TERRAIN layer and are encoded via # children_status. We no longer infer truncation from count fields; the # *_human_readable_only counters are purely informational. original_status[nid] = status original_truncated[nid] = bool(status and status != "complete") affected_ids: Set[str] = set() # Determine which roots to process: intersection of root_ids present in both S0 and S1 s0_root_ids = [n.get("id") for n in witness_roots if isinstance(n, dict) and n.get("id")] s1_root_ids = {n.get("id") for n in morphed_roots if isinstance(n, dict) and n.get("id")} root_ids = [rid for rid in s0_root_ids if rid in s1_root_ids] if not root_ids: die("No overlapping shard roots between witness and morphed shards") updates = 0 creates = 0 deletes = 0 moves = 0 reorders = 0 for root_id in root_ids: if root_id not in t_by_id: print(f"[nexus_json_tools] Warning: root {root_id} not found in Territory; skipping") continue # Collect subtree ids for this root in S0 and S1 ids_s0 = set(_collect_subtree_ids(root_id, s0_children_ids)) if root_id in s0_by_id else set() ids_s1 = set(_collect_subtree_ids(root_id, s1_children_ids)) if root_id in s1_by_id else set() # Partition ids for this shard root delete_ids = list(ids_s0 - ids_s1) new_ids = ids_s1 - ids_s0 common_ids = ids_s0 & ids_s1 # Compute S1 depths for this shard root so that, if S1 ever introduces # new nodes *with* real Workflowy ids, we create parents before # children. In the canonical JEWELSTORM flow, new nodes are id-less and # handled separately below; this depth map is a forward-compatible # safeguard. depth_s1: Dict[str, int] = {} stack: List[Tuple[str, int]] = [(root_id, 0)] if root_id in ids_s1 else [] while stack: cur_id, d = stack.pop() if cur_id in depth_s1: continue depth_s1[cur_id] = d for cid in s1_children_ids.get(cur_id, []): if cid in ids_s1: stack.append((cid, d + 1)) # --- CREATE: nodes present in S1 but not in S0 (id-based) --- # Process in top-down order so that any new parent ids are created # before their new children. ordered_new_ids = sorted(new_ids, key=lambda nid: depth_s1.get(nid, 0)) for nid in ordered_new_ids: s1_node = s1_by_id.get(nid) if s1_node is None: continue parent_id = s1_parent.get(nid) if parent_id is None: parent_children = territory.get("nodes") if not isinstance(parent_children, list): parent_children = [] territory["nodes"] = parent_children else: parent_node = t_by_id.get(parent_id) if parent_node is None: print( f"[nexus_json_tools] Warning: new node {nid} has parent {parent_id} " "which is not present in Territory; skipping creation." ) continue parent_children = parent_node.get("children") if not isinstance(parent_children, list): parent_children = [] parent_node["children"] = parent_children t_children_lists[parent_id] = parent_children new_node = copy.deepcopy(s1_node) parent_children.append(new_node) _register_subtree_in_indexes(new_node, parent_id, t_by_id, t_parent, t_children_lists) affected_ids.update(filter(None, [nid, parent_id])) creates += 1 # --- CREATE: id-less direct children from S1 under existing parents --- # These are new subtrees introduced by JEWELSTORM that intentionally # have no Workflowy id yet. We append them to the corresponding Territory # parents so the reconciliation algorithm can CREATE them. for p, s1_parent_node in s1_by_id.items(): if p not in ids_s1: continue t_parent_node = t_by_id.get(p) if t_parent_node is None: continue t_children = t_parent_node.get("children") if not isinstance(t_children, list): t_children = [] t_parent_node["children"] = t_children t_children_lists[p] = t_children else: t_children_lists.setdefault(p, t_children) # Build a simple signature for existing id-less children to avoid # obvious duplicates when re-running fuse on the same Territory. existing_signatures: set[tuple[Any, Any]] = set() for child in t_children: if not isinstance(child, dict) or child.get("id") is not None: continue existing_signatures.add((child.get("name"), child.get("note"))) for child in s1_parent_node.get("children") or []: if not isinstance(child, dict) or child.get("id") is not None: continue sig = (child.get("name"), child.get("note")) if sig in existing_signatures: continue new_child = copy.deepcopy(child) t_children.append(new_child) existing_signatures.add(sig) affected_ids.add(p) creates += 1 # --- MOVE: nodes in both S0 and S1 whose parent changed --- for nid in common_ids: s1_p = s1_parent.get(nid) t_p = t_parent.get(nid) if s1_p == t_p: continue if t_p is None: old_siblings = territory.get("nodes") or [] else: old_siblings = t_children_lists.get(t_p) or [] old_siblings[:] = [n for n in old_siblings if not (isinstance(n, dict) and n.get("id") == nid)] if t_p is not None: t_children_lists[t_p] = old_siblings if s1_p is None: new_siblings = territory.get("nodes") if not isinstance(new_siblings, list): new_siblings = [] territory["nodes"] = new_siblings else: new_parent_node = t_by_id.get(s1_p) if new_parent_node is None: print( f"[nexus_json_tools] Warning: cannot reparent {nid} to {s1_p} " "(parent not found in Territory); leaving in place." ) continue new_siblings = new_parent_node.get("children") if not isinstance(new_siblings, list): new_siblings = [] new_parent_node["children"] = new_siblings t_children_lists[s1_p] = new_siblings node_obj = t_by_id.get(nid) if node_obj is None: continue new_siblings.append(node_obj) t_parent[nid] = s1_p affected_ids.update(filter(None, [nid, t_p, s1_p])) moves += 1 # --- DELETE: nodes present in S0 but not in S1 (after moves) --- delete_roots = _compute_delete_roots(delete_ids, s0_parent) for nid in delete_roots: t_node = t_by_id.get(nid) if t_node is None: continue parent_id = t_parent.get(nid) if parent_id is None: siblings = territory.get("nodes") or [] else: siblings = t_children_lists.get(parent_id) or [] siblings[:] = [n for n in siblings if not (isinstance(n, dict) and n.get("id") == nid)] _remove_subtree_from_indexes(t_node, t_by_id, t_parent, t_children_lists) if parent_id: affected_ids.add(parent_id) deletes += 1 # --- REORDER: align order of visible children with S1, keep hidden + id-less children --- parents_to_consider = {s1_parent[nid] for nid in ids_s1} | {root_id} parents_to_consider = {p for p in parents_to_consider if p is not None and p in t_by_id} for p in parents_to_consider: desired_ids = s1_children_ids.get(p, []) if not desired_ids: continue children_list = t_children_lists.get(p) if children_list is None: parent_node = t_by_id[p] new_children_list: List[JsonDict] = [] for cid in desired_ids: child_node = t_by_id.get(cid) if child_node is not None: new_children_list.append(child_node) parent_node["children"] = new_children_list t_children_lists[p] = new_children_list if new_children_list: reorders += 1 affected_ids.add(p) continue hidden_children: List[JsonDict] = [] for child in children_list: if not isinstance(child, dict): continue cid = child.get("id") # Preserve id-less children (new JEWELSTORM subtrees and hidden Territory # nodes that never had an id in the shard) as part of the hidden block. if cid is None: hidden_children.append(child) continue # Also preserve Territory nodes that are outside the S0/S1 shard entirely. if cid not in ids_s0 and cid not in ids_s1: hidden_children.append(child) new_children_list: List[JsonDict] = [] new_children_list.extend(hidden_children) for cid in desired_ids: child_node = t_by_id.get(cid) if child_node is not None and child_node not in new_children_list: new_children_list.append(child_node) if new_children_list != children_list: parent_node = t_by_id[p] parent_node["children"] = new_children_list t_children_lists[p] = new_children_list reorders += 1 affected_ids.add(p) # --- UPDATE content for common ids --- for nid in common_ids: t_node = t_by_id.get(nid) s1_node = s1_by_id.get(nid) if t_node is None or s1_node is None: continue t_node["name"] = s1_node.get("name") t_node["note"] = s1_node.get("note") t_node["data"] = s1_node.get("data") t_node["completed"] = bool(s1_node.get("completed", False)) affected_ids.add(nid) updates += 1 # Expand affected set to include ancestors so ancestor counts stay correct affected_ids = {nid for nid in affected_ids if nid} _expand_with_ancestors(affected_ids, t_parent) _recalc_counts_preserving_truncation(territory, affected_ids, original_status, original_truncated) save_json(args.target_scry, territory) summary_msg = ( "[nexus_json_tools] 3-way fuse complete: " f"updates={updates}, creates={creates}, deletes={deletes}, moves={moves}, reorders={reorders}." ) print(summary_msg) log_jewel(summary_msg) def cmd_delete_node(args: argparse.Namespace) -> None: data = load_json(args.file) node, parent, siblings = find_node_and_parent(data, args.id) if node is None or siblings is None: die(f"Node id not found: {args.id}") # Remove the node from its siblings list before_len = len(siblings) siblings[:] = [n for n in siblings if n is not node] after_len = len(siblings) if before_len == after_len: die(f"Internal error: node {args.id} not actually present in siblings list") recalc_all_counts(data) save_json(args.file, data) print(f"[nexus_json_tools] Deleted node {args.id!r} and its subtree.") def cmd_burn_gem(args: argparse.Namespace) -> None: """Rehydrate selected roots from a full SCRY and extract a full GEM shard. Inputs: - args.file = Territory SCRY JSON (possibly truncated by max_depth / child_count_limit) - --full-scry = Full-depth SCRY JSON for the same Workflowy tree (or at least for the selected roots). - --root-ids = One or more node UUIDs identifying GEM roots (these IDs must appear in BOTH Territory and full-scry). - --output-shard = Path for GEM shard JSON (S0 for these roots). Behavior: - For each root_id, we locate its subtree in the full SCRY and deep-copy that subtree into two places: 1) Territory: we replace the corresponding truncated subtree under that root in Territory, effectively rehydrating it. 2) GEM shard: we append the subtree to a new shard JSON (S0) that will be used as the witness shard for 3-way fuse. - After this, the Territory JSON now contains full subtrees for the GEM roots, and the GEM shard is a full-depth snapshot of those same subtrees. This allows a workflow of: 1) Coarse SCRY (small depth/width) → Territory. 2) Full SCRY for a few selected roots → full-scry.json. 3) burn-gem over those roots to splice full GEMs into Territory and extract GEM shard S0. 4) Edit GEM shard (S1) deeply. 5) Run fuse-shard-3way (T/S0/S1) and then WEAVE. """ territory = load_json(args.file) full_scry = load_json(args.full_scry) gem_roots: List[JsonDict] = [] for root_id in args.root_ids: # 1) Locate full subtree in full_scry full_node, full_parent, full_siblings = find_node_and_parent(full_scry, root_id) if full_node is None: die(f"[burn-gem] Root id {root_id!r} not found in full-scry JSON {args.full_scry}") full_subtree = copy.deepcopy(full_node) gem_roots.append(copy.deepcopy(full_subtree)) # 2) Locate corresponding node in Territory and replace its subtree terr_node, terr_parent, terr_siblings = find_node_and_parent(territory, root_id) if terr_node is None or terr_siblings is None: die(f"[burn-gem] Root id {root_id!r} not found in Territory JSON {args.file}") # Replace node in its siblings list replaced = False for i, n in enumerate(terr_siblings): if isinstance(n, dict) and n.get("id") == root_id: terr_siblings[i] = full_subtree replaced = True break if not replaced: die(f"[burn-gem] Internal error: unable to replace root {root_id!r} in Territory siblings list") # Save updated Territory (hydrated for these GEM roots) save_json(args.file, territory) # Save GEM shard (S0) containing all rehydrated roots shard_data: JsonDict = { "shard_source_file": os.path.basename(args.full_scry), "shard_root_ids": args.root_ids, "nodes": gem_roots, } save_json(args.output_shard, shard_data) print( f"[nexus_json_tools] burn-gem complete: hydrated {len(args.root_ids)} roots into Territory " f"and wrote GEM shard to {args.output_shard!r}." ) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Manipulate NEXUS SCRY JSON files (rename/set-note/delete/move/shard fuse).", ) parser.add_argument( "file", help="Path to SCRY JSON file (e.g., qm-XXXX-...--nexus_*.json). Not used for all commands.", ) subparsers = parser.add_subparsers(dest="command", required=True) # rename-node p_rename = subparsers.add_parser( "rename-node", help="Rename a node by id (update its 'name' field)", ) p_rename.add_argument("--id", required=True, help="Node UUID to rename") p_rename.add_argument("--name", required=True, help="New name for the node") p_rename.set_defaults(func=cmd_rename_node) # set-note p_note = subparsers.add_parser( "set-note", help="Set/replace the 'note' field for a node", ) p_note.add_argument("--id", required=True, help="Node UUID whose note to update") p_note.add_argument( "--note", required=True, help="New note text (use \n for newlines; JSON will store real newlines)", ) p_note.add_argument( "--show-old", action="store_true", help="Print the previous note value for inspection", ) p_note.add_argument( "--show-new", action="store_true", help="Print the new note value after change", ) p_note.set_defaults(func=cmd_set_note) # delete-node p_del = subparsers.add_parser( "delete-node", help="Delete a node (and its subtree) by id", ) p_del.add_argument("--id", required=True, help="Node UUID to delete") p_del.set_defaults(func=cmd_delete_node) # move-node p_move = subparsers.add_parser( "move-node", help="Move a node to a new parent within the SCRY JSON", ) p_move.add_argument("--id", required=True, help="Node UUID to move") p_move.add_argument("--parent-id", required=True, help="New parent UUID (or export_root_id for top-level)") p_move.add_argument( "--position", choices=["top", "bottom"], default="bottom", help="Position in new parent's children list", ) p_move.set_defaults(func=cmd_move_node) # extract-shard (2-way shard; S0 captured in a single file) p_extract = subparsers.add_parser( "extract-shard", help="Extract a subset of nodes (and their descendants) to a new JSON file", ) p_extract.add_argument("--root-ids", required=True, nargs="+", help="List of Node UUIDs to be the roots of the shard") p_extract.add_argument("--output", required=True, help="Output path for the shard JSON") p_extract.set_defaults(func=cmd_extract_shard) # burn-gem (rehydrate selected roots from full SCRY and extract GEM shard) p_gem = subparsers.add_parser( "burn-gem", help="Rehydrate selected roots from a full SCRY into Territory and extract a GEM shard", ) p_gem.add_argument( "--full-scry", required=True, help="Path to full-depth SCRY JSON for GEM roots (usually produced by nexus_scry with generous depth/child limits)", ) p_gem.add_argument( "--root-ids", required=True, nargs="+", help="List of Node UUIDs to treat as GEM roots (must exist in both Territory and full-scry)", ) p_gem.add_argument( "--output-shard", required=True, help="Output path for the GEM shard JSON (S0)", ) p_gem.set_defaults(func=cmd_burn_gem) # fuse-shard (legacy 2-way replacement) p_fuse = subparsers.add_parser( "fuse-shard", help="Legacy 2-way: replace root subtrees in target with shard roots", ) p_fuse.add_argument("--shard-file", required=True, help="Path to the modified shard JSON") p_fuse.add_argument("--target-scry", required=True, help="Path to the target SCRY JSON to update") p_fuse.set_defaults(func=cmd_fuse_shard) # fuse-shard-3way (new T/S0/S1 aware fuse) p_fuse3 = subparsers.add_parser( "fuse-shard-3way", help="3-way SHARD FUSE: Territory + witness shard S0 + morphed shard S1", ) p_fuse3.add_argument( "--witness-shard", required=True, help="Path to original shard JSON (S0) captured right after BURN THE LENS", ) p_fuse3.add_argument( "--morphed-shard", required=True, help="Path to modified shard JSON (S1) after REFRACTION/QUILLSTRIKE", ) p_fuse3.add_argument( "--target-scry", required=True, help="Path to the Territory SCRY JSON to update (T)", ) p_fuse3.set_defaults(func=cmd_fuse_shard_3way) return parser def main(argv: Optional[list[str]] = None) -> None: parser = build_parser() args = parser.parse_args(argv) args.func(args) if __name__ == "__main__": # pragma: no cover main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server