workflowy_etch_async

Start asynchronous creation of WorkFlowy outline nodes by providing parent ID and node data, returning a job ID for status tracking.

Instructions

Start an async ETCH job (Workflowy node creation) and return a job_id for status polling.

Input Schema

Name         Required  Description                                              Default
parent_id    Yes       Target Workflowy parent node id.                         -
nodes        No        Node list (list of dicts) or stringified JSON.           None
replace_all  No        Delete existing children first; otherwise additive.      False
nodes_file   No        Path to a JSON file containing the ETCH payload.         None
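
For illustration, a minimal tool-call payload might look like the following sketch; the parent_id and node names are invented:

    # Hypothetical arguments for workflowy_etch_async (all values invented).
    etch_args = {
        "parent_id": "abc12345-0000-0000-0000-000000000000",  # hypothetical target node
        "replace_all": False,
        "nodes": [
            {
                "name": "Project Alpha",
                "note": "Top-level project node",
                "children": [
                    {"name": "Task 1"},
                    {"name": "Task 2", "note": "Blocked on review"},
                ],
            }
        ],
    }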

Implementation Reference

  • MCP registration of the workflowy_etch_async tool - a wrapper that launches the core workflowy_etch handler as a background job via _start_background_job and returns a job_id for mcp_job_status monitoring.
    @mcp.tool(
        name="workflowy_etch_async",
        description="Start an async ETCH job (Workflowy node creation) and return a job_id for status polling.",
    )
    async def etch_async(
        parent_id: str,
        nodes: list[dict] | str | None = None,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict:
        """Start ETCH as a background job and return a job_id."""
        client = get_client()

        # Resolve nodes source for the background job
        payload_nodes: list[dict] | str | None = nodes
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    payload_nodes = f.read()
            except Exception as e:
                return {
                    "success": False,
                    "error": f"Failed to read nodes_file '{nodes_file}': {str(e)}",
                }
        elif isinstance(payload_nodes, str):
            # Agent convenience: if 'nodes' looks like a real JSON file path, try it as such.
            candidate = payload_nodes.strip()
            if os.path.exists(candidate) and candidate.lower().endswith(".json"):
                try:
                    with open(candidate, "r", encoding="utf-8") as f:
                        payload_nodes = f.read()
                    nodes_file = candidate
                    log_event(
                        f"workflowy_etch_async: treating 'nodes' string as nodes_file path -> {candidate}",
                        "ETCH_ASYNC",
                    )
                except Exception as e:
                    # Fall back to treating it as JSON; let the ETCH parser report a clear error
                    log_event(
                        f"workflowy_etch_async: failed to read candidate nodes file '{candidate}': {e}",
                        "ETCH_ASYNC",
                    )

        if payload_nodes is None:
            return {
                "success": False,
                "error": "Missing ETCH payload: provide either 'nodes' (list or stringified JSON) or 'nodes_file' (path to JSON file)",
            }

        async def run_etch(job_id: str) -> dict:  # job_id reserved for future logging
            # Forward both the resolved payload_nodes and the optional nodes_file
            # into the client. workflowy_etch itself knows how to handle:
            #   - list[dict] (already-parsed nodes)
            #   - stringified JSON
            #   - nodes_file path (object-with-nodes or raw string)
            return await client.workflowy_etch(
                parent_id=parent_id,
                nodes=payload_nodes,
                replace_all=replace_all,
                nodes_file=nodes_file,
            )

        payload = {
            "parent_id": parent_id,
            "replace_all": replace_all,
            "nodes_file": nodes_file,
        }
        return await _start_background_job("etch", payload, run_etch)
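
Taken together with mcp_job_status, the intended call-and-poll flow looks roughly like the sketch below. The call_tool callable and the "job_id"/"status" field names are assumptions about the MCP client and the job-status schema, not a confirmed API:

    # Sketch only: poll mcp_job_status until the ETCH job finishes.
    # 'call_tool' is a hypothetical stand-in for your MCP client invocation;
    # the 'job_id' and 'status' keys are assumed from the tool descriptions.
    import asyncio

    async def etch_and_wait(call_tool, etch_args: dict, interval: float = 2.0) -> dict:
        started = await call_tool("workflowy_etch_async", etch_args)
        job_id = started["job_id"]
        while True:
            status = await call_tool("mcp_job_status", {"job_id": job_id})
            if status.get("status") in ("completed", "failed"):
                return status
            await asyncio.sleep(interval)  # wait before polling again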
  • Core handler logic executed by workflowy_etch_async - creates hierarchical nodes from a JSON structure under parent_id; accepts file input, stringified JSON, or list-of-dicts payloads; runs in additive or replace_all mode; and performs full validation, escaping, logging, and retries.
    async def workflowy_etch(
        self,
        parent_id: str,
        nodes: list[dict[str, Any]] | str,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict[str, Any]:
        """Create multiple nodes from JSON structure (ETCH command).

        Args:
            parent_id: Target Workflowy parent id.
            nodes: Either a parsed list[dict] structure or a JSON string
                representing that list. Ignored when nodes_file is provided.
            replace_all: If True, delete all existing children under
                parent_id first; otherwise additive.
            nodes_file: Optional path to a file containing ETCH payload.
                - If the file contains a JSON object with a top-level
                  "nodes" key, that value is used.
                - Else, the entire file content is treated as a JSON string
                  to be parsed the same way as the "nodes" string parameter
                  (including autofix strategies).
        """
        import asyncio

        logger = _ClientLogger()

        # Optional file-based payload: if nodes_file is provided, it wins
        # over the inline "nodes" argument.
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    file_text = f.read()
                # Try JSON object with top-level "nodes" first
                try:
                    maybe_json = json.loads(file_text)
                    if isinstance(maybe_json, dict) and "nodes" in maybe_json:
                        nodes = maybe_json["nodes"]
                    else:
                        # Not a dict-with-nodes; treat entire file_text as
                        # the stringified nodes payload
                        nodes = file_text
                    logger.info(
                        f"📄 workflowy_etch: loaded nodes from file '{nodes_file}'"
                    )
                except json.JSONDecodeError:
                    # Not valid JSON as a whole; assume raw JSON string
                    # payload for nodes and let the normal string handler
                    # below deal with it (including autofix).
                    nodes = file_text
                    logger.info(
                        f"📄 workflowy_etch: using raw file text from '{nodes_file}'"
                    )
            except Exception as e:
                error_msg = f"Failed to read nodes_file '{nodes_file}': {e}"
                self._log_to_file(error_msg, "etch")
                return {
                    "success": False,
                    "nodes_created": 0,
                    "root_node_ids": [],
                    "errors": [error_msg],
                }

        # Auto-fix stringified JSON
        stringify_strategy_used = None
        if isinstance(nodes, str):
            logger.warning("⚠️ Received stringified JSON - attempting parse strategies")
            # Strategy 1: direct json.loads() on the full string
            try:
                parsed = json.loads(nodes)
                nodes = parsed
                stringify_strategy_used = "Strategy 1: Direct json.loads()"
                logger.info(f"✅ {stringify_strategy_used}")
            except json.JSONDecodeError as e:
                # Dan debug note:
                # In practice we've seen cases where the string is a valid JSON
                # array followed by a few stray characters (e.g. the outer '}'
                # from the request body). json.loads() raises
                # JSONDecodeError('Extra data', ...).
                #
                # Rather than naively stripping N characters, we do a
                # _structure-aware_ trim:
                #   - Find the last closing bracket ']' in the string.
                #   - If it exists, take everything up to and including that
                #     bracket and try json.loads() again.
                #   - This preserves the full array and only discards trailing
                #     junk after a syntactically complete list literal.
                #
                # We only apply this once; if it still fails, we surface a
                # clear error and log to etch_debug.log.
                last_bracket = nodes.rfind("]")
                if last_bracket != -1:
                    candidate = nodes[: last_bracket + 1]
                    try:
                        parsed2 = json.loads(candidate)
                        stringify_strategy_used = (
                            "Strategy 2: Trim after last ']' (recover from trailing junk)"
                        )
                        # Log before rebinding 'nodes' so "Original len" really
                        # is the length of the original string, not the parsed list.
                        self._log_to_file(
                            "STRINGIFY AUTOFIX: Strategy 2 used. "
                            f"Original len={len(nodes)}, trimmed_len={len(candidate)}. "
                            f"Original JSONDecodeError={repr(e)}",
                            "etch",
                        )
                        nodes = parsed2
                        logger.info(f"✅ {stringify_strategy_used}")
                    except json.JSONDecodeError as e2:
                        # Still not valid JSON even after trimming at the
                        # last ']'. At this point we give up and return a
                        # structured error so the caller can see it in the
                        # MCP tool response.
                        error_msg = (
                            "Failed to parse stringified JSON after Strategy 1 "
                            "and Strategy 2. "
                            f"Strategy 1 error={repr(e)}, Strategy 2 error={repr(e2)}"
                        )
                        self._log_to_file(error_msg, "etch")
                        return {
                            "success": False,
                            "nodes_created": 0,
                            "root_node_ids": [],
                            "errors": [error_msg],
                        }
                else:
                    # No closing bracket at all; there's nothing safe to trim
                    # against. Log and fail clearly.
                    error_msg = (
                        "Failed to parse stringified JSON: no closing ']' "
                        f"found. Original error={repr(e)}"
                    )
                    self._log_to_file(error_msg, "etch")
                    return {
                        "success": False,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "errors": [error_msg],
                    }

        # Validate nodes is a list
        if not isinstance(nodes, list):
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": ["Parameter 'nodes' must be a list"],
            }

        # Validation checkpoint - NOTE fields only
        def validate_and_escape_nodes_recursive(
            nodes_list: list[dict[str, Any]], path: str = "root"
        ) -> tuple[bool, str | None, list[str]]:
            """Recursively validate and auto-escape NAME and NOTE fields."""
            warnings = []
            for idx, node in enumerate(nodes_list):
                node_path = f"{path}[{idx}].{node.get('name', 'unnamed')}"

                # Validate NAME
                name = node.get('name')
                if not isinstance(name, str) or not name.strip():
                    return (
                        False,
                        f"Node: {node_path}\n\nName must be non-empty string.",
                        warnings,
                    )
                processed_name, name_warning = self._validate_name_field(name)
                if processed_name is not None:
                    node['name'] = processed_name
                if name_warning:
                    warnings.append(f"{node_path} - Name escaped")

                # Validate NOTE
                note = node.get('note')
                if note:
                    processed_note, note_warning = self._validate_note_field(note, skip_newline_check=False)
                    if processed_note is None and note_warning:
                        return (False, f"Node: {node_path}\n\n{note_warning}", warnings)
                    node['note'] = processed_note
                    if note_warning and "AUTO-ESCAPED" in note_warning:
                        warnings.append(f"{node_path} - Note escaped")

                # Recurse
                children = node.get('children', [])
                if children:
                    success, error_msg, child_warnings = validate_and_escape_nodes_recursive(children, node_path)
                    if not success:
                        return (False, error_msg, warnings)
                    warnings.extend(child_warnings)
            return (True, None, warnings)

        success, error_msg, warnings = validate_and_escape_nodes_recursive(nodes)
        if not success:
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": [error_msg or "Validation failed"],
            }
        if warnings:
            logger.info(f"✅ Auto-escaped {len(warnings)} node(s)")

        # Stats tracking
        stats = {
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "nodes_created": 0,
            "skipped": 0,
            "errors": [],
        }

        # REPLACE_ALL or DEFAULT mode
        if replace_all:
            logger.info("🗑️ replace_all=True - Deleting all existing children")
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1
                for child in existing_children:
                    try:
                        await self.delete_node(child.id)
                        logger.info(f" Deleted: {child.nm}")
                        stats["api_calls"] += 1
                    except Exception as e:
                        logger.warning(f" Failed to delete {child.nm}: {e}")
            except Exception as e:
                logger.warning(f"Could not list/delete existing: {e}")
            nodes_to_create = nodes
            existing_names = set()
        else:
            # Additive mode - skip existing by name
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1
                existing_names = {child.nm.strip() for child in existing_children if child.nm}
                nodes_to_create = [
                    node for node in nodes
                    if node.get('name', '').strip() not in existing_names
                ]
                stats["skipped"] = len(nodes) - len(nodes_to_create)
                if stats["skipped"] > 0:
                    logger.info(f"📝 Skipped {stats['skipped']} existing node(s)")
                if not nodes_to_create:
                    return {
                        "success": True,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "skipped": stats["skipped"],
                        "api_calls": stats["api_calls"],
                        "message": "All nodes already exist - nothing to create",
                    }
            except Exception as e:
                logger.warning(f"Could not check existing: {e}")
                nodes_to_create = nodes
                existing_names = set()

        # Create tree recursively
        async def create_tree(parent_id: str, nodes: list[dict[str, Any]]) -> list[str]:
            """Recursively create node tree."""
            created_ids = []
            for node_data in nodes:
                try:
                    node_name = node_data['name']
                    if not replace_all and node_name in existing_names:
                        stats["skipped"] += 1
                        continue
                    request = NodeCreateRequest(
                        name=node_name,
                        parent_id=parent_id,
                        note=node_data.get('note'),
                        layoutMode=node_data.get('layout_mode'),
                        position=node_data.get('position', 'bottom'),
                    )
                    # Create with retry (internal call)
                    node = await self.create_node(request, _internal_call=True)
                    if node:
                        created_ids.append(node.id)
                        stats["nodes_created"] += 1
                        self._log_to_file(f" Created: {node_name} ({node.id})", "etch")
                        # Recursively create children
                        if 'children' in node_data and node_data['children']:
                            await create_tree(node.id, node_data['children'])
                except Exception as e:
                    error_msg = f"Failed to create '{node_data.get('name', 'unknown')}': {str(e)}"
                    logger.error(error_msg)
                    stats["errors"].append(error_msg)
                    continue
            return created_ids

        try:
            self._log_to_file(f"ETCH start (replace_all={replace_all}) parent={parent_id}", "etch")
            root_ids = await create_tree(parent_id, nodes_to_create)
            self._log_to_file(f"ETCH complete: {stats['nodes_created']} created", "etch")
            result = {
                "success": len(stats["errors"]) == 0,
                "nodes_created": stats["nodes_created"],
                "root_node_ids": root_ids,
                "api_calls": stats["api_calls"],
                "retries": stats["retries"],
                "errors": stats["errors"],
            }
            if not replace_all:
                result["skipped"] = stats["skipped"]
            if stringify_strategy_used:
                result["_stringify_autofix"] = stringify_strategy_used
            # Mark parent dirty
            if result.get("success", False):
                try:
                    self._mark_nodes_export_dirty([parent_id])
                except Exception:
                    pass
            return result
        except Exception as e:
            error_msg = f"ETCH failed: {str(e)}"
            log_event(error_msg, "ETCH")
            stats["errors"].append(error_msg)
            return {
                "success": False,
                "nodes_created": stats["nodes_created"],
                "errors": stats["errors"],
            }
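
To make the Strategy 2 recovery concrete, here is a tiny self-contained illustration of the structure-aware trim (the payload string is invented):

    import json

    raw = '[{"name": "A"}, {"name": "B"}]}'  # valid array plus a stray trailing '}'
    try:
        nodes = json.loads(raw)                # Strategy 1 fails: "Extra data"
    except json.JSONDecodeError:
        candidate = raw[: raw.rfind("]") + 1]  # Strategy 2: keep up to last ']'
        nodes = json.loads(candidate)          # [{'name': 'A'}, {'name': 'B'}]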
  • Key helper for node validation and auto-escaping (name/note fields) used in workflowy_etch
    def validate_and_escape_nodes_recursive(
        nodes_list: list[dict[str, Any]], path: str = "root"
    ) -> tuple[bool, str | None, list[str]]:
        """Recursively validate and auto-escape NAME and NOTE fields."""
        warnings = []
        for idx, node in enumerate(nodes_list):
            node_path = f"{path}[{idx}].{node.get('name', 'unnamed')}"

            # Validate NAME
            name = node.get('name')
            if not isinstance(name, str) or not name.strip():
                return (
                    False,
                    f"Node: {node_path}\n\nName must be non-empty string.",
                    warnings,
                )
            processed_name, name_warning = self._validate_name_field(name)
            if processed_name is not None:
                node['name'] = processed_name
            if name_warning:
                warnings.append(f"{node_path} - Name escaped")

            # Validate NOTE
            note = node.get('note')
            if note:
                processed_note, note_warning = self._validate_note_field(note, skip_newline_check=False)
                if processed_note is None and note_warning:
                    return (False, f"Node: {node_path}\n\n{note_warning}", warnings)
                node['note'] = processed_note
                if note_warning and "AUTO-ESCAPED" in note_warning:
                    warnings.append(f"{node_path} - Note escaped")

            # Recurse
            children = node.get('children', [])
            if children:
                success, error_msg, child_warnings = validate_and_escape_nodes_recursive(children, node_path)
                if not success:
                    return (False, error_msg, warnings)
                warnings.extend(child_warnings)
        return (True, None, warnings)
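
The helper returns a (success, error_message, warnings) tuple and fixes up the node dicts in place. A caller inside the client (where self is bound, since the helper delegates to self._validate_name_field and self._validate_note_field) might unpack it like this; the node data is invented:

    nodes = [
        {"name": "Parent", "children": [{"name": "Child", "note": "line1\nline2"}]}
    ]
    ok, err, warns = validate_and_escape_nodes_recursive(nodes)
    if not ok:
        raise ValueError(err)        # e.g. empty name, rejected note
    for w in warns:
        print(f"auto-escaped: {w}")  # nodes were mutated in place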
  • Logging helper for ETCH operations - writes to etch_debug.log (tag-specific)
    def _log_to_file_helper(message: str, log_type: str = "reconcile") -> None:
        """Log message to a tag-specific debug file (best-effort).

        Args:
            message: The message to log
            log_type: "reconcile" -> reconcile_debug.log
                      "etch" -> etch_debug.log

        If _current_weave_context["json_file"] is set, logs go to the same
        directory as the JSON file (tag-specific). Otherwise, falls back
        to global temp/.
        """
        try:
            filename = "reconcile_debug.log"
            if log_type == "etch":
                filename = "etch_debug.log"
            elif log_type == "nexus":
                filename = "nexus_debug.log"
            elif log_type in ("jewel", "jewelstorm"):
                filename = "jewelstorm_debug.log"

            # Determine log directory (tag-specific if in WEAVE context)
            json_file = _current_weave_context.get("json_file")
            if json_file and os.path.exists(json_file):
                # Tag-specific: put debug log in same directory as JSON
                log_path = os.path.join(os.path.dirname(json_file), filename)
            else:
                # Global fallback
                log_path = fr"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\{filename}"

            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            with open(log_path, "a", encoding="utf-8") as dbg:
                dbg.write(f"[{ts}] {message}\n")
        except Exception:
            # Never let logging failures affect API behavior
            pass
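
As a usage sketch, the log_type tag selects the destination file; the message below is hypothetical:

    # Tag-to-file mapping implemented above:
    #   "reconcile" (default) -> reconcile_debug.log
    #   "etch"                -> etch_debug.log
    #   "nexus"               -> nexus_debug.log
    #   "jewel"/"jewelstorm"  -> jewelstorm_debug.log
    _log_to_file_helper("ETCH start parent=abc123", "etch")  # hypothetical message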

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.