batch_append_notes

Append content to multiple notes in an Obsidian vault as a single atomic operation: either all updates complete successfully, or every affected note is rolled back from an automatic backup.

Instructions

Append content to multiple notes atomically

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| `appends` | Yes | List of `NoteAppend` objects, each with a note `path` and the `content` to append | — |
| `confirm` | No | Must be `true` for the changes to be applied | `false` |
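
For concreteness, the arguments for one call might look like the sketch below (paths and content are hypothetical):

```python
# Hypothetical arguments for a batch_append_notes call.
arguments = {
    "appends": [
        {"path": "Projects/roadmap.md", "content": "## Q3 update\n- Shipped v1.2\n"},
        {"path": "Daily/2024-01-15.md", "content": "- Reviewed batch tooling\n"},
    ],
    # The tool refuses to write anything unless confirm is true.
    "confirm": True,
}
```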

Implementation Reference

  • Main handler function for the batch_append_notes tool. Enforces a batch-size limit and an explicit confirmation flag, creates a backup of every target note, applies the appends sequentially, and rolls all notes back from the backup if any append fails.
    name="batch_append_notes", description="Append content to multiple notes atomically", ) async def batch_append_notes( appends: list[NoteAppend], confirm: bool = False, ) -> str: """ Append content to multiple notes. Args: appends: List of NoteAppend objects with path and content confirm: Must be true to apply changes Returns: Success message with append summary """ if not appends: return "Error: No appends provided" # Check batch size limit if len(appends) > MAX_BATCH_SIZE: return ( f"Error: Batch size ({len(appends)}) exceeds maximum ({MAX_BATCH_SIZE}).\n" f"Split into smaller batches to avoid server timeouts." ) logger.info(f"Starting batch_append_notes: {len(appends)} notes") # Extract paths (Pydantic already validated) paths = [append.path for append in appends] if not confirm: return ( f"Error: Batch append to {len(appends)} notes requires explicit confirmation. " f"Set confirm=true to proceed." ) context = _get_context() try: # Create backup (async) backup_id = await context.vault.create_batch_backup(paths) # Apply all appends appended = [] failed = [] for append in appends: try: await context.vault.append_to_note(append.path, append.content) appended.append(append.path) except Exception as e: failed.append((append.path, str(e))) # Rollback on failure (async) if failed: await context.vault.restore_batch_backup(backup_id) result = ["❌ Batch append failed - all changes rolled back\n"] result.append("**Failed appends:**") for path, error in failed: result.append(f"- `{path}`: {error}") return "\n".join(result) # Success logger.info(f"Completed batch_append_notes: {len(appended)} notes updated successfully") result = [f"✅ Appended to {len(appended)} notes\n"] for path in appended: result.append(f"- `{path}`") result.append(f"\n**Backup:** `.batch_backups/{backup_id}/`") return "\n".join(result) except Exception as e: logger.exception("Error in batch append") return f"Error: {e}"
  • Pydantic schema defining the input for each append operation in the batch: the note's relative path and the content to append.
```python
class NoteAppend(BaseModel):
    """Schema for appending to a single note."""

    path: str = Field(description="Relative path to the note")
    content: str = Field(description="Content to append to the note")
```
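
Since neither field has a default, Pydantic rejects malformed appends before the handler touches the vault; a quick sketch, assuming Pydantic v2:

```python
from pydantic import ValidationError

# NoteAppend as defined above.
note = NoteAppend(path="Daily/2024-01-15.md", content="- checked backups\n")
print(note.model_dump())
# {'path': 'Daily/2024-01-15.md', 'content': '- checked backups\n'}

try:
    NoteAppend(path="Daily/2024-01-15.md")  # content is required
except ValidationError as e:
    print(e)  # reports the missing 'content' field
```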
  • Supporting method on the ObsidianVault class that implements the single-note append logic: it reads the existing content, ensures a trailing-newline separator, and asynchronously writes the updated content back.
```python
async def append_to_note(self, relative_path: str, content: str) -> None:
    """
    Append content to an existing note.

    Args:
        relative_path: Path to the note
        content: Content to append

    Raises:
        VaultSecurityError: If path is invalid
        FileNotFoundError: If note doesn't exist
    """
    file_path = self._validate_path(relative_path)

    if not file_path.exists():
        raise FileNotFoundError(f"Note not found: {relative_path}")

    # Read existing content
    async with aiofiles.open(file_path, encoding="utf-8") as f:
        existing = await f.read()

    # Append new content (with newline separator if needed)
    if not existing.endswith("\n"):
        existing += "\n"
    existing += content

    # Write back
    async with aiofiles.open(file_path, "w", encoding="utf-8") as f:
        await f.write(existing)

    logger.info(f"Appended to note: {relative_path}")
```
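
The separator logic is easy to verify in isolation; this standalone replica shows what ends up on disk whether or not the existing note has a trailing newline:

```python
# Standalone replica of the newline-separator logic from append_to_note.
def append_with_separator(existing: str, content: str) -> str:
    if not existing.endswith("\n"):
        existing += "\n"
    return existing + content

print(repr(append_with_separator("# Note", "new line")))    # '# Note\nnew line'
print(repr(append_with_separator("# Note\n", "new line")))  # '# Note\nnew line'
```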
  • Batch-backup helper used by batch_append_notes to enable rollback. Creates a timestamped backup of the specified notes concurrently, preserving file metadata.
```python
async def create_batch_backup(self, relative_paths: list[str]) -> str:
    """
    Create a backup of multiple notes asynchronously.

    Args:
        relative_paths: List of note paths to backup

    Returns:
        Backup ID (timestamp) for later restoration

    Raises:
        VaultSecurityError: If any path is invalid
        FileNotFoundError: If any note doesn't exist
    """
    # Validate all paths first
    file_paths = []
    for rel_path in relative_paths:
        file_path = self._validate_path(rel_path)
        if not file_path.exists():
            raise FileNotFoundError(f"Note not found: {rel_path}")
        file_paths.append((rel_path, file_path))

    # Create backup directory with timestamp
    backup_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_dir = self.vault_path / ".batch_backups" / backup_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"Creating batch backup {backup_id}: {len(relative_paths)} files...")

    # Copy all files to backup asynchronously
    async def copy_file(i: int, rel_path: str, file_path: Path) -> None:
        backup_file = backup_dir / rel_path
        backup_file.parent.mkdir(parents=True, exist_ok=True)

        # Use async file operations
        async with aiofiles.open(file_path, "rb") as src:
            content = await src.read()
        async with aiofiles.open(backup_file, "wb") as dst:
            await dst.write(content)

        # Preserve metadata
        shutil.copystat(file_path, backup_file)
        logger.debug(f"Backed up ({i}/{len(file_paths)}): {rel_path}")

    # Run all copies concurrently
    await asyncio.gather(
        *[
            copy_file(i, rel_path, file_path)
            for i, (rel_path, file_path) in enumerate(file_paths, 1)
        ]
    )

    logger.info(f"Completed batch backup: {backup_id} ({len(relative_paths)} notes)")
    return backup_id
```
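
The backup ID is a sortable, filesystem-safe timestamp, and each backed-up note keeps its vault-relative path under the backup directory; a small sketch (the note path is hypothetical):

```python
from datetime import datetime
from pathlib import Path

backup_id = datetime.now().strftime("%Y%m%d_%H%M%S")
print(backup_id)  # e.g. 20240115_143022

# A note at Projects/roadmap.md would be copied to:
print(Path(".batch_backups") / backup_id / "Projects" / "roadmap.md")
# .batch_backups/20240115_143022/Projects/roadmap.md
```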
  • Batch-restore helper used to roll back on failure in batch_append_notes. Restores notes from the backup concurrently, preserving directory structure and file metadata.
```python
async def restore_batch_backup(self, backup_id: str) -> list[str]:
    """
    Restore notes from a batch backup asynchronously.

    Args:
        backup_id: Backup ID (timestamp) to restore from

    Returns:
        List of restored note paths

    Raises:
        FileNotFoundError: If backup doesn't exist
    """
    backup_dir = self.vault_path / ".batch_backups" / backup_id

    if not backup_dir.exists():
        raise FileNotFoundError(f"Backup not found: {backup_id}")

    logger.info(f"Restoring batch backup {backup_id}...")

    # Get all backup files
    backup_files = list(backup_dir.rglob("*.md"))

    # Restore all files asynchronously
    async def restore_file(i: int, backup_file: Path) -> str:
        # Get relative path from backup directory
        rel_path = backup_file.relative_to(backup_dir)
        target_file = self.vault_path / rel_path

        # Ensure parent directory exists
        target_file.parent.mkdir(parents=True, exist_ok=True)

        # Restore file
        async with aiofiles.open(backup_file, "rb") as src:
            content = await src.read()
        async with aiofiles.open(target_file, "wb") as dst:
            await dst.write(content)

        # Preserve metadata
        shutil.copystat(backup_file, target_file)
        logger.debug(f"Restored ({i}): {rel_path}")
        return str(rel_path)

    # Run all restores concurrently
    restored = await asyncio.gather(
        *[restore_file(i, backup_file) for i, backup_file in enumerate(backup_files, 1)]
    )

    logger.info(f"Completed batch restore: {backup_id} ({len(restored)} notes)")
    return list(restored)
```
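
Both helpers share the same fan-out shape: collect the work items synchronously first, then run one coroutine per file under asyncio.gather. An isolated, runnable sketch of that pattern:

```python
import asyncio

async def process(i: int, name: str) -> str:
    await asyncio.sleep(0)  # stand-in for the async file I/O in the helpers
    return f"({i}) {name}"

async def main() -> None:
    items = ["a.md", "b.md", "c.md"]
    # gather preserves input order even though the coroutines run concurrently
    results = await asyncio.gather(
        *[process(i, name) for i, name in enumerate(items, 1)]
    )
    print(results)  # ['(1) a.md', '(2) b.md', '(3) c.md']

asyncio.run(main())
```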
