Skip to main content
Glama

Obsidian MCP Server

by suhailnajeeb
organization.py58.1 kB
"""Organization tools for Obsidian MCP server.""" import re from typing import List, Dict, Any, Optional from fastmcp import Context from ..utils.filesystem import get_vault from ..utils import validate_note_path, sanitize_path, is_markdown_file from ..utils.validation import validate_tags from ..models import Note, NoteMetadata, Tag from ..constants import ERROR_MESSAGES async def move_note( source_path: str, destination_path: str, update_links: bool = True, ctx: Context = None ) -> dict: """ Move a note to a new location, optionally with a new name. Use this tool to reorganize your vault by moving notes to different folders. If the filename changes during the move, all wiki-style links will be automatically updated throughout your vault. Args: source_path: Current path of the note destination_path: New path for the note (can include a new filename) update_links: Whether to update links when filename changes (default: true) ctx: MCP context for progress reporting Returns: Dictionary containing move status and link update information Examples: >>> # Move without renaming (no link updates needed) >>> await move_note("Inbox/Note.md", "Projects/Note.md") { "source": "Inbox/Note.md", "destination": "Projects/Note.md", "moved": true, "renamed": false, "links_updated": 0 } >>> # Move with renaming (links will be updated) >>> await move_note("Inbox/Quick Note.md", "Projects/Project Plan.md") { "source": "Inbox/Quick Note.md", "destination": "Projects/Project Plan.md", "moved": true, "renamed": true, "links_updated": 5, "notes_updated": 3 } """ # Validate paths for path, name in [(source_path, "source"), (destination_path, "destination")]: is_valid, error_msg = validate_note_path(path) if not is_valid: raise ValueError(f"Invalid {name} path: {error_msg}") # Import link management functions from ..tools.link_management import get_backlinks # Sanitize paths source_path = sanitize_path(source_path) destination_path = sanitize_path(destination_path) if source_path == destination_path: raise ValueError("Source and destination paths are the same") vault = get_vault() # Check if source exists try: source_note = await vault.read_note(source_path) except FileNotFoundError: # If exact path not found, try to find the note by filename if ctx: ctx.info(f"Note not found at {source_path}, searching for filename...") # Import search function from ..tools.search_discovery import search_notes # Search for notes with this filename source_filename = source_path.split('/')[-1] search_result = await search_notes(f"path:{source_filename}", max_results=10, ctx=None) if search_result["count"] == 0: raise FileNotFoundError(ERROR_MESSAGES["note_not_found"].format(path=source_path)) elif search_result["count"] == 1: # Exactly one match - use it found_path = search_result["results"][0]["path"] if ctx: ctx.info(f"Found unique match at: {found_path}") # Update source_path to the found path source_path = found_path # Now try to read the note again source_note = await vault.read_note(source_path) else: # Multiple matches - show them to the user matches = [result["path"] for result in search_result["results"]] matches_str = "\n - ".join(matches) raise ValueError( f"Multiple notes found with name '{source_filename}'. Please specify the full path:\n - {matches_str}" ) # Extract filenames to check if name is changing source_filename = source_path.split('/')[-1] dest_filename = destination_path.split('/')[-1] source_name = source_filename[:-3] if source_filename.endswith('.md') else source_filename dest_name = dest_filename[:-3] if dest_filename.endswith('.md') else dest_filename name_changed = source_name != dest_name if ctx: ctx.info(f"Moving note from {source_path} to {destination_path}") if name_changed: ctx.info(f"Filename is changing from '{source_filename}' to '{dest_filename}'") # Check if destination already exists try: await vault.read_note(destination_path) raise FileExistsError(f"Note already exists at destination: {destination_path}") except FileNotFoundError: # Good, destination doesn't exist pass # If name is changing and update_links is True, update all backlinks before moving links_updated = 0 notes_updated = 0 link_update_details = [] if name_changed and update_links: if ctx: ctx.info(f"Filename changed - updating all links from '{source_name}' to '{dest_name}'") # Get all backlinks to the old note backlinks_result = await get_backlinks(source_path, include_context=False, ctx=None) backlinks = backlinks_result['findings'] if ctx: ctx.info(f"Found {len(backlinks)} backlinks to update") # Group backlinks by source note for efficient updating updates_by_note = {} for backlink in backlinks: source = backlink['source_path'] if source not in updates_by_note: updates_by_note[source] = [] updates_by_note[source].append(backlink) # Update each note that contains backlinks for note_path, note_backlinks in updates_by_note.items(): try: # Read the note note = await vault.read_note(note_path) content = note.content original_content = content updates_in_note = 0 # Replace all wiki-style links to the old note patterns_to_replace = [ (re.compile(rf'\[\[{re.escape(source_name)}\]\]'), f'[[{dest_name}]]'), (re.compile(rf'\[\[{re.escape(source_name)}\.md\]\]'), f'[[{dest_name}.md]]'), (re.compile(rf'\[\[{re.escape(source_filename)}\]\]'), f'[[{dest_filename}]]'), # With aliases (re.compile(rf'\[\[{re.escape(source_name)}\|([^\]]+)\]\]'), rf'[[{dest_name}|\1]]'), (re.compile(rf'\[\[{re.escape(source_name)}\.md\|([^\]]+)\]\]'), rf'[[{dest_name}.md|\1]]'), (re.compile(rf'\[\[{re.escape(source_filename)}\|([^\]]+)\]\]'), rf'[[{dest_filename}|\1]]'), ] # Apply all replacements for pattern, replacement in patterns_to_replace: content, count = pattern.subn(replacement, content) updates_in_note += count # If content changed, write it back if content != original_content: await vault.write_note(note_path, content, overwrite=True) links_updated += updates_in_note notes_updated += 1 link_update_details.append({ "note": note_path, "updates": updates_in_note }) if ctx: ctx.info(f"Updated {updates_in_note} links in {note_path}") except Exception as e: if ctx: ctx.info(f"Error updating links in {note_path}: {str(e)}") # Create note at new location await vault.write_note(destination_path, source_note.content, overwrite=False) # Delete original note await vault.delete_note(source_path) if ctx: if name_changed and update_links: ctx.info(f"Successfully moved and renamed note, updated {links_updated} links") else: ctx.info(f"Successfully moved note") # Return standardized move operation structure return { "success": True, "source": source_path, "destination": destination_path, "type": "note", "renamed": name_changed, "details": { "items_moved": 1, "links_updated": links_updated, "notes_updated": notes_updated, "link_update_details": link_update_details[:10] if name_changed else [] } } async def rename_note( old_path: str, new_path: str, update_links: bool = True, ctx: Context = None ) -> dict: """ Rename a note and optionally update all references to it. Use this tool to change a note's filename while automatically updating all wiki-style links ([[note name]]) that reference it throughout your vault. This maintains link integrity when reorganizing notes. Args: old_path: Current path of the note new_path: New path for the note (must be in same directory) update_links: Whether to update links in other notes (default: true) ctx: MCP context for progress reporting Returns: Dictionary containing rename status and updated links information Example: >>> await rename_note("Projects/AI Research.md", "Projects/Machine Learning Study.md", ctx=ctx) { "success": true, "old_path": "Projects/AI Research.md", "new_path": "Projects/Machine Learning Study.md", "operation": "renamed", "details": { "links_updated": 12, "notes_updated": 8, "link_update_details": [ { "note": "Daily/2024-01-15.md", "updates": 2 } ] } } """ # Import link management functions from ..tools.link_management import get_backlinks, WIKI_LINK_PATTERN # Validate paths for path, name in [(old_path, "old"), (new_path, "new")]: is_valid, error_msg = validate_note_path(path) if not is_valid: raise ValueError(f"Invalid {name} path: {error_msg}") # Sanitize paths old_path = sanitize_path(old_path) new_path = sanitize_path(new_path) if old_path == new_path: raise ValueError("Old and new paths are the same") vault = get_vault() # Check if source exists try: source_note = await vault.read_note(old_path) except FileNotFoundError: # If exact path not found, try to find the note by filename if ctx: ctx.info(f"Note not found at {old_path}, searching for filename...") # Import search function from ..tools.search_discovery import search_notes # Search for notes with this filename old_filename = old_path.split('/')[-1] search_result = await search_notes(f"path:{old_filename}", max_results=10, ctx=None) if search_result["count"] == 0: raise FileNotFoundError(ERROR_MESSAGES["note_not_found"].format(path=old_path)) elif search_result["count"] == 1: # Exactly one match - use it found_path = search_result["results"][0]["path"] if ctx: ctx.info(f"Found unique match at: {found_path}") # Update old_path to the found path old_path = found_path # Also update new_path to use the same directory found_dir = '/'.join(found_path.split('/')[:-1]) if '/' in found_path else '' new_filename = new_path.split('/')[-1] new_path = f"{found_dir}/{new_filename}" if found_dir else new_filename # Now try to read the note again source_note = await vault.read_note(old_path) else: # Multiple matches - show them to the user matches = [result["path"] for result in search_result["results"]] matches_str = "\n - ".join(matches) raise ValueError( f"Multiple notes found with name '{old_filename}'. Please specify the full path:\n - {matches_str}" ) # Now that we have the actual paths, extract directory and filename old_dir = '/'.join(old_path.split('/')[:-1]) if '/' in old_path else '' new_dir = '/'.join(new_path.split('/')[:-1]) if '/' in new_path else '' if old_dir != new_dir: raise ValueError("Rename can only change the filename, not the directory. Use move_note to change directories.") # Extract filenames with and without .md extension old_filename = old_path.split('/')[-1] new_filename = new_path.split('/')[-1] old_name = old_filename[:-3] if old_filename.endswith('.md') else old_filename new_name = new_filename[:-3] if new_filename.endswith('.md') else new_filename if ctx: ctx.info(f"Renaming note from {old_path} to {new_path}") # Check if destination already exists try: await vault.read_note(new_path) raise FileExistsError(f"Note already exists at destination: {new_path}") except FileNotFoundError: # Good, destination doesn't exist pass # If update_links is True, find and update all backlinks before renaming links_updated = 0 notes_updated = 0 link_update_details = [] if update_links: if ctx: ctx.info(f"Finding all notes that link to {old_name}") # Get all backlinks to the old note backlinks_result = await get_backlinks(old_path, include_context=False, ctx=None) backlinks = backlinks_result['findings'] if ctx: ctx.info(f"Found {len(backlinks)} backlinks to update") # Group backlinks by source note for efficient updating updates_by_note = {} for backlink in backlinks: source = backlink['source_path'] if source not in updates_by_note: updates_by_note[source] = [] updates_by_note[source].append(backlink) # Update each note that contains backlinks for note_path, note_backlinks in updates_by_note.items(): try: # Read the note note = await vault.read_note(note_path) content = note.content original_content = content updates_in_note = 0 # Replace all wiki-style links to the old note # We need to handle various formats: # [[old_name]], [[old_name.md]], [[old_name|alias]] # Pattern to match wiki links to our old note patterns_to_replace = [ (re.compile(rf'\[\[{re.escape(old_name)}\]\]'), f'[[{new_name}]]'), (re.compile(rf'\[\[{re.escape(old_name)}\.md\]\]'), f'[[{new_name}.md]]'), (re.compile(rf'\[\[{re.escape(old_filename)}\]\]'), f'[[{new_filename}]]'), # With aliases (re.compile(rf'\[\[{re.escape(old_name)}\|([^\]]+)\]\]'), rf'[[{new_name}|\1]]'), (re.compile(rf'\[\[{re.escape(old_name)}\.md\|([^\]]+)\]\]'), rf'[[{new_name}.md|\1]]'), (re.compile(rf'\[\[{re.escape(old_filename)}\|([^\]]+)\]\]'), rf'[[{new_filename}|\1]]'), ] # Apply all replacements for pattern, replacement in patterns_to_replace: content, count = pattern.subn(replacement, content) updates_in_note += count # If content changed, write it back if content != original_content: await vault.write_note(note_path, content, overwrite=True) links_updated += updates_in_note notes_updated += 1 link_update_details.append({ "note": note_path, "updates": updates_in_note }) if ctx: ctx.info(f"Updated {updates_in_note} links in {note_path}") except Exception as e: if ctx: ctx.info(f"Error updating links in {note_path}: {str(e)}") # Now rename the note itself await vault.write_note(new_path, source_note.content, overwrite=False) await vault.delete_note(old_path) if ctx: ctx.info(f"Successfully renamed note and updated {links_updated} links") # Return standardized CRUD success structure return { "success": True, "old_path": old_path, "new_path": new_path, "operation": "renamed", "details": { "links_updated": links_updated, "notes_updated": notes_updated, "link_update_details": link_update_details[:10] # Limit details to first 10 notes } } async def create_folder( folder_path: str, create_placeholder: bool = True, ctx: Context = None ) -> dict: """ Create a new folder in the vault, including all parent folders. Since Obsidian doesn't have explicit folders (they're created automatically when notes are added), this tool creates a folder by adding a placeholder file. It will create all necessary parent folders in the path. Args: folder_path: Path of the folder to create (e.g., "Research/Studies/2024") create_placeholder: Whether to create a placeholder file (default: true) ctx: MCP context for progress reporting Returns: Dictionary containing creation status Example: >>> await create_folder("Research/Studies/2024", ctx=ctx) { "folder": "Research/Studies/2024", "created": true, "placeholder_file": "Research/Studies/2024/.gitkeep", "folders_created": ["Research", "Research/Studies", "Research/Studies/2024"] } """ # Validate folder path if folder_path.endswith('.md') or folder_path.endswith('.markdown'): raise ValueError(f"Invalid folder path: '{folder_path}'. Folder paths should not end with .md") if '..' in folder_path or folder_path.startswith('/'): raise ValueError(f"Invalid folder path: '{folder_path}'. Paths must be relative and cannot contain '..'") if not folder_path or folder_path.isspace(): raise ValueError("Folder path cannot be empty") # Sanitize path folder_path = folder_path.strip('/').replace('\\', '/') if ctx: ctx.info(f"Creating folder: {folder_path}") vault = get_vault() # Split the path to check each level path_parts = folder_path.split('/') folders_to_check = [] folders_created = [] # Build list of all folders to check/create for i in range(len(path_parts)): partial_path = '/'.join(path_parts[:i+1]) folders_to_check.append(partial_path) # Check each folder level from ..tools.search_discovery import list_notes for folder in folders_to_check: try: existing_notes = await list_notes(folder, recursive=False, ctx=None) # Folder exists if we can list it (even with 0 notes) if ctx: ctx.info(f"Folder already exists: {folder}") except Exception: # Folder doesn't exist, mark it for creation folders_created.append(folder) if ctx: ctx.info(f"Will create folder: {folder}") if not folders_created and not create_placeholder: # All folders already exist # Return standardized CRUD success structure return { "success": True, "path": folder_path, "operation": "exists", "details": { "created": False, "message": "All folders in path already exist", "folders_created": [] } } if not create_placeholder: # Return standardized CRUD success structure return { "success": True, "path": folder_path, "operation": "created", "details": { "created": True, "message": "Folders will be created when first note is added", "placeholder_file": None, "folders_created": folders_created } } # Create a placeholder file in the deepest folder to establish the entire path placeholder_path = f"{folder_path}/.gitkeep" placeholder_content = f"# Folder: {folder_path}\n\nThis file ensures the folder exists in the vault structure.\n" try: await vault.write_note(placeholder_path, placeholder_content, overwrite=False) # Return standardized CRUD success structure return { "success": True, "path": folder_path, "operation": "created", "details": { "created": True, "placeholder_file": placeholder_path, "folders_created": folders_created if folders_created else ["(all already existed)"] } } except Exception as e: # Try with README.md if .gitkeep fails try: readme_path = f"{folder_path}/README.md" readme_content = f"# {folder_path.split('/')[-1]}\n\nThis folder contains notes related to {folder_path.replace('/', ' > ')}.\n" await vault.write_note(readme_path, readme_content, overwrite=False) # Return standardized CRUD success structure return { "success": True, "path": folder_path, "operation": "created", "details": { "created": True, "placeholder_file": readme_path, "folders_created": folders_created if folders_created else ["(all already existed)"] } } except Exception as e2: raise ValueError(f"Failed to create folder: {str(e2)}") async def move_folder( source_folder: str, destination_folder: str, update_links: bool = True, ctx: Context = None ) -> dict: """ Move an entire folder and all its contents to a new location. Use this tool to reorganize your vault structure by moving entire folders with all their notes and subfolders. Args: source_folder: Current folder path (e.g., "Projects/Old") destination_folder: New folder path (e.g., "Archive/Projects/Old") update_links: Whether to update links in other notes (default: true) ctx: MCP context for progress reporting Returns: Dictionary containing move status and statistics Example: >>> await move_folder("Projects/Completed", "Archive/2024/Projects", ctx=ctx) { "source": "Projects/Completed", "destination": "Archive/2024/Projects", "moved": true, "notes_moved": 15, "folders_moved": 3, "links_updated": 0 } """ # Validate folder paths (no .md extension) for folder, name in [(source_folder, "source"), (destination_folder, "destination")]: if folder.endswith('.md') or folder.endswith('.markdown'): raise ValueError(f"Invalid {name} folder path: '{folder}'. Folder paths should not end with .md") if '..' in folder or folder.startswith('/'): raise ValueError(f"Invalid {name} folder path: '{folder}'. Paths must be relative and cannot contain '..'") # Sanitize paths source_folder = source_folder.strip('/').replace('\\', '/') destination_folder = destination_folder.strip('/').replace('\\', '/') if source_folder == destination_folder: raise ValueError("Source and destination folders are the same") # Check if destination is a subfolder of source (would create circular reference) if destination_folder.startswith(source_folder + '/'): raise ValueError("Cannot move a folder into its own subfolder") if ctx: ctx.info(f"Moving folder from {source_folder} to {destination_folder}") vault = get_vault() # Get all notes in the source folder recursively from ..tools.search_discovery import list_notes folder_contents = await list_notes(source_folder, recursive=True, ctx=None) if folder_contents["count"] == 0: raise ValueError(f"No notes found in folder: {source_folder}") notes_moved = 0 folders_moved = set() # Track unique folders links_updated = 0 errors = [] # Move each note for note_info in folder_contents["notes"]: old_path = note_info["path"] # Calculate new path by replacing the source folder prefix relative_path = old_path[len(source_folder):].lstrip('/') new_path = f"{destination_folder}/{relative_path}" if destination_folder else relative_path # Track folders folder_parts = relative_path.split('/')[:-1] # Exclude filename for i in range(len(folder_parts)): folder_path = '/'.join(folder_parts[:i+1]) folders_moved.add(folder_path) try: # Read the note note = await vault.read_note(old_path) # Create at new location await vault.write_note(new_path, note.content, overwrite=False) # Delete from old location await vault.delete_note(old_path) notes_moved += 1 if ctx: ctx.info(f"Moved: {old_path} → {new_path}") except Exception as e: errors.append(f"Failed to move {old_path}: {str(e)}") if ctx: ctx.info(f"Error moving {old_path}: {str(e)}") # Update links if requested if update_links: # This would require searching for all notes that link to notes in the source folder # and updating them. For now, we'll mark this as a future enhancement. pass # Return standardized move operation structure result = { "success": True, "source": source_folder, "destination": destination_folder, "type": "folder", "details": { "items_moved": notes_moved, "links_updated": links_updated, "notes_moved": notes_moved, "folders_moved": len(folders_moved) } } if errors: result["details"]["errors"] = errors[:5] # Limit to first 5 errors result["details"]["total_errors"] = len(errors) return result async def add_tags( path: str, tags: List[str], ctx: Context = None ) -> dict: """ Add tags to a note's frontmatter. Use this tool to add organizational tags to notes. Tags are added to the YAML frontmatter and do not modify the note's content. Args: path: Path to the note tags: List of tags to add (without # prefix) ctx: MCP context for progress reporting Returns: Dictionary containing updated tag list Example: >>> await add_tags("Projects/AI.md", ["machine-learning", "research"], ctx=ctx) { "path": "Projects/AI.md", "tags_added": ["machine-learning", "research"], "all_tags": ["ai", "project", "machine-learning", "research"] } """ # Validate path is_valid, error_msg = validate_note_path(path) if not is_valid: raise ValueError(f"Invalid path: {error_msg}") path = sanitize_path(path) # Validate tags is_valid, error = validate_tags(tags) if not is_valid: raise ValueError(error) # Clean tags (remove # prefix if present) - validation already does this tags = [tag.lstrip("#").strip() for tag in tags if tag.strip()] if ctx: ctx.info(f"Adding tags to {path}: {tags}") vault = get_vault() try: note = await vault.read_note(path) except FileNotFoundError: raise FileNotFoundError(ERROR_MESSAGES["note_not_found"].format(path=path)) # Parse frontmatter and update tags content = note.content updated_content = _update_frontmatter_tags(content, tags, "add") # Update the note await vault.write_note(path, updated_content, overwrite=True) # Get updated note to return current tags updated_note = await vault.read_note(path) # Return standardized tag operation structure return { "success": True, "path": path, "operation": "added", "tags": { "before": note.metadata.tags if note.metadata.tags else [], "after": updated_note.metadata.tags, "changes": { "added": tags, "removed": [] } } } async def update_tags( path: str, tags: List[str], merge: bool = False, ctx: Context = None ) -> dict: """ Update tags on a note - either replace all tags or merge with existing. Use this tool when you want to set a note's tags based on its content or purpose. Perfect for AI-driven tag suggestions after analyzing a note. Args: path: Path to the note tags: New tags to set (without # prefix) merge: If True, adds to existing tags. If False, replaces all tags (default: False) ctx: MCP context for progress reporting Returns: Dictionary containing previous and new tag lists Example: >>> # After analyzing a note about machine learning project >>> await update_tags("Projects/ML Research.md", ["ai", "research", "neural-networks"], ctx=ctx) { "path": "Projects/ML Research.md", "previous_tags": ["project", "todo"], "new_tags": ["ai", "research", "neural-networks"], "operation": "replaced" } """ # Validate path is_valid, error_msg = validate_note_path(path) if not is_valid: raise ValueError(f"Invalid path: {error_msg}") path = sanitize_path(path) # Validate tags is_valid, error = validate_tags(tags) if not is_valid: raise ValueError(error) # Clean tags (remove # prefix if present) tags = [tag.lstrip("#").strip() for tag in tags if tag.strip()] if ctx: ctx.info(f"Updating tags for {path}: {tags} (merge={merge})") vault = get_vault() try: note = await vault.read_note(path) except FileNotFoundError: raise FileNotFoundError(ERROR_MESSAGES["note_not_found"].format(path=path)) # Store previous tags previous_tags = note.metadata.tags.copy() if note.metadata.tags else [] # Determine final tags based on merge setting if merge: # Merge with existing tags (like add_tags but more explicit) final_tags = list(set(previous_tags + tags)) operation = "merged" else: # Replace all tags final_tags = tags operation = "replaced" # Update the note's frontmatter content = note.content updated_content = _update_frontmatter_tags(content, final_tags, "replace") # Update the note await vault.write_note(path, updated_content, overwrite=True) # Return standardized tag operation structure added_tags = list(set(final_tags) - set(previous_tags)) if merge else final_tags removed_tags = list(set(previous_tags) - set(final_tags)) if not merge else [] return { "success": True, "path": path, "operation": "updated", "tags": { "before": previous_tags, "after": final_tags, "changes": { "added": added_tags, "removed": removed_tags, "merge_mode": merge, "operation_type": operation } } } async def remove_tags( path: str, tags: List[str], ctx: Context = None ) -> dict: """ Remove tags from a note's frontmatter. Use this tool to remove organizational tags from notes. Args: path: Path to the note tags: List of tags to remove (without # prefix) ctx: MCP context for progress reporting Returns: Dictionary containing updated tag list Example: >>> await remove_tags("Projects/AI.md", ["outdated"], ctx=ctx) { "path": "Projects/AI.md", "tags_removed": ["outdated"], "remaining_tags": ["ai", "project", "machine-learning"] } """ # Validate path is_valid, error_msg = validate_note_path(path) if not is_valid: raise ValueError(f"Invalid path: {error_msg}") path = sanitize_path(path) # Validate tags is_valid, error = validate_tags(tags) if not is_valid: raise ValueError(error) # Clean tags (remove # prefix if present) - validation already does this tags = [tag.lstrip("#").strip() for tag in tags if tag.strip()] if ctx: ctx.info(f"Removing tags from {path}: {tags}") vault = get_vault() try: note = await vault.read_note(path) except FileNotFoundError: raise FileNotFoundError(ERROR_MESSAGES["note_not_found"].format(path=path)) # Parse frontmatter and update tags content = note.content updated_content = _update_frontmatter_tags(content, tags, "remove") # Update the note await vault.write_note(path, updated_content, overwrite=True) # Get updated note to return current tags updated_note = await vault.read_note(path) # Return standardized tag operation structure return { "success": True, "path": path, "operation": "removed", "tags": { "before": note.metadata.tags if note.metadata.tags else [], "after": updated_note.metadata.tags if updated_note.metadata.tags else [], "changes": { "added": [], "removed": tags } } } async def get_note_info( path: str, ctx: Context = None ) -> dict: """ Get metadata and information about a note without retrieving its full content. Use this tool when you need to check a note's metadata, tags, or other properties without loading the entire content. Args: path: Path to the note ctx: MCP context for progress reporting Returns: Dictionary containing note metadata and statistics Example: >>> await get_note_info("Projects/AI Research.md", ctx=ctx) { "path": "Projects/AI Research.md", "exists": true, "metadata": { "tags": ["ai", "research", "active"], "created": "2024-01-10T10:00:00Z", "modified": "2024-01-15T14:30:00Z", "aliases": ["AI Study", "ML Research"] }, "stats": { "size_bytes": 4523, "word_count": 823, "link_count": 12 } } """ # Validate path is_valid, error_msg = validate_note_path(path) if not is_valid: raise ValueError(f"Invalid path: {error_msg}") path = sanitize_path(path) if ctx: ctx.info(f"Getting info for: {path}") vault = get_vault() try: note = await vault.read_note(path) except FileNotFoundError: # Return standardized CRUD structure for non-existent note return { "success": False, "path": path, "operation": "read", "details": { "exists": False, "error": "Note not found" } } # Calculate statistics content = note.content word_count = len(content.split()) # Count links (both [[wikilinks]] and [markdown](links)) wikilink_count = len(re.findall(r'\[\[([^\]]+)\]\]', content)) markdown_link_count = len(re.findall(r'\[([^\]]+)\]\(([^)]+)\)', content)) link_count = wikilink_count + markdown_link_count # Return standardized CRUD success structure return { "success": True, "path": path, "operation": "read", "details": { "exists": True, "metadata": note.metadata.model_dump(exclude_none=True), "stats": { "size_bytes": len(content.encode('utf-8')), "word_count": word_count, "link_count": link_count } } } def _update_frontmatter_tags(content: str, tags: List[str], operation: str) -> str: """ Update tags in YAML frontmatter. Args: content: Note content tags: Tags to add, remove, or replace with operation: "add", "remove", or "replace" Returns: Updated content """ # Check if frontmatter exists if not content.startswith("---\n"): # Create frontmatter if it doesn't exist if operation in ["add", "replace"]: frontmatter = f"---\ntags: {tags}\n---\n\n" return frontmatter + content else: # Nothing to remove if no frontmatter return content # Parse existing frontmatter try: end_index = content.index("\n---\n", 4) + 5 frontmatter = content[4:end_index-5] rest_of_content = content[end_index:] except ValueError: # Invalid frontmatter return content # Parse YAML manually (simple approach for tags) lines = frontmatter.split('\n') new_lines = [] tags_found = False in_tags_list = False existing_tags = [] i = 0 while i < len(lines): line = lines[i] if line.startswith('tags:'): tags_found = True # Check if tags are on the same line if '[' in line: # Array format: tags: [tag1, tag2] match = re.search(r'\[(.*?)\]', line) if match: existing_tags = [t.strip().strip('"').strip("'") for t in match.group(1).split(',') if t.strip()] elif line.strip() != 'tags:': # Inline format: tags: tag1 tag2 existing_tags = line.split(':', 1)[1].strip().split() else: # Bullet list format on next lines in_tags_list = True i += 1 # Parse all bullet list items while i < len(lines) and lines[i].strip().startswith('- '): tag = lines[i].strip()[2:].strip() if tag: existing_tags.append(tag) i += 1 i -= 1 # Back up one since the outer loop will increment # Update tags based on operation if operation == "add": # Add new tags, avoid duplicates for tag in tags: if tag not in existing_tags: existing_tags.append(tag) elif operation == "replace": # Replace all tags existing_tags = tags else: # remove existing_tags = [t for t in existing_tags if t not in tags] # Format updated tags if existing_tags: new_lines.append(f"tags: [{', '.join(existing_tags)}]") # Skip line if no tags remain elif in_tags_list and line.strip().startswith('- '): # Skip bullet list items - they've already been processed pass else: new_lines.append(line) in_tags_list = False i += 1 # If no tags were found and we're adding or replacing, add them if not tags_found and operation in ["add", "replace"]: new_lines.insert(0, f"tags: [{', '.join(tags)}]") # Reconstruct content new_frontmatter = '\n'.join(new_lines) return f"---\n{new_frontmatter}\n---\n{rest_of_content}" async def list_tags( include_counts: bool = True, sort_by: str = "name", include_files: bool = False, ctx=None ) -> dict: """ List all unique tags used across the vault with usage statistics. Use this tool to discover existing tags before creating new ones. This helps maintain consistency in your tagging system and prevents duplicate tags with slight variations (e.g., 'project' vs 'projects'). Args: include_counts: Whether to include usage count for each tag (default: true) sort_by: How to sort results - "name" (alphabetical) or "count" (by usage) (default: "name") include_files: Whether to include file paths for each tag (default: false) ctx: MCP context for progress reporting Returns: Dictionary containing all unique tags with optional usage counts Example: >>> await list_tags(include_counts=True, sort_by="count") { "items": [ {"name": "project", "count": 42}, {"name": "meeting", "count": 38}, {"name": "idea", "count": 15} ], "total": 25, "scope": {"include_counts": true, "sort_by": "count", "include_files": false} } >>> await list_tags(include_files=True, include_counts=False) { "items": [ {"name": "project", "files": ["Projects/Web App.md", "Projects/Mobile App.md"]}, {"name": "meeting", "files": ["Meetings/2024-01-15.md", "Meetings/2024-01-22.md"]} ], "total": 2, "scope": {"include_counts": false, "sort_by": "name", "include_files": true} } """ # Validate sort_by parameter if sort_by not in ["name", "count"]: raise ValueError(ERROR_MESSAGES["invalid_sort_by"].format(value=sort_by)) if ctx: ctx.info("Collecting tags from vault...") vault = get_vault() # Dictionary to store tag counts and file paths tag_counts = {} tag_files = {} if include_files else None try: # Get all notes in the vault all_notes = await vault.list_notes(recursive=True) if ctx: ctx.info(f"Scanning {len(all_notes)} notes for tags...") # Process notes in batches for better performance import asyncio # Adjust batch size based on vault size batch_size = 50 if len(all_notes) > 1000 else 20 max_concurrent = asyncio.Semaphore(batch_size) async def process_note(note_info): async with max_concurrent: try: note = await vault.read_note(note_info["path"]) if note and note.metadata and note.metadata.tags: return (note_info["path"], note.metadata.tags) except Exception: return (note_info["path"], []) return (note_info["path"], []) # Process all notes concurrently with semaphore limiting tasks = [process_note(note_info) for note_info in all_notes] results = await asyncio.gather(*tasks) # Count tags and collect file paths for path, tags in results: for tag in tags: if tag: tag_counts[tag] = tag_counts.get(tag, 0) + 1 if include_files: if tag not in tag_files: tag_files[tag] = [] tag_files[tag].append(path) # Format results if include_counts or include_files: tags = [] for tag, count in tag_counts.items(): tag_item = {"name": tag} if include_counts: tag_item["count"] = count if include_files and tag in tag_files: tag_item["files"] = sorted(tag_files[tag]) tags.append(tag_item) # Sort based on preference if sort_by == "count": tags.sort(key=lambda x: x.get("count", 0), reverse=True) else: # sort by name tags.sort(key=lambda x: x["name"].lower()) else: # Just return tag names sorted tags = sorted(tag_counts.keys(), key=str.lower) # Return standardized list results structure return { "items": tags, "total": len(tag_counts), "scope": { "include_counts": include_counts, "sort_by": sort_by, "include_files": include_files } } except Exception as e: if ctx: ctx.info(f"Failed to list tags: {str(e)}") raise ValueError(ERROR_MESSAGES["tag_collection_failed"].format(error=str(e))) def _update_frontmatter_properties(content: str, property_updates: dict, properties_to_remove: List[str] = None) -> str: """ Update multiple frontmatter properties while preserving YAML structure. Args: content: Note content with frontmatter property_updates: Dict of properties to add/update {property: value} properties_to_remove: List of property names to remove Returns: Updated content with modified frontmatter """ import yaml from io import StringIO properties_to_remove = properties_to_remove or [] # Check if frontmatter exists if not content.startswith("---\n"): # Create frontmatter if properties to add if property_updates: # Use yaml.dump to ensure proper formatting yaml_content = yaml.dump(property_updates, default_flow_style=False, allow_unicode=True, sort_keys=False) return f"---\n{yaml_content}---\n\n{content}" return content # Parse existing frontmatter try: end_index = content.index("\n---\n", 4) + 5 frontmatter_str = content[4:end_index-5] rest_of_content = content[end_index:] except ValueError: # Invalid frontmatter return content # Parse YAML try: frontmatter = yaml.safe_load(frontmatter_str) or {} except yaml.YAMLError: # If YAML parsing fails, return original return content # Update properties for key, value in property_updates.items(): if value is None and key not in properties_to_remove: properties_to_remove.append(key) else: frontmatter[key] = value # Remove properties for prop in properties_to_remove: frontmatter.pop(prop, None) # Special handling for tags to ensure list format if 'tags' in frontmatter and isinstance(frontmatter['tags'], str): # Convert string tags to list frontmatter['tags'] = [frontmatter['tags']] # Dump back to YAML if frontmatter: yaml_content = yaml.dump(frontmatter, default_flow_style=False, allow_unicode=True, sort_keys=False) # Remove the trailing newline that yaml.dump adds yaml_content = yaml_content.rstrip('\n') return f"---\n{yaml_content}\n---\n{rest_of_content}" else: # No frontmatter left return rest_of_content.lstrip('\n') def _remove_inline_tags(content: str, tags_to_remove: List[str]) -> tuple[str, int]: """ Remove inline tags from note body while preserving frontmatter. Args: content: Full note content tags_to_remove: List of tags to remove (without # prefix) Returns: Tuple of (updated content, number of tags removed) """ if not tags_to_remove: return content, 0 # Skip frontmatter if it exists body_start = 0 if content.startswith("---\n"): try: end_index = content.index("\n---\n", 4) + 5 body_start = end_index except ValueError: pass frontmatter = content[:body_start] if body_start > 0 else "" body = content[body_start:] # Remove code blocks to avoid modifying tags in code code_blocks = [] # Remove fenced code blocks def replace_code_block(match): code_blocks.append(match.group(0)) return f"__CODE_BLOCK_{len(code_blocks)-1}__" body_no_code = re.sub(r'```[\s\S]*?```', replace_code_block, body) # Remove inline code inline_code = [] def replace_inline_code(match): inline_code.append(match.group(0)) return f"__INLINE_CODE_{len(inline_code)-1}__" body_no_code = re.sub(r'`[^`]+`', replace_inline_code, body_no_code) # Remove tags tags_removed = 0 for tag in tags_to_remove: # Escape special regex characters escaped_tag = re.escape(tag) # Match #tag at word boundaries (not part of URL or other text) pattern = rf'(^|\s)#{escaped_tag}(?=\s|$|\.|,|;|:|!|\?|\))' # Count matches before replacing matches = re.findall(pattern, body_no_code) tags_removed += len(matches) # Remove the tags (keep the whitespace before the tag) body_no_code = re.sub(pattern, r'\1', body_no_code) # Clean up multiple spaces left by removal body_no_code = re.sub(r' +', ' ', body_no_code) # Restore code blocks for i, block in enumerate(code_blocks): body_no_code = body_no_code.replace(f"__CODE_BLOCK_{i}__", block) for i, code in enumerate(inline_code): body_no_code = body_no_code.replace(f"__INLINE_CODE_{i}__", code) return frontmatter + body_no_code, tags_removed async def batch_update_properties( search_criteria: dict, property_updates: dict = None, properties_to_remove: List[str] = None, add_tags: List[str] = None, remove_tags: List[str] = None, remove_inline_tags: bool = False, ctx=None ) -> dict: """ Batch update properties across multiple notes. Args: search_criteria: How to find notes - dict with one of: - 'query': Search query string - 'folder': Folder path to process - 'files': Explicit list of file paths property_updates: Dict of properties to add/update properties_to_remove: List of property names to remove add_tags: List of tags to add (additive) remove_tags: List of tags to remove remove_inline_tags: Whether to also remove tags from note body ctx: MCP context for progress reporting Returns: Dict with operation results and affected files """ vault = get_vault() # Validate search criteria if not search_criteria: raise ValueError( "search_criteria is required. Provide one of: " "1) {'query': 'tag:project'} for search-based selection, " "2) {'folder': 'Projects', 'recursive': true} for folder-based selection, " "3) {'files': ['Note1.md', 'Note2.md']} for specific files" ) if not any(k in search_criteria for k in ['query', 'folder', 'files']): raise ValueError( "search_criteria must include exactly one selection method: " "'query' (search string), 'folder' (directory path), or 'files' (list of paths). " f"Received: {list(search_criteria.keys())}" ) # Validate that at least one operation is specified if not any([property_updates, properties_to_remove, add_tags, remove_tags]): raise ValueError( "No operations specified. Provide at least one of: " "property_updates (to add/update properties), " "properties_to_remove (to delete properties), " "add_tags (to add tags), or " "remove_tags (to remove tags)" ) # Find notes to update notes_to_update = [] if 'files' in search_criteria: # Explicit file list for file_path in search_criteria['files']: try: note = await vault.read_note(file_path) if note: notes_to_update.append(file_path) except Exception: pass # Skip invalid files elif 'folder' in search_criteria: # All notes in folder folder = search_criteria['folder'] all_notes = await vault.list_notes(directory=folder, recursive=search_criteria.get('recursive', True)) notes_to_update = [note['path'] for note in all_notes] elif 'query' in search_criteria: # Search results from .search_discovery import search_notes results = await search_notes( query=search_criteria['query'], max_results=search_criteria.get('max_results', 500), ctx=ctx ) notes_to_update = [r['path'] for r in results['results']] if ctx: ctx.info(f"Found {len(notes_to_update)} notes to update") # Process each note results = { 'total_notes': len(notes_to_update), 'updated': 0, 'failed': 0, 'details': [], 'errors': [] } for note_path in notes_to_update: try: # Read note note = await vault.read_note(note_path) if not note: results['errors'].append({ 'path': note_path, 'error': 'Note not found' }) results['failed'] += 1 continue content = note.content original_content = content changes_made = [] # Handle special tag operations first if add_tags or remove_tags: # Get current tags from frontmatter current_tags = note.metadata.frontmatter.get('tags', []) if isinstance(current_tags, str): current_tags = [current_tags] elif not isinstance(current_tags, list): current_tags = list(current_tags) if current_tags else [] updated_tags = current_tags.copy() # Remove tags if remove_tags: for tag in remove_tags: if tag in updated_tags: updated_tags.remove(tag) changes_made.append(f"Removed tag '{tag}' from frontmatter") # Add tags if add_tags: for tag in add_tags: if tag not in updated_tags: updated_tags.append(tag) changes_made.append(f"Added tag '{tag}' to frontmatter") # Update frontmatter if tags changed if updated_tags != current_tags: if property_updates is None: property_updates = {} property_updates['tags'] = updated_tags # Update frontmatter properties if property_updates or properties_to_remove: content = _update_frontmatter_properties( content, property_updates or {}, properties_to_remove ) # Track changes if property_updates: for prop, value in property_updates.items(): if prop != 'tags': # Already tracked above changes_made.append(f"Set {prop} = {value}") if properties_to_remove: for prop in properties_to_remove: changes_made.append(f"Removed property '{prop}'") # Remove inline tags if requested inline_tags_removed = 0 if remove_inline_tags and remove_tags: content, inline_tags_removed = _remove_inline_tags(content, remove_tags) if inline_tags_removed > 0: changes_made.append(f"Removed {inline_tags_removed} inline tags") # Update note if changed if content != original_content: await vault.write_note(note_path, content, overwrite=True) results['updated'] += 1 results['details'].append({ 'path': note_path, 'changes': changes_made }) if ctx and results['updated'] % 10 == 0: ctx.info(f"Updated {results['updated']} notes...") except Exception as e: results['errors'].append({ 'path': note_path, 'error': str(e) }) results['failed'] += 1 if ctx: ctx.info(f"Batch update complete: {results['updated']} updated, {results['failed']} failed") return results

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/suhailnajeeb/obsidian-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server