batch_append_notes
Append content to multiple notes in a single atomic operation within Obsidian vaults, ensuring all updates complete successfully or none are applied.
Instructions
Append content to multiple notes atomically
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| appends | Yes | ||
| confirm | No |
Implementation Reference
- src/obsidian_mcp/server.py:1626-1701 (handler)Main handler function for the batch_append_notes tool. Performs batch appends to multiple notes with automatic backup creation, concurrent execution, error handling with rollback, and confirmation safety checks.name="batch_append_notes", description="Append content to multiple notes atomically", ) async def batch_append_notes( appends: list[NoteAppend], confirm: bool = False, ) -> str: """ Append content to multiple notes. Args: appends: List of NoteAppend objects with path and content confirm: Must be true to apply changes Returns: Success message with append summary """ if not appends: return "Error: No appends provided" # Check batch size limit if len(appends) > MAX_BATCH_SIZE: return ( f"Error: Batch size ({len(appends)}) exceeds maximum ({MAX_BATCH_SIZE}).\n" f"Split into smaller batches to avoid server timeouts." ) logger.info(f"Starting batch_append_notes: {len(appends)} notes") # Extract paths (Pydantic already validated) paths = [append.path for append in appends] if not confirm: return ( f"Error: Batch append to {len(appends)} notes requires explicit confirmation. " f"Set confirm=true to proceed." ) context = _get_context() try: # Create backup (async) backup_id = await context.vault.create_batch_backup(paths) # Apply all appends appended = [] failed = [] for append in appends: try: await context.vault.append_to_note(append.path, append.content) appended.append(append.path) except Exception as e: failed.append((append.path, str(e))) # Rollback on failure (async) if failed: await context.vault.restore_batch_backup(backup_id) result = ["❌ Batch append failed - all changes rolled back\n"] result.append("**Failed appends:**") for path, error in failed: result.append(f"- `{path}`: {error}") return "\n".join(result) # Success logger.info(f"Completed batch_append_notes: {len(appended)} notes updated successfully") result = [f"✅ Appended to {len(appended)} notes\n"] for path in appended: result.append(f"- `{path}`") result.append(f"\n**Backup:** `.batch_backups/{backup_id}/`") return "\n".join(result) except Exception as e: logger.exception("Error in batch append") return f"Error: {e}"
- src/obsidian_mcp/server.py:114-119 (schema)Pydantic schema defining the input structure for each append operation in the batch: path and content.class NoteAppend(BaseModel): """Schema for appending to a single note.""" path: str = Field(description="Relative path to the note") content: str = Field(description="Content to append to the note")
- src/obsidian_mcp/vault.py:551-582 (helper)Supporting method in ObsidianVault class that implements the core single-note append logic, reading existing content, ensuring newline separation, and asynchronously writing the updated content.async def append_to_note(self, relative_path: str, content: str) -> None: """ Append content to an existing note. Args: relative_path: Path to the note content: Content to append Raises: VaultSecurityError: If path is invalid FileNotFoundError: If note doesn't exist """ file_path = self._validate_path(relative_path) if not file_path.exists(): raise FileNotFoundError(f"Note not found: {relative_path}") # Read existing content async with aiofiles.open(file_path, encoding="utf-8") as f: existing = await f.read() # Append new content (with newline separator if needed) if not existing.endswith("\n"): existing += "\n" existing += content # Write back async with aiofiles.open(file_path, "w", encoding="utf-8") as f: await f.write(existing) logger.info(f"Appended to note: {relative_path}")
- src/obsidian_mcp/vault.py:917-971 (helper)Batch backup creation helper used by batch_append_notes for atomicity. Creates timestamped backups of specified notes concurrently with metadata preservation.async def create_batch_backup(self, relative_paths: list[str]) -> str: """ Create a backup of multiple notes asynchronously. Args: relative_paths: List of note paths to backup Returns: Backup ID (timestamp) for later restoration Raises: VaultSecurityError: If any path is invalid FileNotFoundError: If any note doesn't exist """ # Validate all paths first file_paths = [] for rel_path in relative_paths: file_path = self._validate_path(rel_path) if not file_path.exists(): raise FileNotFoundError(f"Note not found: {rel_path}") file_paths.append((rel_path, file_path)) # Create backup directory with timestamp backup_id = datetime.now().strftime("%Y%m%d_%H%M%S") backup_dir = self.vault_path / ".batch_backups" / backup_id backup_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Creating batch backup {backup_id}: {len(relative_paths)} files...") # Copy all files to backup asynchronously async def copy_file(i: int, rel_path: str, file_path: Path) -> None: backup_file = backup_dir / rel_path backup_file.parent.mkdir(parents=True, exist_ok=True) # Use async file operations async with aiofiles.open(file_path, "rb") as src: content = await src.read() async with aiofiles.open(backup_file, "wb") as dst: await dst.write(content) # Preserve metadata shutil.copystat(file_path, backup_file) logger.debug(f"Backed up ({i}/{len(file_paths)}): {rel_path}") # Run all copies concurrently await asyncio.gather( *[ copy_file(i, rel_path, file_path) for i, (rel_path, file_path) in enumerate(file_paths, 1) ] ) logger.info(f"Completed batch backup: {backup_id} ({len(relative_paths)} notes)") return backup_id
- src/obsidian_mcp/vault.py:972-1022 (helper)Batch restore helper used for rollback on failure in batch_append_notes. Restores notes from backup concurrently preserving structure and metadata.async def restore_batch_backup(self, backup_id: str) -> list[str]: """ Restore notes from a batch backup asynchronously. Args: backup_id: Backup ID (timestamp) to restore from Returns: List of restored note paths Raises: FileNotFoundError: If backup doesn't exist """ backup_dir = self.vault_path / ".batch_backups" / backup_id if not backup_dir.exists(): raise FileNotFoundError(f"Backup not found: {backup_id}") logger.info(f"Restoring batch backup {backup_id}...") # Get all backup files backup_files = list(backup_dir.rglob("*.md")) # Restore all files asynchronously async def restore_file(i: int, backup_file: Path) -> str: # Get relative path from backup directory rel_path = backup_file.relative_to(backup_dir) target_file = self.vault_path / rel_path # Ensure parent directory exists target_file.parent.mkdir(parents=True, exist_ok=True) # Restore file async with aiofiles.open(backup_file, "rb") as src: content = await src.read() async with aiofiles.open(target_file, "wb") as dst: await dst.write(content) # Preserve metadata shutil.copystat(backup_file, target_file) logger.debug(f"Restored ({i}): {rel_path}") return str(rel_path) # Run all restores concurrently restored = await asyncio.gather( *[restore_file(i, backup_file) for i, backup_file in enumerate(backup_files, 1)] ) logger.info(f"Completed batch restore: {backup_id} ({len(restored)} notes)") return list(restored)