Skip to main content
Glama

Obsidian MCP Server

server.py41.6 kB
""" Custom Obsidian MCP Server MCP server providing tools for Obsidian vault operations with a focus on Zettelkasten note creation. Connects to Obsidian Local REST API plugin. """ import json import re from typing import Optional, List, Dict, Any from enum import Enum from contextlib import asynccontextmanager from mcp.server.fastmcp import FastMCP, Context from pydantic import BaseModel, Field, field_validator, ConfigDict from .obsidian_client import ObsidianClient, ObsidianAPIError # Constants CHARACTER_LIMIT = 25000 # Maximum response size in characters # Shared Obsidian client instance obsidian_client: Optional[ObsidianClient] = None @asynccontextmanager async def app_lifespan(): """Manage ObsidianClient lifecycle.""" global obsidian_client obsidian_client = ObsidianClient() try: yield {"obsidian_client": obsidian_client} finally: if obsidian_client: await obsidian_client.close() # Initialize FastMCP server mcp = FastMCP("obsidian_mcp", lifespan=app_lifespan) # ============================================================================= # PYDANTIC INPUT MODELS # ============================================================================= class ListFilesInput(BaseModel): """Input for listing files in a directory.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') dirpath: Optional[str] = Field( default="", description="Relative directory path to list (empty string for vault root)", max_length=500 ) class GetFileInput(BaseModel): """Input for getting file contents.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the file relative to vault root (e.g., 'Notes/zettelkasten/202411061234.md')", min_length=1, max_length=500 ) class BatchGetFilesInput(BaseModel): """Input for batch reading multiple files.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepaths: List[str] = Field( description="List of file paths to read", min_items=1, max_items=20 ) class SearchInput(BaseModel): """Input for searching vault content.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') query: str = Field( description="Text to search for in vault files", min_length=1, max_length=200 ) context_length: int = Field( default=100, description="Number of characters to show around each match for context", ge=0, le=500 ) class ComplexSearchInput(BaseModel): """Input for complex JsonLogic-based searches.""" model_config = ConfigDict(extra='forbid') query: Dict[str, Any] = Field( description="JsonLogic query object (e.g., {'glob': ['*.md', {'var': 'path'}]} matches all markdown files)" ) class WriteMode(str, Enum): """Mode for writing content to files.""" CREATE = "create" OVERWRITE = "overwrite" APPEND = "append" PREPEND = "prepend" class WriteNoteInput(BaseModel): """Input for writing/creating notes.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path where the note should be written (e.g., 'Zettelkasten/202411061234 - Note Title.md')", min_length=1, max_length=500 ) content: str = Field( description="The content to write to the note", min_length=0, max_length=50000 ) mode: WriteMode = Field( default=WriteMode.CREATE, description="Write mode: 'create' for new files only, 'overwrite' to replace, 'append' to add to end, 'prepend' to add to beginning" ) frontmatter: Optional[Dict[str, Any]] = Field( default=None, description="Optional frontmatter metadata to add/update (e.g., {'tags': ['zettelkasten'], 'created': '2024-11-06'})" ) class AppendContentInput(BaseModel): """Input for appending content to files.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the file to append to", min_length=1, max_length=500 ) content: str = Field( description="Content to append to the file", min_length=1, max_length=50000 ) class PatchOperation(str, Enum): """Operations for patching content.""" APPEND = "append" PREPEND = "prepend" REPLACE = "replace" class TargetType(str, Enum): """Types of targets for patching.""" HEADING = "heading" BLOCK = "block" FRONTMATTER = "frontmatter" class PatchContentInput(BaseModel): """Input for patching content relative to headings/blocks/frontmatter.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the file to patch", min_length=1, max_length=500 ) target_type: TargetType = Field( description="Type of target: 'heading' for markdown headers, 'block' for block references, 'frontmatter' for YAML metadata" ) target: str = Field( description="Target identifier: heading path (e.g., 'Section/Subsection'), block reference (e.g., '^block-id'), or frontmatter field name", min_length=1, max_length=200 ) operation: PatchOperation = Field( description="Operation: 'append' to add after target, 'prepend' to add before target, 'replace' to overwrite target" ) content: str = Field( description="Content to insert", min_length=1, max_length=50000 ) class DeleteFileInput(BaseModel): """Input for deleting files.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the file or directory to delete", min_length=1, max_length=500 ) confirm: bool = Field( description="Must be set to true to confirm deletion", default=False ) class GetFrontmatterInput(BaseModel): """Input for getting frontmatter.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the file", min_length=1, max_length=500 ) class UpdateFrontmatterInput(BaseModel): """Input for updating frontmatter.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the file", min_length=1, max_length=500 ) updates: Dict[str, Any] = Field( description="Frontmatter fields to update or add (e.g., {'tags': ['new-tag'], 'status': 'published'})" ) class TagAction(str, Enum): """Actions for tag management.""" ADD = "add" REMOVE = "remove" LIST = "list" class ManageTagsInput(BaseModel): """Input for managing tags.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepath: str = Field( description="Path to the note", min_length=1, max_length=500 ) action: TagAction = Field( description="Action: 'add' to add tags, 'remove' to delete tags, 'list' to show current tags" ) tags: Optional[List[str]] = Field( default=None, description="Tags to add or remove (not needed for 'list' action)", max_items=50 ) class GetNotesInfoInput(BaseModel): """Input for getting metadata about notes.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') filepaths: List[str] = Field( description="List of file paths to get info about", min_items=1, max_items=50 ) class PeriodType(str, Enum): """Types of periodic notes.""" DAILY = "daily" WEEKLY = "weekly" MONTHLY = "monthly" QUARTERLY = "quarterly" YEARLY = "yearly" class GetPeriodicNoteInput(BaseModel): """Input for getting periodic notes.""" model_config = ConfigDict(str_strip_whitespace=True, extra='forbid') period: PeriodType = Field( description="Period type: daily, weekly, monthly, quarterly, or yearly" ) class GetRecentChangesInput(BaseModel): """Input for getting recently modified files.""" model_config = ConfigDict(extra='forbid') days: int = Field( default=90, description="Only include files modified within this many days", ge=1, le=365 ) limit: int = Field( default=10, description="Maximum number of files to return", ge=1, le=100 ) # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def truncate_response(content: str, description: str = "response") -> str: """Truncate content if it exceeds CHARACTER_LIMIT.""" if len(content) <= CHARACTER_LIMIT: return content truncated = content[:CHARACTER_LIMIT] message = f"\n\n[Response truncated at {CHARACTER_LIMIT} characters. Original {description} was {len(content)} characters. Use filters or pagination to reduce results.]" return truncated + message def format_file_list(files: List[str], directories: List[str]) -> str: """Format file and directory lists for display.""" result = [] if directories: result.append("## Directories") for d in sorted(directories): result.append(f"- 📁 {d}/") result.append("") if files: result.append("## Files") for f in sorted(files): result.append(f"- 📄 {f}") return "\n".join(result) if result else "No files or directories found." def find_heading_position(content: str, heading_path: str) -> Optional[int]: """ Find the position of a heading in markdown content. Args: content: The markdown content heading_path: Path like "Section/Subsection" Returns: Position after the heading line, or None if not found """ parts = heading_path.split("/") lines = content.split("\n") current_level = 0 current_path = [] for i, line in enumerate(lines): # Check if this is a heading heading_match = re.match(r'^(#{1,6})\s+(.+)$', line.strip()) if heading_match: level = len(heading_match.group(1)) title = heading_match.group(2).strip() # Adjust path based on heading level if level <= current_level: current_path = current_path[:level-1] current_path.append(title) current_level = level # Check if we found our target if current_path == parts: # Return position at end of this line return sum(len(l) + 1 for l in lines[:i+1]) return None def find_block_position(content: str, block_ref: str) -> Optional[int]: """ Find the position of a block reference. Args: content: The markdown content block_ref: Block reference like "^block-id" Returns: Position after the block, or None if not found """ # Remove ^ prefix if present block_id = block_ref.lstrip("^") # Look for the block reference pattern = rf'\^{re.escape(block_id)}\b' match = re.search(pattern, content) if match: return match.end() return None # ============================================================================= # MCP TOOLS # ============================================================================= @mcp.tool( name="obsidian_list_files_in_vault", annotations={ "title": "List Files in Vault Root", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def list_files_in_vault() -> str: """List all files and directories in the vault root. This tool shows the top-level structure of your Obsidian vault, helping you understand the organization and locate folders for Zettelkasten notes. Returns: str: Formatted list of directories and files in the vault root Example: Returns a markdown-formatted list showing all top-level folders and files. """ try: result = await obsidian_client.get("/vault/") files = result.get("files", []) directories = result.get("directories", []) return format_file_list(files, directories) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "success": False }, indent=2) @mcp.tool( name="obsidian_list_files_in_dir", annotations={ "title": "List Files in Directory", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def list_files_in_dir(params: ListFilesInput) -> str: """List files and directories in a specific vault directory. Use this tool to explore the contents of a specific folder, such as your Zettelkasten directory or any other organized section of your vault. Args: params (ListFilesInput): Contains: - dirpath (str): Relative path to directory (empty for root) Returns: str: Formatted list of directories and files in the specified path Example: For dirpath="Zettelkasten", lists all notes in your Zettelkasten folder. """ try: endpoint = f"/vault/{params.dirpath}" if params.dirpath else "/vault/" result = await obsidian_client.get(endpoint) files = result.get("files", []) directories = result.get("directories", []) return format_file_list(files, directories) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "success": False }, indent=2) @mcp.tool( name="obsidian_get_file_contents", annotations={ "title": "Get File Contents", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def get_file_contents(params: GetFileInput) -> str: """Read the complete contents of a single file from the vault. Use this to read existing Zettelkasten notes, understand their structure, and find connections for creating new atomic notes. Args: params (GetFileInput): Contains: - filepath (str): Path to file relative to vault root Returns: str: File contents including frontmatter and body Example: For filepath="Zettelkasten/202411061234.md", returns the full note content. """ try: content = await obsidian_client.read_file(params.filepath) # Add filepath header for context output = f"# File: {params.filepath}\n\n{content}" return truncate_response(output, f"file {params.filepath}") except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_batch_get_file_contents", annotations={ "title": "Batch Get File Contents", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def batch_get_files(params: BatchGetFilesInput) -> str: """Read multiple files at once, concatenated with headers. Efficient way to read several related Zettelkasten notes together to understand connections and context before creating new atomic notes. Args: params (BatchGetFilesInput): Contains: - filepaths (List[str]): List of file paths to read (max 20) Returns: str: All file contents concatenated with clear separators Example: Reads multiple related notes to understand a concept network. """ results = [] for filepath in params.filepaths: try: content = await obsidian_client.read_file(filepath) results.append(f"# File: {filepath}\n\n{content}\n\n{'='*80}\n") except ObsidianAPIError as e: results.append(f"# File: {filepath}\n\nError: {str(e)}\n\n{'='*80}\n") output = "\n".join(results) return truncate_response(output, "batch file read") @mcp.tool( name="obsidian_simple_search", annotations={ "title": "Simple Text Search", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def simple_search(params: SearchInput) -> str: """Search for text across all vault files with context. Essential for Zettelkasten workflow: find existing notes related to concepts before creating new atomic notes. Helps maintain atomic principle and discover connections between ideas. Args: params (SearchInput): Contains: - query (str): Text to search for - context_length (int): Characters of context around matches (default 100) Returns: str: Matching files with highlighted context showing where terms appear Example: Search for "systems thinking" to find related notes before creating a new one. """ try: result = await obsidian_client.post( "/search/simple/", {"query": params.query, "contextLength": params.context_length} ) results = result.get("results", []) if not results: return f"No results found for query: '{params.query}'" output = [f"# Search Results for: '{params.query}'\n"] output.append(f"Found {len(results)} matching files\n") for item in results: filepath = item.get("filename", "unknown") matches = item.get("matches", []) output.append(f"\n## 📄 {filepath}") output.append(f"Matches: {len(matches)}\n") for i, match in enumerate(matches[:5], 1): # Limit to 5 matches per file context = match.get("match", "") output.append(f"**Match {i}:**") output.append(f"```\n{context}\n```\n") response = "\n".join(output) return truncate_response(response, "search results") except ObsidianAPIError as e: return json.dumps({ "error": str(e), "query": params.query, "success": False }, indent=2) @mcp.tool( name="obsidian_complex_search", annotations={ "title": "Complex JsonLogic Search", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def complex_search(params: ComplexSearchInput) -> str: """Execute complex searches using JsonLogic queries. Advanced search for power users: find notes by patterns, tags, or complex criteria. Useful for organizing and discovering notes in a mature Zettelkasten system. Args: params (ComplexSearchInput): Contains: - query (Dict): JsonLogic query (e.g., {'glob': ['*.md', {'var': 'path'}]}) Returns: str: List of matching files with their properties Example: Find all markdown files: {'glob': ['*.md', {'var': 'path'}]} Find files with specific tags in frontmatter: more complex JsonLogic expressions """ try: result = await obsidian_client.post("/search/", {"query": params.query}) matches = result.get("matches", []) if not matches: return "No files matched the search criteria." output = [f"# Complex Search Results\n"] output.append(f"Found {len(matches)} matching files\n") for filepath in matches: output.append(f"- 📄 {filepath}") response = "\n".join(output) return truncate_response(response, "complex search results") except ObsidianAPIError as e: return json.dumps({ "error": str(e), "success": False }, indent=2) @mcp.tool( name="obsidian_write_note", annotations={ "title": "Write Note", "readOnlyHint": False, "destructiveHint": False, # CREATE mode is safe "idempotentHint": False, "openWorldHint": False } ) async def write_note(params: WriteNoteInput) -> str: """Create or modify notes with content and optional frontmatter. Primary tool for Zettelkasten note creation. Supports multiple modes: - CREATE: Only creates new notes (safe, won't overwrite) - OVERWRITE: Replaces entire file - APPEND: Adds content to end - PREPEND: Adds content to beginning Args: params (WriteNoteInput): Contains: - filepath (str): Where to write the note - content (str): Note content - mode (WriteMode): create/overwrite/append/prepend (default: create) - frontmatter (Dict, optional): YAML frontmatter metadata Returns: str: Success message with note location Example: Create atomic note: filepath="Zettelkasten/202411061234 Systems Thinking.md", content="# Systems Thinking...", frontmatter={'tags': ['zettelkasten', 'concepts']} """ try: final_content = params.content # Handle frontmatter if provided if params.frontmatter: final_content = obsidian_client.serialize_with_frontmatter( params.frontmatter, params.content ) # Handle different write modes if params.mode == WriteMode.CREATE: # Check if file exists try: await obsidian_client.read_file(params.filepath) return json.dumps({ "error": f"File already exists: {params.filepath}. Use mode='overwrite' to replace it.", "filepath": params.filepath, "success": False }, indent=2) except ObsidianAPIError: # File doesn't exist, proceed with creation pass elif params.mode == WriteMode.APPEND: try: existing = await obsidian_client.read_file(params.filepath) final_content = existing + "\n\n" + params.content if params.frontmatter: # Preserve existing frontmatter and merge fm, body = obsidian_client.parse_frontmatter(existing) fm.update(params.frontmatter) final_content = obsidian_client.serialize_with_frontmatter(fm, body + "\n\n" + params.content) except ObsidianAPIError: # File doesn't exist, just write content pass elif params.mode == WriteMode.PREPEND: try: existing = await obsidian_client.read_file(params.filepath) if params.frontmatter: fm, body = obsidian_client.parse_frontmatter(existing) fm.update(params.frontmatter) final_content = obsidian_client.serialize_with_frontmatter(fm, params.content + "\n\n" + body) else: final_content = params.content + "\n\n" + existing except ObsidianAPIError: # File doesn't exist, just write content pass # Write the file await obsidian_client.write_file(params.filepath, final_content) return json.dumps({ "success": True, "message": f"Note written successfully in {params.mode.value} mode", "filepath": params.filepath, "content_length": len(final_content), "has_frontmatter": params.frontmatter is not None }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_append_content", annotations={ "title": "Append Content to File", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False } ) async def append_content(params: AppendContentInput) -> str: """Append content to the end of an existing file or create new file. Quick way to add content to notes. Useful for adding new thoughts, references, or connections to existing Zettelkasten notes. Args: params (AppendContentInput): Contains: - filepath (str): Path to file - content (str): Content to append Returns: str: Success message with updated file info Example: Add a new related concept to an existing note. """ try: result = await obsidian_client.append_to_file(params.filepath, params.content) return json.dumps({ "success": True, "message": "Content appended successfully", "filepath": params.filepath }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_patch_content", annotations={ "title": "Patch Content at Specific Location", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False } ) async def patch_content(params: PatchContentInput) -> str: """Insert content relative to headings, block references, or frontmatter. Precise content insertion for structured notes. Insert content at specific locations within notes to maintain organization and structure. Args: params (PatchContentInput): Contains: - filepath (str): Path to file - target_type (TargetType): 'heading', 'block', or 'frontmatter' - target (str): Heading path, block reference, or frontmatter field - operation (PatchOperation): 'append', 'prepend', or 'replace' - content (str): Content to insert Returns: str: Success message with patch details Example: Add content after "## Related Concepts" heading in a Zettelkasten note. """ try: # Read current content current_content = await obsidian_client.read_file(params.filepath) if params.target_type == TargetType.FRONTMATTER: # Update frontmatter field fm, body = obsidian_client.parse_frontmatter(current_content) if params.operation == PatchOperation.REPLACE: fm[params.target] = params.content elif params.operation == PatchOperation.APPEND: current_value = fm.get(params.target, "") fm[params.target] = str(current_value) + "\n" + params.content if current_value else params.content elif params.operation == PatchOperation.PREPEND: current_value = fm.get(params.target, "") fm[params.target] = params.content + "\n" + str(current_value) if current_value else params.content new_content = obsidian_client.serialize_with_frontmatter(fm, body) elif params.target_type == TargetType.HEADING: # Find heading position position = find_heading_position(current_content, params.target) if position is None: return json.dumps({ "error": f"Heading not found: {params.target}", "filepath": params.filepath, "success": False }, indent=2) if params.operation == PatchOperation.APPEND: new_content = current_content[:position] + "\n" + params.content + current_content[position:] elif params.operation == PatchOperation.PREPEND: # Find start of heading line lines_before = current_content[:position].split("\n") heading_line_start = position - len(lines_before[-1]) - 1 new_content = current_content[:heading_line_start] + params.content + "\n" + current_content[heading_line_start:] else: # REPLACE # Find next heading or end of file lines = current_content[position:].split("\n") next_heading_idx = None for i, line in enumerate(lines): if re.match(r'^#{1,6}\s+', line.strip()): next_heading_idx = i break if next_heading_idx: section_end = position + sum(len(l) + 1 for l in lines[:next_heading_idx]) new_content = current_content[:position] + "\n" + params.content + "\n" + current_content[section_end:] else: new_content = current_content[:position] + "\n" + params.content elif params.target_type == TargetType.BLOCK: # Find block reference position = find_block_position(current_content, params.target) if position is None: return json.dumps({ "error": f"Block reference not found: {params.target}", "filepath": params.filepath, "success": False }, indent=2) if params.operation == PatchOperation.APPEND: new_content = current_content[:position] + "\n" + params.content + current_content[position:] elif params.operation == PatchOperation.PREPEND: # Find start of block line lines_before = current_content[:position].split("\n") block_line_start = position - len(lines_before[-1]) - 1 new_content = current_content[:block_line_start] + params.content + "\n" + current_content[block_line_start:] else: # REPLACE - replace the entire line containing the block reference lines_before = current_content[:position].split("\n") block_line_start = position - len(lines_before[-1]) - 1 line_end = current_content.find("\n", position) if line_end == -1: line_end = len(current_content) new_content = current_content[:block_line_start] + params.content + current_content[line_end:] # Write updated content await obsidian_client.write_file(params.filepath, new_content) return json.dumps({ "success": True, "message": f"Content patched successfully using {params.operation.value} on {params.target_type.value}", "filepath": params.filepath, "target": params.target }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_delete_file", annotations={ "title": "Delete File or Directory", "readOnlyHint": False, "destructiveHint": True, "idempotentHint": True, "openWorldHint": False } ) async def delete_file(params: DeleteFileInput) -> str: """Delete a file or directory from the vault. DESTRUCTIVE OPERATION. Requires explicit confirmation. Use carefully when removing outdated or duplicate notes from your Zettelkasten. Args: params (DeleteFileInput): Contains: - filepath (str): Path to file/directory to delete - confirm (bool): Must be True to proceed with deletion Returns: str: Success or error message Example: Delete a duplicate note after merging content into another note. """ if not params.confirm: return json.dumps({ "error": "Deletion requires explicit confirmation. Set 'confirm' to true.", "filepath": params.filepath, "success": False }, indent=2) try: await obsidian_client.delete(f"/vault/{params.filepath}") return json.dumps({ "success": True, "message": f"Successfully deleted: {params.filepath}", "filepath": params.filepath }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_get_frontmatter", annotations={ "title": "Get Note Frontmatter", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def get_frontmatter(params: GetFrontmatterInput) -> str: """Extract YAML frontmatter metadata from a note. Read metadata like tags, creation date, and other properties from Zettelkasten notes without loading the full content. Args: params (GetFrontmatterInput): Contains: - filepath (str): Path to file Returns: str: JSON object containing frontmatter fields Example: Get tags and metadata from a note to understand its classification. """ try: frontmatter = await obsidian_client.get_file_frontmatter(params.filepath) return json.dumps({ "success": True, "filepath": params.filepath, "frontmatter": frontmatter }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_update_frontmatter", annotations={ "title": "Update Note Frontmatter", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False } ) async def update_frontmatter(params: UpdateFrontmatterInput) -> str: """Update YAML frontmatter metadata without modifying note content. Add or update metadata fields like tags, status, or custom properties in Zettelkasten notes while preserving all content. Args: params (UpdateFrontmatterInput): Contains: - filepath (str): Path to file - updates (Dict): Frontmatter fields to add/update Returns: str: Success message with updated frontmatter Example: Add tags to existing note: updates={'tags': ['zettelkasten', 'systems-thinking']} """ try: result = await obsidian_client.update_file_frontmatter( params.filepath, params.updates ) return json.dumps({ "success": True, "message": "Frontmatter updated successfully", "filepath": params.filepath, "frontmatter": result["frontmatter"] }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_manage_tags", annotations={ "title": "Manage Note Tags", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False } ) async def manage_tags(params: ManageTagsInput) -> str: """Add, remove, or list tags in note frontmatter. Manage tags for organizing Zettelkasten notes. Essential for maintaining topic clusters and enabling efficient retrieval of related atomic notes. Args: params (ManageTagsInput): Contains: - filepath (str): Path to note - action (TagAction): 'add', 'remove', or 'list' - tags (List[str], optional): Tags to add/remove (not needed for 'list') Returns: str: Current tags after operation Example: Add tags: action='add', tags=['systems-thinking', 'mental-models'] Remove tag: action='remove', tags=['draft'] List tags: action='list' """ try: frontmatter = await obsidian_client.get_file_frontmatter(params.filepath) current_tags = frontmatter.get("tags", []) # Ensure tags is a list if isinstance(current_tags, str): current_tags = [current_tags] elif not isinstance(current_tags, list): current_tags = [] if params.action == TagAction.LIST: return json.dumps({ "success": True, "filepath": params.filepath, "tags": current_tags }, indent=2) elif params.action == TagAction.ADD: if not params.tags: return json.dumps({ "error": "Tags list required for 'add' action", "success": False }, indent=2) # Add new tags, avoiding duplicates updated_tags = list(set(current_tags + params.tags)) await obsidian_client.update_file_frontmatter( params.filepath, {"tags": updated_tags} ) return json.dumps({ "success": True, "message": f"Added {len(params.tags)} tag(s)", "filepath": params.filepath, "tags": updated_tags, "added": params.tags }, indent=2) elif params.action == TagAction.REMOVE: if not params.tags: return json.dumps({ "error": "Tags list required for 'remove' action", "success": False }, indent=2) # Remove specified tags updated_tags = [t for t in current_tags if t not in params.tags] await obsidian_client.update_file_frontmatter( params.filepath, {"tags": updated_tags} ) return json.dumps({ "success": True, "message": f"Removed {len(params.tags)} tag(s)", "filepath": params.filepath, "tags": updated_tags, "removed": params.tags }, indent=2) except ObsidianAPIError as e: return json.dumps({ "error": str(e), "filepath": params.filepath, "success": False }, indent=2) @mcp.tool( name="obsidian_get_notes_info", annotations={ "title": "Get Notes Metadata", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def get_notes_info(params: GetNotesInfoInput) -> str: """Get metadata for multiple notes including tags, dates, and sizes. Efficient way to get overview information about several Zettelkasten notes without reading full content. Useful for analyzing note collections. Args: params (GetNotesInfoInput): Contains: - filepaths (List[str]): Paths to files (max 50) Returns: str: JSON array with metadata for each file Example: Get info about all notes in a topic cluster to understand their relationships. """ results = [] for filepath in params.filepaths: try: # Get frontmatter frontmatter = await obsidian_client.get_file_frontmatter(filepath) # Get file content for size content = await obsidian_client.read_file(filepath) results.append({ "filepath": filepath, "success": True, "tags": frontmatter.get("tags", []), "created": frontmatter.get("created", None), "modified": frontmatter.get("modified", None), "size_chars": len(content), "has_frontmatter": bool(frontmatter) }) except ObsidianAPIError as e: results.append({ "filepath": filepath, "success": False, "error": str(e) }) return json.dumps(results, indent=2) # ============================================================================= # SERVER ENTRY POINT # ============================================================================= def main(): """Run the MCP server.""" mcp.run() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Shepherd-Creative/obsidian-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server