Skip to main content
Glama

workflowy_etch_async

Start asynchronous creation of WorkFlowy outline nodes by providing parent ID and node data, returning a job ID for status tracking.

Instructions

Start an async ETCH job (Workflowy node creation) and return a job_id for status polling.

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| parent_id | Yes | | |
| nodes | No | | |
| replace_all | No | | |
| nodes_file | No | | |

Implementation Reference

  • MCP registration of the workflowy_etch_async tool - wrapper that launches the core workflowy_etch handler as a background job via _start_background_job, returning job_id for mcp_job_status monitoring
    @mcp.tool(
        name="workflowy_etch_async",
        description="Start an async ETCH job (Workflowy node creation) and return a job_id for status polling.",
    )
    async def etch_async(
        parent_id: str,
        nodes: list[dict] | str | None = None,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict:
        """Launch ETCH as a background job and hand back the job descriptor.

        The ETCH payload is resolved up front so that obvious input problems
        are reported immediately instead of inside the background job:

        * ``nodes_file`` set: the file must be readable; its text becomes the
          payload. A read failure returns ``{"success": False, "error": ...}``.
        * ``nodes`` is a string naming an existing ``.json`` file: the file is
          read and treated as if it had been passed via ``nodes_file``. If the
          read fails, the string is passed through unchanged.
        * neither yields a payload: an error dict is returned.

        Args:
            parent_id: Workflowy id under which the nodes are created.
            nodes: Parsed node list, stringified JSON, or (as a convenience)
                a path to a ``.json`` file.
            replace_all: Forwarded to ``workflowy_etch``.
            nodes_file: Optional path to a file holding the payload.

        Returns:
            Whatever ``_start_background_job`` returns, or an error dict with
            ``success`` and ``error`` keys when the payload cannot be resolved.
        """
        client = get_client()

        # Start from the inline argument; a file source may override it below.
        resolved_nodes: list[dict] | str | None = nodes

        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as handle:
                    resolved_nodes = handle.read()
            except Exception as exc:
                failure = {
                    "success": False,
                    "error": f"Failed to read nodes_file '{nodes_file}': {str(exc)}",
                }
                return failure
        elif isinstance(resolved_nodes, str):
            # Convenience for agents: a 'nodes' string that names an existing
            # .json file is read as though it had been given as nodes_file.
            path_guess = resolved_nodes.strip()
            names_json_file = os.path.exists(path_guess) and path_guess.lower().endswith(".json")
            if names_json_file:
                try:
                    with open(path_guess, "r", encoding="utf-8") as handle:
                        resolved_nodes = handle.read()
                    nodes_file = path_guess
                    log_event(
                        f"workflowy_etch_async: treating 'nodes' string as nodes_file path -> {path_guess}",
                        "ETCH_ASYNC",
                    )
                except Exception as exc:
                    # Keep the string as-is; the ETCH parser will report a
                    # clear error if it is not valid JSON either.
                    log_event(
                        f"workflowy_etch_async: failed to read candidate nodes file '{path_guess}': {exc}",
                        "ETCH_ASYNC",
                    )

        if resolved_nodes is None:
            return {
                "success": False,
                "error": "Missing ETCH payload: provide either 'nodes' (list or stringified JSON) or 'nodes_file' (path to JSON file)",
            }

        async def run_etch(job_id: str) -> dict:  # job_id reserved for future logging
            # Both the resolved payload and the (possibly inferred) nodes_file
            # are forwarded; workflowy_etch accepts a parsed list, stringified
            # JSON, or a nodes_file path.
            etch_result = await client.workflowy_etch(
                parent_id=parent_id,
                nodes=resolved_nodes,
                replace_all=replace_all,
                nodes_file=nodes_file,
            )
            return etch_result

        # Job metadata handed to the background-job launcher alongside the
        # coroutine; the node payload itself is not included.
        job_payload = {
            "parent_id": parent_id,
            "replace_all": replace_all,
            "nodes_file": nodes_file,
        }

        return await _start_background_job("etch", job_payload, run_etch)
  • Core handler logic executed by workflowy_etch_async - creates hierarchical nodes from JSON structure under parent_id, supports file input/stringified JSON/list-of-dicts, additive or replace_all modes, full validation/escaping/logging/retries
    async def workflowy_etch(
        self,
        parent_id: str,
        nodes: list[dict[str, Any]] | str,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict[str, Any]:
        """Create multiple nodes from JSON structure (ETCH command).

        Args:
            parent_id: Target Workflowy parent id.
            nodes: Either a parsed list[dict] structure or a JSON string
                representing that list. Ignored when nodes_file is provided.
            replace_all: If True, delete all existing children under
                parent_id first; otherwise additive (top-level nodes whose
                name already exists under parent_id are skipped).
            nodes_file: Optional path to a file containing ETCH payload.
                - If the file contains a JSON object with a top-level
                  "nodes" key, that value is used.
                - Else, the entire file content is treated as a JSON
                  string to be parsed the same way as the "nodes" string
                  parameter (including autofix strategies).

        Returns:
            A result dict. Failures detected before any node is created
            contain "success" (False), "nodes_created" (0),
            "root_node_ids" ([]) and "errors". A completed run contains
            "success", "nodes_created", "root_node_ids", "api_calls",
            "retries" and "errors", plus "skipped" in additive mode and
            "_stringify_autofix" when a string payload had to be parsed.
            When additive mode finds nothing to create, the result carries
            "message" instead of "errors".
        """
        logger = _ClientLogger()

        def failure(message: str) -> dict[str, Any]:
            """Build the structured result used for every early failure."""
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": [message],
            }

        # Optional file-based payload: if nodes_file is provided, it wins
        # over the inline "nodes" argument.
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    file_text = f.read()
                # Try JSON object with top-level "nodes" first
                try:
                    maybe_json = json.loads(file_text)
                    if isinstance(maybe_json, dict) and "nodes" in maybe_json:
                        nodes = maybe_json["nodes"]
                    else:
                        # Not a dict-with-nodes; treat entire file_text as
                        # the stringified nodes payload
                        nodes = file_text
                    logger.info(
                        f"πŸ“„ workflowy_etch: loaded nodes from file '{nodes_file}'"
                    )
                except json.JSONDecodeError:
                    # Not valid JSON as a whole; assume raw JSON string
                    # payload for nodes and let the normal string handler
                    # below deal with it (including autofix).
                    nodes = file_text
                    logger.info(
                        f"πŸ“„ workflowy_etch: using raw file text from '{nodes_file}'"
                    )
            except Exception as e:
                error_msg = f"Failed to read nodes_file '{nodes_file}': {e}"
                self._log_to_file(error_msg, "etch")
                return failure(error_msg)

        # Auto-fix stringified JSON
        stringify_strategy_used = None
        if isinstance(nodes, str):
            logger.warning("⚠️ Received stringified JSON - attempting parse strategies")

            # Keep the untouched text: 'nodes' is rebound to the parsed value
            # below, and the diagnostics must describe the original string.
            raw_text = nodes

            # Strategy 1: direct json.loads() on the full string
            try:
                nodes = json.loads(raw_text)
                stringify_strategy_used = "Strategy 1: Direct json.loads()"
                logger.info(f"βœ… {stringify_strategy_used}")
            except json.JSONDecodeError as e:
                # Strategy 2: structure-aware trim. The string is sometimes a
                # valid JSON array followed by a few stray characters (e.g.
                # the outer '}' of the request body), which makes
                # json.loads() raise 'Extra data'. Cut everything after the
                # last ']' and parse once more; this keeps the full array
                # and only discards trailing junk. Applied once only.
                last_bracket = raw_text.rfind("]")
                if last_bracket == -1:
                    # No closing bracket at all; there's nothing safe to
                    # trim against. Log and fail clearly.
                    error_msg = (
                        "Failed to parse stringified JSON: no closing ']' "
                        f"found. Original error={repr(e)}"
                    )
                    self._log_to_file(error_msg, "etch")
                    return failure(error_msg)

                candidate = raw_text[: last_bracket + 1]
                try:
                    nodes = json.loads(candidate)
                except json.JSONDecodeError as e2:
                    # Still invalid after trimming: give up and return a
                    # structured error so the caller sees it in the MCP
                    # tool response.
                    error_msg = (
                        "Failed to parse stringified JSON after Strategy 1 "
                        "and Strategy 2. "
                        f"Strategy 1 error={repr(e)}, Strategy 2 error={repr(e2)}"
                    )
                    self._log_to_file(error_msg, "etch")
                    return failure(error_msg)

                stringify_strategy_used = "Strategy 2: Trim after last ']' (recover from trailing junk)"
                self._log_to_file(
                    "STRINGIFY AUTOFIX: Strategy 2 used. "
                    f"Original len={len(raw_text)}, trimmed_len={len(candidate)}. "
                    f"Original JSONDecodeError={repr(e)}",
                    "etch",
                )
                logger.info(f"βœ… {stringify_strategy_used}")

        # Validate nodes is a list
        if not isinstance(nodes, list):
            return failure("Parameter 'nodes' must be a list")

        def validate_and_escape_nodes_recursive(
            nodes_list: list[dict[str, Any]], path: str = "root"
        ) -> tuple[bool, str | None, list[str]]:
            """Recursively validate and auto-escape NAME and NOTE fields.

            Mutates the node dicts in place with the processed name/note.
            Returns (ok, error_message, warnings); on the first invalid node
            ok is False and error_message describes it.
            """
            warnings = []

            for idx, node in enumerate(nodes_list):
                # Parsed JSON may contain anything; reject non-objects here
                # rather than failing with AttributeError on node.get().
                if not isinstance(node, dict):
                    return (
                        False,
                        f"Node: {path}[{idx}]\n\nNode must be a JSON object, "
                        f"got {type(node).__name__}.",
                        warnings,
                    )

                node_path = f"{path}[{idx}].{node.get('name', 'unnamed')}"

                # Validate NAME
                name = node.get('name')
                if not isinstance(name, str) or not name.strip():
                    return (
                        False,
                        f"Node: {node_path}\n\nName must be non-empty string.",
                        warnings,
                    )

                processed_name, name_warning = self._validate_name_field(name)
                if processed_name is not None:
                    node['name'] = processed_name
                if name_warning:
                    warnings.append(f"{node_path} - Name escaped")

                # Validate NOTE
                note = node.get('note')
                if note:
                    processed_note, note_warning = self._validate_note_field(note, skip_newline_check=False)

                    if processed_note is None and note_warning:
                        return (False, f"Node: {node_path}\n\n{note_warning}", warnings)

                    node['note'] = processed_note

                    if note_warning and "AUTO-ESCAPED" in note_warning:
                        warnings.append(f"{node_path} - Note escaped")

                # Recurse
                children = node.get('children', [])
                if children:
                    if not isinstance(children, list):
                        return (
                            False,
                            f"Node: {node_path}\n\n'children' must be a list, "
                            f"got {type(children).__name__}.",
                            warnings,
                        )
                    success, error_msg, child_warnings = validate_and_escape_nodes_recursive(children, node_path)
                    if not success:
                        return (False, error_msg, warnings)
                    warnings.extend(child_warnings)

            return (True, None, warnings)

        success, error_msg, warnings = validate_and_escape_nodes_recursive(nodes)

        if not success:
            return failure(error_msg or "Validation failed")

        if warnings:
            logger.info(f"βœ… Auto-escaped {len(warnings)} node(s)")

        # Stats tracking ("retries" and "rate_limit_hits" are not updated
        # anywhere in this method; they are reported as initialised).
        stats = {
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "nodes_created": 0,
            "skipped": 0,
            "errors": []
        }

        # REPLACE_ALL or DEFAULT mode
        if replace_all:
            logger.info("πŸ—‘οΈ replace_all=True - Deleting all existing children")
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1

                for child in existing_children:
                    try:
                        await self.delete_node(child.id)
                        logger.info(f"  Deleted: {child.nm}")
                        stats["api_calls"] += 1
                    except Exception as e:
                        logger.warning(f"  Failed to delete {child.nm}: {e}")
            except Exception as e:
                logger.warning(f"Could not list/delete existing: {e}")

            nodes_to_create = nodes
        else:
            # Additive mode - skip top-level nodes that already exist by name
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1

                existing_names = {child.nm.strip() for child in existing_children if child.nm}
                nodes_to_create = [
                    node for node in nodes
                    if node.get('name', '').strip() not in existing_names
                ]

                stats["skipped"] = len(nodes) - len(nodes_to_create)

                if stats["skipped"] > 0:
                    logger.info(f"πŸ“ Skipped {stats['skipped']} existing node(s)")

                if not nodes_to_create:
                    return {
                        "success": True,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "skipped": stats["skipped"],
                        "api_calls": stats["api_calls"],
                        "message": "All nodes already exist - nothing to create"
                    }
            except Exception as e:
                # Best effort: if the existing children cannot be listed,
                # create everything rather than failing the whole ETCH.
                logger.warning(f"Could not check existing: {e}")
                nodes_to_create = nodes

        async def create_tree(
            target_parent_id: str, tree_nodes: list[dict[str, Any]]
        ) -> list[str]:
            """Create tree_nodes under target_parent_id, depth-first.

            Returns the ids created directly under target_parent_id.
            Per-node failures are recorded in stats["errors"] and do not
            stop the remaining siblings.

            The additive-mode name filter is applied once, to the top-level
            list, before this function is called. It is deliberately not
            repeated here: the known names belong to the children of the
            ETCH parent, so comparing nested children (whose parent has just
            been created) against them would skip unrelated nodes.
            """
            created_ids = []

            for node_data in tree_nodes:
                try:
                    node_name = node_data['name']

                    request = NodeCreateRequest(
                        name=node_name,
                        parent_id=target_parent_id,
                        note=node_data.get('note'),
                        layoutMode=node_data.get('layout_mode'),
                        position=node_data.get('position', 'bottom')
                    )

                    # Create with retry (internal call)
                    node = await self.create_node(request, _internal_call=True)

                    if node:
                        created_ids.append(node.id)
                        stats["nodes_created"] += 1
                        self._log_to_file(f"  Created: {node_name} ({node.id})", "etch")

                        # Recursively create children
                        if 'children' in node_data and node_data['children']:
                            await create_tree(node.id, node_data['children'])

                except Exception as e:
                    error_msg = f"Failed to create '{node_data.get('name', 'unknown')}': {str(e)}"
                    logger.error(error_msg)
                    stats["errors"].append(error_msg)
                    continue

            return created_ids

        try:
            self._log_to_file(f"ETCH start (replace_all={replace_all}) parent={parent_id}", "etch")
            root_ids = await create_tree(parent_id, nodes_to_create)

            self._log_to_file(f"ETCH complete: {stats['nodes_created']} created", "etch")

            result = {
                "success": len(stats["errors"]) == 0,
                "nodes_created": stats["nodes_created"],
                "root_node_ids": root_ids,
                "api_calls": stats["api_calls"],
                "retries": stats["retries"],
                "errors": stats["errors"]
            }

            if not replace_all:
                result["skipped"] = stats["skipped"]

            if stringify_strategy_used:
                result["_stringify_autofix"] = stringify_strategy_used

            # Mark parent dirty
            if result.get("success", False):
                try:
                    self._mark_nodes_export_dirty([parent_id])
                except Exception:
                    # Best effort: a failure to flag the parent must not turn
                    # a successful ETCH into an error.
                    pass

            return result

        except Exception as e:
            error_msg = f"ETCH failed: {str(e)}"
            log_event(error_msg, "ETCH")
            stats["errors"].append(error_msg)

            # Same keys as the other failure results so callers can read
            # "root_node_ids" unconditionally.
            return {
                "success": False,
                "nodes_created": stats["nodes_created"],
                "root_node_ids": [],
                "errors": stats["errors"]
            }
  • Key helper for node validation and auto-escaping (name/note fields) used in workflowy_etch
    def validate_and_escape_nodes_recursive(
        nodes_list: list[dict[str, Any]], path: str = "root"
    ) -> tuple[bool, str | None, list[str]]:
        """Recursively validate and auto-escape NAME and NOTE fields."""
        warnings = []
        
        for idx, node in enumerate(nodes_list):
            node_path = f"{path}[{idx}].{node.get('name', 'unnamed')}"
            
            # Validate NAME
            name = node.get('name')
            if not isinstance(name, str) or not name.strip():
                return (
                    False,
                    f"Node: {node_path}\n\nName must be non-empty string.",
                    warnings,
                )
            
            processed_name, name_warning = self._validate_name_field(name)
            if processed_name is not None:
                node['name'] = processed_name
            if name_warning:
                warnings.append(f"{node_path} - Name escaped")
            
            # Validate NOTE
            note = node.get('note')
            if note:
                processed_note, note_warning = self._validate_note_field(note, skip_newline_check=False)
                
                if processed_note is None and note_warning:
                    return (False, f"Node: {node_path}\n\n{note_warning}", warnings)
                
                node['note'] = processed_note
                
                if note_warning and "AUTO-ESCAPED" in note_warning:
                    warnings.append(f"{node_path} - Note escaped")
            
            # Recurse
            children = node.get('children', [])
            if children:
                success, error_msg, child_warnings = validate_and_escape_nodes_recursive(children, node_path)
                if not success:
                    return (False, error_msg, warnings)
                warnings.extend(child_warnings)
        
        return (True, None, warnings)
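  • Logging helper used by ETCH operations - appends timestamped messages to a per-type debug log (etch_debug.log when log_type is "etch"), written beside the active WEAVE JSON file when one is set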
    def _log_to_file_helper(message: str, log_type: str = "reconcile") -> None:
        """Log message to a tag-specific debug file (best-effort).
    
        Args:
            message: The message to log
            log_type: "reconcile" -> reconcile_debug.log
                        "etch"      -> etch_debug.log
        
        If _current_weave_context["json_file"] is set, logs go to the same directory
        as the JSON file (tag-specific). Otherwise, falls back to global temp/.
        """
        try:
            filename = "reconcile_debug.log"
            if log_type == "etch":
                filename = "etch_debug.log"
            elif log_type == "nexus":
                filename = "nexus_debug.log"
            elif log_type in ("jewel", "jewelstorm"):
                filename = "jewelstorm_debug.log"
            
            # Determine log directory (tag-specific if in WEAVE context)
            json_file = _current_weave_context.get("json_file")
            if json_file and os.path.exists(json_file):
                # Tag-specific: put debug log in same directory as JSON
                log_path = os.path.join(os.path.dirname(json_file), filename)
            else:
                # Global fallback
                log_path = fr"E:\__daniel347x\__Obsidian\__Inking into Mind\--TypingMind\Projects - All\Projects - Individual\TODO\temp\{filename}"
            
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            
            with open(log_path, "a", encoding="utf-8") as dbg:
                dbg.write(f"[{ts}] {message}\n")
        except Exception:
            # Never let logging failures affect API behavior
            pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.