Skip to main content
Glama

batch_append_notes

Append content to multiple notes in a single atomic operation within Obsidian vaults, ensuring all updates complete successfully or none are applied.

Instructions

Append content to multiple notes atomically

Input Schema

Table | JSON Schema
Name | Required | Description | Default
appends | Yes | |
confirm | No | |

Implementation Reference

  • Main handler function for the batch_append_notes tool. Performs batch appends to multiple notes with automatic backup creation (files are copied concurrently), sequential application of the appends, error handling with rollback, and confirmation safety checks.
        name="batch_append_notes",
        description="Append content to multiple notes atomically",
    )
    async def batch_append_notes(
        appends: list[NoteAppend],
        confirm: bool = False,
    ) -> str:
        """
        Append content to multiple notes, rolling everything back on any failure.

        All target notes are backed up first. The appends are then applied one
        at a time in the order given; if at least one of them fails, the backup
        is restored and a failure report is returned instead of a summary.

        Args:
            appends: List of NoteAppend objects with path and content
            confirm: Must be true to apply changes

        Returns:
            Success message with append summary, or a message describing why
            the batch was rejected, rolled back, or could not be rolled back.
            Errors are reported through the returned string, not raised.
        """
        if not appends:
            return "Error: No appends provided"

        # Check batch size limit
        if len(appends) > MAX_BATCH_SIZE:
            return (
                f"Error: Batch size ({len(appends)}) exceeds maximum ({MAX_BATCH_SIZE}).\n"
                f"Split into smaller batches to avoid server timeouts."
            )

        logger.info(f"Starting batch_append_notes: {len(appends)} notes")

        # Extract paths (Pydantic already validated)
        paths = [append.path for append in appends]

        if not confirm:
            return (
                f"Error: Batch append to {len(appends)} notes requires explicit confirmation. "
                f"Set confirm=true to proceed."
            )

        context = _get_context()

        try:
            # Create backup (async); a missing/invalid note aborts here,
            # before any note has been modified.
            backup_id = await context.vault.create_batch_backup(paths)

            # Apply all appends, collecting every failure so the report is complete.
            appended = []
            failed = []

            for append in appends:
                try:
                    await context.vault.append_to_note(append.path, append.content)
                    appended.append(append.path)
                except Exception as e:
                    failed.append((append.path, str(e)))

            if failed:
                logger.warning(
                    f"batch_append_notes: {len(failed)} of {len(appends)} appends failed, rolling back"
                )
                failure_lines = ["**Failed appends:**"]
                failure_lines.extend(f"- `{path}`: {error}" for path, error in failed)

                # Rollback on failure (async). A failing rollback is reported
                # on its own: the vault may then hold partial changes, and the
                # caller needs the backup location to recover manually.
                try:
                    await context.vault.restore_batch_backup(backup_id)
                except Exception as rollback_error:
                    logger.exception(f"Rollback of batch backup {backup_id} failed")
                    result = [
                        "❌ Batch append failed and rollback did not complete - "
                        "notes may be partially modified\n",
                        f"**Rollback error:** {rollback_error}",
                        f"**Backup:** `.batch_backups/{backup_id}/`",
                    ]
                    result.extend(failure_lines)
                    return "\n".join(result)

                result = ["❌ Batch append failed - all changes rolled back\n"]
                result.extend(failure_lines)
                return "\n".join(result)

            # Success
            logger.info(f"Completed batch_append_notes: {len(appended)} notes updated successfully")
            result = [f"✅ Appended to {len(appended)} notes\n"]
            for path in appended:
                result.append(f"- `{path}`")
            result.append(f"\n**Backup:** `.batch_backups/{backup_id}/`")
            return "\n".join(result)

        except Exception as e:
            # Tool boundary: report the problem to the caller instead of raising.
            logger.exception("Error in batch append")
            return f"Error: {e}"
  • Pydantic schema defining the input structure for each append operation in the batch: path and content.
    # One entry of the `appends` list accepted by batch_append_notes: which
    # note to touch and what text to add to it.
    # NOTE(review): Pydantic may surface the class docstring and the Field
    # descriptions in the generated input schema - confirm before rewording.
    class NoteAppend(BaseModel):
        """Schema for appending to a single note."""

        # Both fields are declared without a default value.
        # `path` is handed to the vault's append/backup methods as-is.
        path: str = Field(description="Relative path to the note")
        # `content` is the text written after the note's existing content.
        content: str = Field(description="Content to append to the note")
  • Supporting method in ObsidianVault class that implements the core single-note append logic, reading existing content, ensuring newline separation, and asynchronously writing the updated content.
    async def append_to_note(self, relative_path: str, content: str) -> None:
        """
        Append content to an existing note.

        The note is read in full, the new content is added after it, and the
        whole text is written back. A single newline is inserted between the
        existing text and the new content when the existing text does not
        already end with one; an empty note receives the content as-is.

        Args:
            relative_path: Path to the note
            content: Content to append

        Raises:
            VaultSecurityError: If path is invalid
            FileNotFoundError: If note doesn't exist
        """
        file_path = self._validate_path(relative_path)

        if not file_path.exists():
            raise FileNotFoundError(f"Note not found: {relative_path}")

        # Read existing content
        async with aiofiles.open(file_path, encoding="utf-8") as f:
            existing = await f.read()

        # Separate old and new text with a newline when needed. An empty note
        # has nothing to separate from, so it must not gain a leading blank line.
        if existing and not existing.endswith("\n"):
            existing += "\n"

        # Write back
        async with aiofiles.open(file_path, "w", encoding="utf-8") as f:
            await f.write(existing + content)
        logger.info(f"Appended to note: {relative_path}")
  • Batch backup creation helper used by batch_append_notes for atomicity. Creates timestamped backups of specified notes concurrently with metadata preservation.
    async def create_batch_backup(self, relative_paths: list[str]) -> str:
        """
        Create a backup of multiple notes asynchronously.

        Every path is validated before anything is written. The notes are then
        copied concurrently into a fresh directory under `.batch_backups/`,
        mirroring their relative paths, and their file metadata is copied too.

        Args:
            relative_paths: List of note paths to backup

        Returns:
            Backup ID for later restoration: a `YYYYmmdd_HHMMSS` timestamp,
            with a numeric suffix (`_2`, `_3`, ...) when a backup with that
            timestamp already exists.

        Raises:
            VaultSecurityError: If any path is invalid
            FileNotFoundError: If any note doesn't exist
        """
        # Validate all paths first
        file_paths = []
        for rel_path in relative_paths:
            file_path = self._validate_path(rel_path)
            if not file_path.exists():
                raise FileNotFoundError(f"Note not found: {rel_path}")
            file_paths.append((rel_path, file_path))

        # Create backup directory with timestamp. The directory is claimed
        # exclusively: two batches started within the same second must not
        # share a backup, otherwise rolling one back would also overwrite the
        # notes of the other.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backups_root = self.vault_path / ".batch_backups"
        backups_root.mkdir(parents=True, exist_ok=True)

        backup_id = timestamp
        attempt = 1
        while True:
            backup_dir = backups_root / backup_id
            try:
                backup_dir.mkdir()
                break
            except FileExistsError:
                attempt += 1
                backup_id = f"{timestamp}_{attempt}"

        logger.info(f"Creating batch backup {backup_id}: {len(relative_paths)} files...")

        # Copy all files to backup asynchronously
        async def copy_file(i: int, rel_path: str, file_path: Path) -> None:
            backup_file = backup_dir / rel_path
            backup_file.parent.mkdir(parents=True, exist_ok=True)

            # Use async file operations
            async with aiofiles.open(file_path, "rb") as src:
                content = await src.read()
            async with aiofiles.open(backup_file, "wb") as dst:
                await dst.write(content)

            # Preserve metadata
            shutil.copystat(file_path, backup_file)
            logger.debug(f"Backed up ({i}/{len(file_paths)}): {rel_path}")

        # Run all copies concurrently
        await asyncio.gather(
            *[
                copy_file(i, rel_path, file_path)
                for i, (rel_path, file_path) in enumerate(file_paths, 1)
            ]
        )

        logger.info(f"Completed batch backup: {backup_id} ({len(relative_paths)} notes)")
        return backup_id
  • Batch restore helper used for rollback on failure in batch_append_notes. Restores notes from backup concurrently preserving structure and metadata.
    async def restore_batch_backup(self, backup_id: str) -> list[str]:
        """
        Copy the notes stored in a batch backup back into the vault.

        Every `*.md` file found (recursively) under the backup directory is
        written to the same relative location inside the vault, overwriting
        whatever is there, and its file metadata is copied along. The copies
        run concurrently.

        Args:
            backup_id: Backup ID (timestamp) to restore from

        Returns:
            List of restored note paths, relative to the vault

        Raises:
            FileNotFoundError: If backup doesn't exist
        """
        source_root = self.vault_path / ".batch_backups" / backup_id
        if not source_root.exists():
            raise FileNotFoundError(f"Backup not found: {backup_id}")

        logger.info(f"Restoring batch backup {backup_id}...")

        async def _restore_one(index: int, saved: Path) -> str:
            # The backup mirrors the vault layout, so the path relative to the
            # backup root is also the path relative to the vault root.
            relative = saved.relative_to(source_root)
            destination = self.vault_path / relative
            destination.parent.mkdir(parents=True, exist_ok=True)

            async with aiofiles.open(saved, "rb") as reader:
                payload = await reader.read()
            async with aiofiles.open(destination, "wb") as writer:
                await writer.write(payload)

            # Carry the backed-up file's metadata over to the restored note.
            shutil.copystat(saved, destination)
            logger.debug(f"Restored ({index}): {relative}")
            return str(relative)

        # Only Markdown files are picked up from the backup.
        saved_notes = list(source_root.rglob("*.md"))
        pending = [
            _restore_one(index, saved) for index, saved in enumerate(saved_notes, 1)
        ]
        restored = await asyncio.gather(*pending)

        logger.info(f"Completed batch restore: {backup_id} ({len(restored)} notes)")
        return list(restored)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/getglad/obsidian_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.