
workflowy_etch

Create multiple hierarchical nodes in WorkFlowy directly from JSON data, with no file intermediary required, enabling structured content organization and task management.

Instructions

Create multiple nodes from JSON structure (no file intermediary). ETCH command for direct node creation.
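The nodes payload is a list of node objects, each with a name, an optional note, and optional nested children (no UUIDs). A minimal, hypothetical payload, matching the field names used by the handler under Implementation Reference:

    # Illustrative ETCH payload (content is hypothetical); "name" is
    # required, "note" and "children" are optional, and children nest
    # recursively to any depth.
    nodes = [
        {
            "name": "Project Alpha",
            "note": "Top-level project node",
            "children": [
                {"name": "Design", "children": [{"name": "Wireframes"}]},
                {"name": "Implementation"},
            ],
        },
        {"name": "Project Beta"},
    ]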

Input Schema

Name         Required  Default  Description
parent_id    Yes       —        Target WorkFlowy parent node ID (UUID) under which nodes are created.
nodes        No        —        List of node objects (name/note/children), or a JSON string encoding that list. Ignored when nodes_file is provided.
replace_all  No        false    If true, delete all existing children of parent_id first; otherwise additive (existing nodes are skipped by name).
nodes_file   No        —        Path to a JSON file containing the ETCH payload; takes precedence over nodes.
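When nodes_file is used, the handler accepts either a bare JSON array or a JSON object with a top-level "nodes" key. A minimal sketch of producing the latter form (file name and contents are illustrative):

    import json

    # Hypothetical example: write an ETCH payload file in the
    # object-with-"nodes" form that the handler recognizes.
    payload = {
        "nodes": [
            {"name": "Inbox", "children": [{"name": "Read later"}]},
        ]
    }
    with open("etch_payload.json", "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)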

Implementation Reference

  • The core handler function that implements the workflowy_etch tool logic. It parses nodes from inline JSON or a file, validates and escapes content, runs in replace_all or additive mode (skipping existing nodes by name), and recursively creates the hierarchical node tree via API calls with retries and rate limiting.
    async def workflowy_etch(
        self,
        parent_id: str,
        nodes: list[dict[str, Any]] | str,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict[str, Any]:
        """Create multiple nodes from JSON structure (ETCH command).

        Args:
            parent_id: Target Workflowy parent id.
            nodes: Either a parsed list[dict] structure or a JSON string
                representing that list. Ignored when nodes_file is provided.
            replace_all: If True, delete all existing children under
                parent_id first; otherwise additive.
            nodes_file: Optional path to a file containing the ETCH payload.
                - If the file contains a JSON object with a top-level
                  "nodes" key, that value is used.
                - Else, the entire file content is treated as a JSON string
                  to be parsed the same way as the "nodes" string parameter
                  (including autofix strategies).
        """
        import asyncio

        logger = _ClientLogger()

        # Optional file-based payload: if nodes_file is provided, it wins
        # over the inline "nodes" argument.
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    file_text = f.read()
                # Try JSON object with top-level "nodes" first
                try:
                    maybe_json = json.loads(file_text)
                    if isinstance(maybe_json, dict) and "nodes" in maybe_json:
                        nodes = maybe_json["nodes"]
                    else:
                        # Not a dict-with-nodes; treat entire file_text as
                        # the stringified nodes payload
                        nodes = file_text
                    logger.info(
                        f"📄 workflowy_etch: loaded nodes from file '{nodes_file}'"
                    )
                except json.JSONDecodeError:
                    # Not valid JSON as a whole; assume raw JSON string
                    # payload for nodes and let the normal string handler
                    # below deal with it (including autofix).
                    nodes = file_text
                    logger.info(
                        f"📄 workflowy_etch: using raw file text from '{nodes_file}'"
                    )
            except Exception as e:
                error_msg = f"Failed to read nodes_file '{nodes_file}': {e}"
                self._log_to_file(error_msg, "etch")
                return {
                    "success": False,
                    "nodes_created": 0,
                    "root_node_ids": [],
                    "errors": [error_msg],
                }

        # Auto-fix stringified JSON
        stringify_strategy_used = None
        if isinstance(nodes, str):
            logger.warning("⚠️ Received stringified JSON - attempting parse strategies")
            # Strategy 1: direct json.loads() on the full string
            try:
                parsed = json.loads(nodes)
                nodes = parsed
                stringify_strategy_used = "Strategy 1: Direct json.loads()"
                logger.info(f"✅ {stringify_strategy_used}")
            except json.JSONDecodeError as e:
                # Dan debug note:
                # In practice we've seen cases where the string is a valid
                # JSON array followed by a few stray characters (e.g. the
                # outer '}' from the request body). json.loads() raises
                # JSONDecodeError('Extra data', ...).
                #
                # Rather than naively stripping N characters, we do a
                # _structure-aware_ trim:
                #   - Find the last closing bracket ']' in the string.
                #   - If it exists, take everything up to and including
                #     that bracket and try json.loads() again.
                #   - This preserves the full array and only discards
                #     trailing junk after a syntactically complete list
                #     literal.
                #
                # We only apply this once; if it still fails, we surface a
                # clear error and log to etch_debug.log.
                last_bracket = nodes.rfind("]")
                if last_bracket != -1:
                    candidate = nodes[: last_bracket + 1]
                    try:
                        parsed2 = json.loads(candidate)
                        stringify_strategy_used = (
                            "Strategy 2: Trim after last ']' "
                            "(recover from trailing junk)"
                        )
                        # Log before reassigning nodes so len(nodes) still
                        # reports the original string length.
                        self._log_to_file(
                            "STRINGIFY AUTOFIX: Strategy 2 used. "
                            f"Original len={len(nodes)}, trimmed_len={len(candidate)}. "
                            f"Original JSONDecodeError={repr(e)}",
                            "etch",
                        )
                        nodes = parsed2
                        logger.info(f"✅ {stringify_strategy_used}")
                    except json.JSONDecodeError as e2:
                        # Still not valid JSON even after trimming at the
                        # last ']'. At this point we give up and return a
                        # structured error so the caller can see it in the
                        # MCP tool response.
                        error_msg = (
                            "Failed to parse stringified JSON after Strategy 1 "
                            "and Strategy 2. "
                            f"Strategy 1 error={repr(e)}, Strategy 2 error={repr(e2)}"
                        )
                        self._log_to_file(error_msg, "etch")
                        return {
                            "success": False,
                            "nodes_created": 0,
                            "root_node_ids": [],
                            "errors": [error_msg],
                        }
                else:
                    # No closing bracket at all; there's nothing safe to
                    # trim against. Log and fail clearly.
                    error_msg = (
                        "Failed to parse stringified JSON: no closing ']' "
                        f"found. Original error={repr(e)}"
                    )
                    self._log_to_file(error_msg, "etch")
                    return {
                        "success": False,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "errors": [error_msg],
                    }

        # Validate nodes is a list
        if not isinstance(nodes, list):
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": ["Parameter 'nodes' must be a list"],
            }

        # Validation checkpoint - NAME and NOTE fields
        def validate_and_escape_nodes_recursive(
            nodes_list: list[dict[str, Any]], path: str = "root"
        ) -> tuple[bool, str | None, list[str]]:
            """Recursively validate and auto-escape NAME and NOTE fields."""
            warnings = []
            for idx, node in enumerate(nodes_list):
                node_path = f"{path}[{idx}].{node.get('name', 'unnamed')}"

                # Validate NAME
                name = node.get('name')
                if not isinstance(name, str) or not name.strip():
                    return (
                        False,
                        f"Node: {node_path}\n\nName must be non-empty string.",
                        warnings,
                    )
                processed_name, name_warning = self._validate_name_field(name)
                if processed_name is not None:
                    node['name'] = processed_name
                if name_warning:
                    warnings.append(f"{node_path} - Name escaped")

                # Validate NOTE
                note = node.get('note')
                if note:
                    processed_note, note_warning = self._validate_note_field(
                        note, skip_newline_check=False
                    )
                    if processed_note is None and note_warning:
                        return (False, f"Node: {node_path}\n\n{note_warning}", warnings)
                    node['note'] = processed_note
                    if note_warning and "AUTO-ESCAPED" in note_warning:
                        warnings.append(f"{node_path} - Note escaped")

                # Recurse
                children = node.get('children', [])
                if children:
                    success, error_msg, child_warnings = (
                        validate_and_escape_nodes_recursive(children, node_path)
                    )
                    if not success:
                        return (False, error_msg, warnings)
                    warnings.extend(child_warnings)

            return (True, None, warnings)

        success, error_msg, warnings = validate_and_escape_nodes_recursive(nodes)
        if not success:
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": [error_msg or "Validation failed"],
            }
        if warnings:
            logger.info(f"✅ Auto-escaped {len(warnings)} node(s)")

        # Stats tracking
        stats = {
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "nodes_created": 0,
            "skipped": 0,
            "errors": [],
        }

        # REPLACE_ALL or DEFAULT mode
        if replace_all:
            logger.info("🗑️ replace_all=True - Deleting all existing children")
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1
                for child in existing_children:
                    try:
                        await self.delete_node(child.id)
                        logger.info(f"  Deleted: {child.nm}")
                        stats["api_calls"] += 1
                    except Exception as e:
                        logger.warning(f"  Failed to delete {child.nm}: {e}")
            except Exception as e:
                logger.warning(f"Could not list/delete existing: {e}")
            nodes_to_create = nodes
            existing_names = set()
        else:
            # Additive mode - skip existing by name
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1
                existing_names = {
                    child.nm.strip() for child in existing_children if child.nm
                }
                nodes_to_create = [
                    node for node in nodes
                    if node.get('name', '').strip() not in existing_names
                ]
                stats["skipped"] = len(nodes) - len(nodes_to_create)
                if stats["skipped"] > 0:
                    logger.info(f"📝 Skipped {stats['skipped']} existing node(s)")
                if not nodes_to_create:
                    return {
                        "success": True,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "skipped": stats["skipped"],
                        "api_calls": stats["api_calls"],
                        "message": "All nodes already exist - nothing to create",
                    }
            except Exception as e:
                logger.warning(f"Could not check existing: {e}")
                nodes_to_create = nodes
                existing_names = set()

        # Create tree recursively
        async def create_tree(
            parent_id: str, nodes: list[dict[str, Any]]
        ) -> list[str]:
            """Recursively create node tree."""
            created_ids = []
            for node_data in nodes:
                try:
                    node_name = node_data['name']
                    if not replace_all and node_name in existing_names:
                        stats["skipped"] += 1
                        continue
                    request = NodeCreateRequest(
                        name=node_name,
                        parent_id=parent_id,
                        note=node_data.get('note'),
                        layoutMode=node_data.get('layout_mode'),
                        position=node_data.get('position', 'bottom'),
                    )
                    # Create with retry (internal call)
                    node = await self.create_node(request, _internal_call=True)
                    if node:
                        created_ids.append(node.id)
                        stats["nodes_created"] += 1
                        self._log_to_file(f"  Created: {node_name} ({node.id})", "etch")
                        # Recursively create children
                        if 'children' in node_data and node_data['children']:
                            await create_tree(node.id, node_data['children'])
                except Exception as e:
                    error_msg = (
                        f"Failed to create '{node_data.get('name', 'unknown')}': {str(e)}"
                    )
                    logger.error(error_msg)
                    stats["errors"].append(error_msg)
                    continue
            return created_ids

        try:
            self._log_to_file(
                f"ETCH start (replace_all={replace_all}) parent={parent_id}", "etch"
            )
            root_ids = await create_tree(parent_id, nodes_to_create)
            self._log_to_file(f"ETCH complete: {stats['nodes_created']} created", "etch")
            result = {
                "success": len(stats["errors"]) == 0,
                "nodes_created": stats["nodes_created"],
                "root_node_ids": root_ids,
                "api_calls": stats["api_calls"],
                "retries": stats["retries"],
                "errors": stats["errors"],
            }
            if not replace_all:
                result["skipped"] = stats["skipped"]
            if stringify_strategy_used:
                result["_stringify_autofix"] = stringify_strategy_used
            # Mark parent dirty
            if result.get("success", False):
                try:
                    self._mark_nodes_export_dirty([parent_id])
                except Exception:
                    pass
            return result
        except Exception as e:
            error_msg = f"ETCH failed: {str(e)}"
            log_event(error_msg, "ETCH")
            stats["errors"].append(error_msg)
            return {
                "success": False,
                "nodes_created": stats["nodes_created"],
                "errors": stats["errors"],
            }
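The structure-aware trim described in the debug note above can be exercised in isolation. A minimal standalone sketch, independent of the class, using a hypothetical malformed input:

    import json

    # Hypothetical input: a valid JSON array followed by trailing junk,
    # e.g. a stray '}' carried over from an outer request body.
    raw = '[{"name": "A"}, {"name": "B"}]}'

    try:
        nodes = json.loads(raw)  # Strategy 1: parse as-is
    except json.JSONDecodeError:
        # Strategy 2: trim everything after the last ']' and retry,
        # keeping the complete array and discarding the trailing junk.
        last_bracket = raw.rfind("]")
        nodes = json.loads(raw[: last_bracket + 1]) if last_bracket != -1 else None

    print(nodes)  # [{'name': 'A'}, {'name': 'B'}]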
  • The FastMCP tool registration decorator and wrapper function that exposes workflowy_etch to MCP clients. It resolves the nodes parameter (list, JSON string, or file path), forwards the payload to client.workflowy_etch, and returns the result.
    @mcp.tool(
        name="workflowy_etch",
        description=(
            "Create multiple nodes from JSON structure (no file intermediary). "
            "ETCH command for direct node creation."
        ),
    )
    async def etch(
        parent_id: str,
        nodes: list[dict] | str | None = None,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict:
        """Create multiple nodes from JSON structure.

        ETCH command - simple additive node creation (no UUIDs, no
        updates/moves). Fallback: if this fails, use the INSCRIBE scroll
        (write_file → bulk_import).

        DEFAULT: Additive (skip existing by name, add new children only)
        REPLACE: Wipe all children, create fresh

        For complex operations (moves/updates with UUIDs): use the NEXUS
        scroll instead.

        Args:
            parent_id: Parent UUID where nodes should be created
            nodes: List of node objects (NO UUIDs - just name/note/children)
            replace_all: If True, delete ALL existing children first.
                Default False.
            nodes_file: Optional path to a JSON file containing the payload.

        Returns:
            Dictionary with success status, nodes created, skipped (in
            additive mode), API call stats, and errors
        """
        client = get_client()

        # Resolve nodes source: direct value or file-based
        payload_nodes: list[dict] | str | None = nodes
        used_nodes_file = False
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    payload_nodes = f.read()
                used_nodes_file = True
            except Exception as e:
                return {
                    "success": False,
                    "nodes_created": 0,
                    "root_node_ids": [],
                    "api_calls": 0,
                    "retries": 0,
                    "rate_limit_hits": 0,
                    "errors": [
                        f"Failed to read nodes_file '{nodes_file}': {str(e)}",
                        "Hint: Ensure the path is correct and accessible from the MCP server",
                    ],
                }
        elif isinstance(payload_nodes, str):
            # Agent convenience: if 'nodes' looks like a real JSON file
            # path, try it as such.
            candidate = payload_nodes.strip()
            if os.path.exists(candidate) and candidate.lower().endswith(".json"):
                try:
                    with open(candidate, "r", encoding="utf-8") as f:
                        payload_nodes = f.read()
                    used_nodes_file = True
                    nodes_file = candidate
                    log_event(
                        f"workflowy_etch: treating 'nodes' string as nodes_file path -> {candidate}",
                        "ETCH",
                    )
                except Exception as e:
                    # Fall back to treating it as JSON; let the ETCH parser
                    # report a clear error
                    log_event(
                        f"workflowy_etch: failed to read candidate nodes file '{candidate}': {e}",
                        "ETCH",
                    )

        if payload_nodes is None:
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "api_calls": 0,
                "retries": 0,
                "rate_limit_hits": 0,
                "errors": [
                    "Missing ETCH payload: provide either 'nodes' (list or "
                    "stringified JSON) or 'nodes_file' (path to JSON file)",
                ],
            }

        # Rate limiting is handled inside workflowy_etch itself because of
        # its recursive operations
        try:
            result = await client.workflowy_etch(
                parent_id, payload_nodes, replace_all=replace_all
            )
            # Annotate result to show where nodes came from (helps
            # debugging real-world agent usage)
            if used_nodes_file:
                result.setdefault("_source", {})["nodes_file"] = nodes_file
            return result
        except Exception as e:
            # Top-level exception capture
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "api_calls": 0,
                "retries": 0,
                "rate_limit_hits": 0,
                "errors": [f"An unexpected error occurred: {str(e)}"],
            }
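Based on the code above, a successful additive run that loaded its payload from a file returns a dictionary shaped roughly like the following (all values are illustrative):

    result = {
        "success": True,
        "nodes_created": 4,
        "root_node_ids": ["<uuid-1>", "<uuid-2>"],
        "api_calls": 6,
        "retries": 0,
        "errors": [],
        "skipped": 1,  # present only when replace_all=False
        "_source": {"nodes_file": "etch_payload.json"},  # only when a file was used
    }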
