Skip to main content
Glama

workflowy_etch

Create multiple hierarchical nodes directly from JSON data in WorkFlowy without file intermediaries, enabling structured content organization and task management.

Instructions

Create multiple nodes from JSON structure (no file intermediary). ETCH command for direct node creation.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
parent_idYes
nodesNo
replace_allNo
nodes_fileNo

Implementation Reference

  • The core handler function that implements the workflowy_etch tool logic: parses nodes from JSON or file, validates/escapes content, handles replace_all or additive modes, skips existing nodes by name, recursively creates hierarchical node trees via API calls with retries and rate limiting.
    async def workflowy_etch(
        self,
        parent_id: str,
        nodes: list[dict[str, Any]] | str,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict[str, Any]:
        """Create multiple nodes from JSON structure (ETCH command).
    
        Args:
            parent_id: Target Workflowy parent id.
            nodes: Either a parsed list[dict] structure or a JSON string
                representing that list. Ignored when nodes_file is provided.
            replace_all: If True, delete all existing children under
                parent_id first; otherwise additive.
            nodes_file: Optional path to a file containing ETCH payload.
                - If the file contains a JSON object with a top-level
                  "nodes" key, that value is used.
                - Else, the entire file content is treated as a JSON
                  string to be parsed the same way as the "nodes" string
                  parameter (including autofix strategies).
        """
        import asyncio
    
        logger = _ClientLogger()
    
        # Optional file-based payload: if nodes_file is provided, it wins
        # over the inline "nodes" argument.
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    file_text = f.read()
                # Try JSON object with top-level "nodes" first
                try:
                    maybe_json = json.loads(file_text)
                    if isinstance(maybe_json, dict) and "nodes" in maybe_json:
                        nodes = maybe_json["nodes"]
                    else:
                        # Not a dict-with-nodes; treat entire file_text as
                        # the stringified nodes payload
                        nodes = file_text
                    logger.info(
                        f"πŸ“„ workflowy_etch: loaded nodes from file '{nodes_file}'"
                    )
                except json.JSONDecodeError:
                    # Not valid JSON as a whole; assume raw JSON string
                    # payload for nodes and let the normal string handler
                    # below deal with it (including autofix).
                    nodes = file_text
                    logger.info(
                        f"πŸ“„ workflowy_etch: using raw file text from '{nodes_file}'"
                    )
            except Exception as e:
                error_msg = (
                    f"Failed to read nodes_file '{nodes_file}': {e}"
                )
                self._log_to_file(error_msg, "etch")
                return {
                    "success": False,
                    "nodes_created": 0,
                    "root_node_ids": [],
                    "errors": [error_msg],
                }
        
        # Auto-fix stringified JSON
        stringify_strategy_used = None
        if isinstance(nodes, str):
            logger.warning("⚠️ Received stringified JSON - attempting parse strategies")
    
            # Strategy 1: direct json.loads() on the full string
            try:
                parsed = json.loads(nodes)
                nodes = parsed
                stringify_strategy_used = "Strategy 1: Direct json.loads()"
                logger.info(f"βœ… {stringify_strategy_used}")
            except json.JSONDecodeError as e:
                # Dan debug note:
                # In practice we've seen cases where the string is a valid JSON
                # array followed by a few stray characters (e.g. the outer '}'
                # from the request body). json.loads() raises
                # JSONDecodeError('Extra data', ...).
                #
                # Rather than naively stripping N characters, we do a
                # _structure-aware_ trim:
                #  - Find the last closing bracket ']' in the string.
                #  - If it exists, take everything up to and including that
                #    bracket and try json.loads() again.
                #  - This preserves the full array and only discards trailing
                #    junk after a syntactically complete list literal.
                #
                # We only apply this once; if it still fails, we surface a
                # clear error and log to etch_debug.log.
                last_bracket = nodes.rfind("]")
                if last_bracket != -1:
                    candidate = nodes[: last_bracket + 1]
                    try:
                        parsed2 = json.loads(candidate)
                        nodes = parsed2
                        stringify_strategy_used = "Strategy 2: Trim after last ']' (recover from trailing junk)"
                        self._log_to_file(
                            "STRINGIFY AUTOFIX: Strategy 2 used. "
                            f"Original len={len(nodes)}, trimmed_len={len(candidate)}. "
                            f"Original JSONDecodeError={repr(e)}",
                            "etch",
                        )
                        logger.info(f"βœ… {stringify_strategy_used}")
                    except json.JSONDecodeError as e2:
                        # Still not valid JSON even after trimming at the
                        # last ']'. At this point we give up and return a
                        # structured error so the caller can see it in the
                        # MCP tool response.
                        error_msg = (
                            "Failed to parse stringified JSON after Strategy 1 "
                            "and Strategy 2. "
                            f"Strategy 1 error={repr(e)}, Strategy 2 error={repr(e2)}"
                        )
                        self._log_to_file(error_msg, "etch")
                        return {
                            "success": False,
                            "nodes_created": 0,
                            "root_node_ids": [],
                            "errors": [error_msg],
                        }
                else:
                    # No closing bracket at all; there's nothing safe to trim
                    # against. Log and fail clearly.
                    error_msg = (
                        "Failed to parse stringified JSON: no closing ']' "
                        f"found. Original error={repr(e)}"
                    )
                    self._log_to_file(error_msg, "etch")
                    return {
                        "success": False,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "errors": [error_msg],
                    }
        
        # Validate nodes is a list
        if not isinstance(nodes, list):
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": ["Parameter 'nodes' must be a list"]
            }
        
        # Validation checkpoint - NOTE fields only
        def validate_and_escape_nodes_recursive(
            nodes_list: list[dict[str, Any]], path: str = "root"
        ) -> tuple[bool, str | None, list[str]]:
            """Recursively validate and auto-escape NAME and NOTE fields."""
            warnings = []
            
            for idx, node in enumerate(nodes_list):
                node_path = f"{path}[{idx}].{node.get('name', 'unnamed')}"
                
                # Validate NAME
                name = node.get('name')
                if not isinstance(name, str) or not name.strip():
                    return (
                        False,
                        f"Node: {node_path}\n\nName must be non-empty string.",
                        warnings,
                    )
                
                processed_name, name_warning = self._validate_name_field(name)
                if processed_name is not None:
                    node['name'] = processed_name
                if name_warning:
                    warnings.append(f"{node_path} - Name escaped")
                
                # Validate NOTE
                note = node.get('note')
                if note:
                    processed_note, note_warning = self._validate_note_field(note, skip_newline_check=False)
                    
                    if processed_note is None and note_warning:
                        return (False, f"Node: {node_path}\n\n{note_warning}", warnings)
                    
                    node['note'] = processed_note
                    
                    if note_warning and "AUTO-ESCAPED" in note_warning:
                        warnings.append(f"{node_path} - Note escaped")
                
                # Recurse
                children = node.get('children', [])
                if children:
                    success, error_msg, child_warnings = validate_and_escape_nodes_recursive(children, node_path)
                    if not success:
                        return (False, error_msg, warnings)
                    warnings.extend(child_warnings)
            
            return (True, None, warnings)
        
        success, error_msg, warnings = validate_and_escape_nodes_recursive(nodes)
        
        if not success:
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "errors": [error_msg or "Validation failed"]
            }
        
        if warnings:
            logger.info(f"βœ… Auto-escaped {len(warnings)} node(s)")
        
        # Stats tracking
        stats = {
            "api_calls": 0,
            "retries": 0,
            "rate_limit_hits": 0,
            "nodes_created": 0,
            "skipped": 0,
            "errors": []
        }
        
        # REPLACE_ALL or DEFAULT mode
        if replace_all:
            logger.info("πŸ—‘οΈ replace_all=True - Deleting all existing children")
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1
                
                for child in existing_children:
                    try:
                        await self.delete_node(child.id)
                        logger.info(f"  Deleted: {child.nm}")
                        stats["api_calls"] += 1
                    except Exception as e:
                        logger.warning(f"  Failed to delete {child.nm}: {e}")
            except Exception as e:
                logger.warning(f"Could not list/delete existing: {e}")
            
            nodes_to_create = nodes
            existing_names = set()
        else:
            # Additive mode - skip existing by name
            try:
                request = NodeListRequest(parentId=parent_id)
                existing_children, _ = await self.list_nodes(request)
                stats["api_calls"] += 1
                
                existing_names = {child.nm.strip() for child in existing_children if child.nm}
                nodes_to_create = [
                    node for node in nodes 
                    if node.get('name', '').strip() not in existing_names
                ]
                
                stats["skipped"] = len(nodes) - len(nodes_to_create)
                
                if stats["skipped"] > 0:
                    logger.info(f"πŸ“ Skipped {stats['skipped']} existing node(s)")
                
                if not nodes_to_create:
                    return {
                        "success": True,
                        "nodes_created": 0,
                        "root_node_ids": [],
                        "skipped": stats["skipped"],
                        "api_calls": stats["api_calls"],
                        "message": "All nodes already exist - nothing to create"
                    }
            except Exception as e:
                logger.warning(f"Could not check existing: {e}")
                nodes_to_create = nodes
                existing_names = set()
        
        # Create tree recursively
        async def create_tree(parent_id: str, nodes: list[dict[str, Any]]) -> list[str]:
            """Recursively create node tree."""
            created_ids = []
            
            for node_data in nodes:
                try:
                    node_name = node_data['name']
                    
                    if not replace_all and node_name in existing_names:
                        stats["skipped"] += 1
                        continue
                    
                    request = NodeCreateRequest(
                        name=node_name,
                        parent_id=parent_id,
                        note=node_data.get('note'),
                        layoutMode=node_data.get('layout_mode'),
                        position=node_data.get('position', 'bottom')
                    )
                    
                    # Create with retry (internal call)
                    node = await self.create_node(request, _internal_call=True)
                    
                    if node:
                        created_ids.append(node.id)
                        stats["nodes_created"] += 1
                        self._log_to_file(f"  Created: {node_name} ({node.id})", "etch")
                        
                        # Recursively create children
                        if 'children' in node_data and node_data['children']:
                            await create_tree(node.id, node_data['children'])
                
                except Exception as e:
                    error_msg = f"Failed to create '{node_data.get('name', 'unknown')}': {str(e)}"
                    logger.error(error_msg)
                    stats["errors"].append(error_msg)
                    continue
            
            return created_ids
        
        try:
            self._log_to_file(f"ETCH start (replace_all={replace_all}) parent={parent_id}", "etch")
            root_ids = await create_tree(parent_id, nodes_to_create)
            
            self._log_to_file(f"ETCH complete: {stats['nodes_created']} created", "etch")
            
            result = {
                "success": len(stats["errors"]) == 0,
                "nodes_created": stats["nodes_created"],
                "root_node_ids": root_ids,
                "api_calls": stats["api_calls"],
                "retries": stats["retries"],
                "errors": stats["errors"]
            }
            
            if not replace_all:
                result["skipped"] = stats["skipped"]
            
            if stringify_strategy_used:
                result["_stringify_autofix"] = stringify_strategy_used
    
            # Mark parent dirty
            if result.get("success", False):
                try:
                    self._mark_nodes_export_dirty([parent_id])
                except Exception:
                    pass
            
            return result
            
        except Exception as e:
            error_msg = f"ETCH failed: {str(e)}"
            log_event(error_msg, "ETCH")
            stats["errors"].append(error_msg)
            
            return {
                "success": False,
                "nodes_created": stats["nodes_created"],
                "errors": stats["errors"]
            }
  • FastMCP tool registration decorator and wrapper function that exposes the workflowy_etch tool to MCP clients, handling parameter resolution (nodes as list/string/file), forwarding to client.workflowy_etch, and returning results.
    @mcp.tool(
        name="workflowy_etch",
        description="Create multiple nodes from JSON structure (no file intermediary). ETCH command for direct node creation."
    )
    async def etch(
        parent_id: str,
        nodes: list[dict] | str | None = None,
        replace_all: bool = False,
        nodes_file: str | None = None,
    ) -> dict:
        """Create multiple nodes from JSON structure.
    
        ETCH command - simple additive node creation (no UUIDs, no updates/moves).
        Fallback: If this fails, use INSCRIBE scroll (write_file β†’ bulk_import).
    
        DEFAULT: Additive (skip existing by name, add new children only)
        REPLACE: Wipe all children, create fresh
    
        For complex operations (moves/updates with UUIDs): Use NEXUS scroll instead.
    
        Args:
            parent_id: Parent UUID where nodes should be created
            nodes: List of node objects (NO UUIDs - just name/note/children)
            replace_all: If True, delete ALL existing children first. Default False.
    
        Returns:
            Dictionary with success status, nodes created, skipped (if append_only), API call stats, and errors
        """
        client = get_client()
    
        # Resolve nodes source: direct value or file-based
        payload_nodes: list[dict] | str | None = nodes
        used_nodes_file = False
    
        if nodes_file:
            try:
                with open(nodes_file, "r", encoding="utf-8") as f:
                    payload_nodes = f.read()
                used_nodes_file = True
            except Exception as e:
                return {
                    "success": False,
                    "nodes_created": 0,
                    "root_node_ids": [],
                    "api_calls": 0,
                    "retries": 0,
                    "rate_limit_hits": 0,
                    "errors": [
                        f"Failed to read nodes_file '{nodes_file}': {str(e)}",
                        "Hint: Ensure the path is correct and accessible from the MCP server",
                    ],
                }
        elif isinstance(payload_nodes, str):
            # Agent convenience: if 'nodes' looks like a real JSON file path, try it as such.
            candidate = payload_nodes.strip()
            if os.path.exists(candidate) and candidate.lower().endswith(".json"):
                try:
                    with open(candidate, "r", encoding="utf-8") as f:
                        payload_nodes = f.read()
                    used_nodes_file = True
                    nodes_file = candidate
                    log_event(
                        f"workflowy_etch: treating 'nodes' string as nodes_file path -> {candidate}",
                        "ETCH",
                    )
                except Exception as e:
                    # Fall back to treating it as JSON; let the ETCH parser report a clear error
                    log_event(
                        f"workflowy_etch: failed to read candidate nodes file '{candidate}': {e}",
                        "ETCH",
                    )
    
        if payload_nodes is None:
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "api_calls": 0,
                "retries": 0,
                "rate_limit_hits": 0,
                "errors": [
                    "Missing ETCH payload: provide either 'nodes' (list or stringified JSON) or 'nodes_file' (path to JSON file)",
                ],
            }
    
        # Rate limiter handled within workflowy_etch method due to recursive operations
    
        try:
            result = await client.workflowy_etch(parent_id, payload_nodes, replace_all=replace_all)
            # Annotate result to show where nodes came from (helps debugging real-world agent usage)
            if used_nodes_file:
                result.setdefault("_source", {})["nodes_file"] = nodes_file
            return result
        except Exception as e:
            # Top-level exception capture
            return {
                "success": False,
                "nodes_created": 0,
                "root_node_ids": [],
                "api_calls": 0,
                "retries": 0,
                "rate_limit_hits": 0,
                "errors": [f"An unexpected error occurred: {str(e)}"]
            }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/daniel347x/workflowy-mcp-fixed'

If you have feedback or need assistance with the MCP directory API, please join our Discord server