# server.py (41.6 kB)
"""
Custom Obsidian MCP Server
MCP server providing tools for Obsidian vault operations with a focus on
Zettelkasten note creation. Connects to Obsidian Local REST API plugin.
"""
import json
import re
from typing import Optional, List, Dict, Any
from enum import Enum
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field, field_validator, ConfigDict
from .obsidian_client import ObsidianClient, ObsidianAPIError
# Constants
# Upper bound, in characters, on tool output; truncate_response() cuts anything
# longer and appends a notice.
CHARACTER_LIMIT = 25000 # Maximum response size in characters
# Shared Obsidian client instance
# Starts as None; app_lifespan() assigns a live ObsidianClient and the tool
# functions in this module read this module-level name directly.
obsidian_client: Optional[ObsidianClient] = None
@asynccontextmanager
async def app_lifespan(server: "Optional[FastMCP]" = None):
    """Manage ObsidianClient lifecycle.

    FastMCP calls its ``lifespan`` callable with the server instance, so this
    function has to accept one positional argument; the previous zero-argument
    signature fails with ``TypeError`` as soon as the server starts. The
    argument is unused and defaults to ``None`` so existing zero-argument
    callers keep working.

    Args:
        server: The FastMCP instance that owns this lifespan (unused).

    Yields:
        dict: ``{"obsidian_client": <ObsidianClient>}`` as the lifespan context.
    """
    global obsidian_client
    client = ObsidianClient()
    obsidian_client = client
    try:
        yield {"obsidian_client": client}
    finally:
        # Clear the shared reference first so no tool keeps using a closed
        # client, then close the instance this lifespan created.
        obsidian_client = None
        await client.close()
# Initialize FastMCP server
# app_lifespan is passed as the lifespan hook; it creates the shared
# ObsidianClient and closes it again when the context exits.
mcp = FastMCP("obsidian_mcp", lifespan=app_lifespan)
# =============================================================================
# PYDANTIC INPUT MODELS
# =============================================================================
class ListFilesInput(BaseModel):
    """Input for listing files in a directory."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Directory to list. Defaults to "" which list_files_in_dir treats as the
    # vault root.
    dirpath: Optional[str] = Field(
        default="",
        max_length=500,
        description="Relative directory path to list (empty string for vault root)",
    )
class GetFileInput(BaseModel):
    """Input for getting file contents."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Required, non-empty path of the file to read.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path to the file relative to vault root (e.g., 'Notes/zettelkasten/202411061234.md')",
    )
class BatchGetFilesInput(BaseModel):
    """Input for batch reading multiple files."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(str_strip_whitespace=True, extra='forbid')

    # Between 1 and 20 paths. Pydantic v2 spells collection size limits
    # min_length/max_length; min_items/max_items are the deprecated v1 names.
    filepaths: List[str] = Field(
        description="List of file paths to read",
        min_length=1,
        max_length=20
    )
class SearchInput(BaseModel):
    """Input for searching vault content."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Required search text, 1-200 characters.
    query: str = Field(
        min_length=1,
        max_length=200,
        description="Text to search for in vault files",
    )
    # Forwarded by simple_search as the "contextLength" value; 0-500.
    context_length: int = Field(
        default=100,
        ge=0,
        le=500,
        description="Number of characters to show around each match for context",
    )
class ComplexSearchInput(BaseModel):
    """Input for complex JsonLogic-based searches."""

    # Unknown keys are rejected; no whitespace stripping is configured here.
    model_config = ConfigDict(extra='forbid')

    # Arbitrary JSON object that complex_search forwards under the "query" key.
    query: Dict[str, Any] = Field(
        description=(
            "JsonLogic query object (e.g., {'glob': ['*.md', {'var': 'path'}]} matches all markdown files)"
        ),
    )
class WriteMode(str, Enum):
    """Mode for writing content to files."""
    # Mixing in str makes each member compare equal to its string value.
    CREATE = "create"  # write_note refuses when the file can already be read
    OVERWRITE = "overwrite"  # write_note writes the new content as-is
    APPEND = "append"  # new content goes after the existing content
    PREPEND = "prepend"  # new content goes before the existing content
class WriteNoteInput(BaseModel):
    """Input for writing/creating notes."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Destination path of the note, 1-500 characters.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path where the note should be written (e.g., 'Zettelkasten/202411061234 - Note Title.md')",
    )
    # Note body; may be empty, capped at 50000 characters.
    content: str = Field(
        min_length=0,
        max_length=50000,
        description="The content to write to the note",
    )
    # How write_note combines the content with an existing file.
    mode: WriteMode = Field(
        default=WriteMode.CREATE,
        description="Write mode: 'create' for new files only, 'overwrite' to replace, 'append' to add to end, 'prepend' to add to beginning",
    )
    # Optional metadata mapping; write_note only acts on it when it is non-empty.
    frontmatter: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Optional frontmatter metadata to add/update (e.g., {'tags': ['zettelkasten'], 'created': '2024-11-06'})",
    )
class AppendContentInput(BaseModel):
    """Input for appending content to files."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # File to append to, 1-500 characters.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path to the file to append to",
    )
    # Text to add; must be non-empty, capped at 50000 characters.
    content: str = Field(
        min_length=1,
        max_length=50000,
        description="Content to append to the file",
    )
class PatchOperation(str, Enum):
    """Operations for patching content."""
    # Mixing in str makes each member compare equal to its string value.
    APPEND = "append"  # patch_content inserts after the target
    PREPEND = "prepend"  # patch_content inserts before the target
    REPLACE = "replace"  # patch_content overwrites the target
class TargetType(str, Enum):
    """Types of targets for patching."""
    # Mixing in str makes each member compare equal to its string value.
    HEADING = "heading"  # located with find_heading_position()
    BLOCK = "block"  # located with find_block_position()
    FRONTMATTER = "frontmatter"  # a key in the note's parsed frontmatter
class PatchContentInput(BaseModel):
    """Input for patching content relative to headings/blocks/frontmatter."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # File to patch, 1-500 characters.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path to the file to patch",
    )
    # Selects which lookup patch_content performs for `target`.
    target_type: TargetType = Field(
        description="Type of target: 'heading' for markdown headers, 'block' for block references, 'frontmatter' for YAML metadata",
    )
    # Identifier interpreted according to `target_type`, 1-200 characters.
    target: str = Field(
        min_length=1,
        max_length=200,
        description="Target identifier: heading path (e.g., 'Section/Subsection'), block reference (e.g., '^block-id'), or frontmatter field name",
    )
    # Where the content goes relative to the target.
    operation: PatchOperation = Field(
        description="Operation: 'append' to add after target, 'prepend' to add before target, 'replace' to overwrite target",
    )
    # Text to insert; must be non-empty, capped at 50000 characters.
    content: str = Field(
        min_length=1,
        max_length=50000,
        description="Content to insert",
    )
class DeleteFileInput(BaseModel):
    """Input for deleting files."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Path to remove, 1-500 characters.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path to the file or directory to delete",
    )
    # Safety switch: delete_file returns an error unless this is true.
    confirm: bool = Field(
        default=False,
        description="Must be set to true to confirm deletion",
    )
class GetFrontmatterInput(BaseModel):
    """Input for getting frontmatter."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Note whose frontmatter is requested, 1-500 characters.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path to the file",
    )
class UpdateFrontmatterInput(BaseModel):
    """Input for updating frontmatter."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Note whose frontmatter is changed, 1-500 characters.
    filepath: str = Field(
        min_length=1,
        max_length=500,
        description="Path to the file",
    )
    # Mapping handed unchanged to the client's update_file_frontmatter().
    updates: Dict[str, Any] = Field(
        description=(
            "Frontmatter fields to update or add (e.g., {'tags': ['new-tag'], 'status': 'published'})"
        ),
    )
class TagAction(str, Enum):
    """Actions for tag management."""
    # Mixing in str makes each member compare equal to its string value.
    ADD = "add"  # manage_tags requires a non-empty tags list
    REMOVE = "remove"  # manage_tags requires a non-empty tags list
    LIST = "list"  # manage_tags only reports the current tags
class ManageTagsInput(BaseModel):
    """Input for managing tags."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(str_strip_whitespace=True, extra='forbid')

    # Note whose tags are read or changed, 1-500 characters.
    filepath: str = Field(
        description="Path to the note",
        min_length=1,
        max_length=500
    )
    action: TagAction = Field(
        description="Action: 'add' to add tags, 'remove' to delete tags, 'list' to show current tags"
    )
    # At most 50 tags. Pydantic v2 spells the collection size limit
    # max_length; max_items is the deprecated v1 name.
    tags: Optional[List[str]] = Field(
        default=None,
        description="Tags to add or remove (not needed for 'list' action)",
        max_length=50
    )
class GetNotesInfoInput(BaseModel):
    """Input for getting metadata about notes."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(str_strip_whitespace=True, extra='forbid')

    # Between 1 and 50 paths. Pydantic v2 spells collection size limits
    # min_length/max_length; min_items/max_items are the deprecated v1 names.
    filepaths: List[str] = Field(
        description="List of file paths to get info about",
        min_length=1,
        max_length=50
    )
class PeriodType(str, Enum):
    """Types of periodic notes."""
    # Mixing in str makes each member compare equal to its string value.
    # Used as the type of GetPeriodicNoteInput.period.
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    QUARTERLY = "quarterly"
    YEARLY = "yearly"
class GetPeriodicNoteInput(BaseModel):
    """Input for getting periodic notes."""

    # String values are whitespace-stripped; unknown keys are rejected.
    model_config = ConfigDict(extra='forbid', str_strip_whitespace=True)

    # Required; must be one of the PeriodType values.
    period: PeriodType = Field(
        description=(
            "Period type: daily, weekly, monthly, quarterly, or yearly"
        ),
    )
class GetRecentChangesInput(BaseModel):
    """Input for getting recently modified files."""

    # Unknown keys are rejected; both fields are integers so no string
    # stripping is configured.
    model_config = ConfigDict(extra='forbid')

    # Look-back window in days, 1-365.
    days: int = Field(
        default=90,
        ge=1,
        le=365,
        description="Only include files modified within this many days",
    )
    # Result cap, 1-100.
    limit: int = Field(
        default=10,
        ge=1,
        le=100,
        description="Maximum number of files to return",
    )
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def truncate_response(content: str, description: str = "response") -> str:
    """Cap a response at CHARACTER_LIMIT characters.

    Args:
        content: Text that is about to be returned from a tool.
        description: Label for the text, used in the truncation notice.

    Returns:
        ``content`` unchanged when it fits; otherwise its first
        CHARACTER_LIMIT characters followed by a bracketed notice that states
        the original length.
    """
    original_length = len(content)
    if original_length > CHARACTER_LIMIT:
        notice = f"\n\n[Response truncated at {CHARACTER_LIMIT} characters. Original {description} was {original_length} characters. Use filters or pagination to reduce results.]"
        return content[:CHARACTER_LIMIT] + notice
    return content
def format_file_list(files: List[str], directories: List[str]) -> str:
    """Format file and directory lists for display.

    Entries of ``files`` that end in "/" are listed as directories: a file
    name cannot end in a slash, and that is how the Obsidian Local REST API
    marks folders in a listing. A trailing slash on a directory name is
    removed before the display slash is added, so it is never doubled.

    Args:
        files: File names; may also contain folder names ending in "/".
        directories: Directory names, with or without a trailing "/".

    Returns:
        Markdown with a "## Directories" section and a "## Files" section
        (each sorted, each omitted when empty), or a fixed message when both
        are empty.
    """
    dir_names = [d.rstrip("/") for d in directories]
    file_names = []
    for entry in files:
        if entry.endswith("/"):
            dir_names.append(entry.rstrip("/"))
        else:
            file_names.append(entry)

    result = []
    if dir_names:
        result.append("## Directories")
        result.extend(f"- 📁 {d}/" for d in sorted(dir_names))
        # Blank separator line between the two sections.
        result.append("")
    if file_names:
        result.append("## Files")
        result.extend(f"- 📄 {f}" for f in sorted(file_names))
    return "\n".join(result) if result else "No files or directories found."
def find_heading_position(content: str, heading_path: str) -> Optional[int]:
    """
    Find the position of a heading in markdown content.

    The heading hierarchy is tracked with a stack of (level, title) pairs, so
    documents that skip levels or do not start at H1 still resolve siblings
    correctly (two consecutive "##" headings are siblings, not parent/child).
    Lines inside fenced code blocks are ignored, so a "# comment" in a code
    sample is not mistaken for a heading.

    Args:
        content: The markdown content
        heading_path: Path like "Section/Subsection", listing every ancestor
            heading from the outermost one down to the target

    Returns:
        Offset just past the heading line including its newline, or None if
        not found. When the heading is the last line and has no trailing
        newline the offset is len(content) + 1; slicing with it is still safe.
    """
    wanted = [part.strip() for part in heading_path.split("/")]
    stack = []  # (level, title) of the currently open headings
    fence = None  # opening marker of the fenced code block we are inside
    offset = 0
    for line in content.split("\n"):
        offset += len(line) + 1
        stripped = line.strip()

        if fence is not None:
            # Only a line made solely of the fence character, at least as
            # long as the opener, closes the block.
            if stripped and set(stripped) == {fence[0]} and len(stripped) >= len(fence):
                fence = None
            continue
        opener = re.match(r'^(`{3,}|~{3,})', stripped)
        if opener:
            fence = opener.group(1)
            continue

        heading_match = re.match(r'^(#{1,6})\s+(.+)$', stripped)
        if not heading_match:
            continue
        level = len(heading_match.group(1))
        title = heading_match.group(2).strip()
        # Close every open heading at the same or a deeper level.
        while stack and stack[-1][0] >= level:
            stack.pop()
        stack.append((level, title))
        if [name for _, name in stack] == wanted:
            return offset
    return None
def find_block_position(content: str, block_ref: str) -> Optional[int]:
    """
    Locate a block reference marker inside markdown content.

    Args:
        content: The markdown content
        block_ref: Block reference, with or without the leading "^"

    Returns:
        Offset just past the first "^<id>" occurrence that ends on a word
        boundary, or None when there is no such occurrence
    """
    # Normalise "^id" and "id" to the bare identifier.
    block_id = block_ref.lstrip("^")
    found = re.compile(rf'\^{re.escape(block_id)}\b').search(content)
    return found.end() if found else None
# =============================================================================
# MCP TOOLS
# =============================================================================
@mcp.tool(
    name="obsidian_list_files_in_vault",
    annotations={
        "title": "List Files in Vault Root",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def list_files_in_vault() -> str:
    """List all files and directories in the vault root.

    This tool shows the top-level structure of your Obsidian vault, helping you
    understand the organization and locate folders for Zettelkasten notes.

    Returns:
        str: Formatted list of directories and files in the vault root

    Example:
        Returns a markdown-formatted list showing all top-level folders and files.
    """
    try:
        result = await obsidian_client.get("/vault/")
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "success": False
        }, indent=2)
    listing = format_file_list(
        result.get("files", []),
        result.get("directories", []),
    )
    # Bound the output the same way the other read tools do; a large vault
    # root can exceed CHARACTER_LIMIT.
    return truncate_response(listing, "vault root listing")
@mcp.tool(
    name="obsidian_list_files_in_dir",
    annotations={
        "title": "List Files in Directory",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def list_files_in_dir(params: ListFilesInput) -> str:
    """List files and directories in a specific vault directory.

    Use this tool to explore the contents of a specific folder, such as your
    Zettelkasten directory or any other organized section of your vault.

    Args:
        params (ListFilesInput): Contains:
            - dirpath (str): Relative path to directory (empty for root)

    Returns:
        str: Formatted list of directories and files in the specified path

    Example:
        For dirpath="Zettelkasten", lists all notes in your Zettelkasten folder.
    """
    # The Local REST API distinguishes a directory listing from a file read by
    # the trailing slash, so normalise to exactly one; "Notes", "Notes/" and
    # "/Notes/" all become "/vault/Notes/".
    dirpath = (params.dirpath or "").strip("/")
    endpoint = f"/vault/{dirpath}/" if dirpath else "/vault/"
    try:
        result = await obsidian_client.get(endpoint)
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "success": False
        }, indent=2)
    listing = format_file_list(
        result.get("files", []),
        result.get("directories", []),
    )
    # Bound the output the same way the other read tools do.
    return truncate_response(listing, f"listing of {dirpath or 'vault root'}")
@mcp.tool(
    name="obsidian_get_file_contents",
    annotations={
        "title": "Get File Contents",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def get_file_contents(params: GetFileInput) -> str:
    """Read the complete contents of a single file from the vault.

    Use this to read existing Zettelkasten notes, understand their structure,
    and find connections for creating new atomic notes.

    Args:
        params (GetFileInput): Contains:
            - filepath (str): Path to file relative to vault root

    Returns:
        str: File contents including frontmatter and body

    Example:
        For filepath="Zettelkasten/202411061234.md", returns the full note content.
    """
    path = params.filepath
    try:
        body = await obsidian_client.read_file(path)
    except ObsidianAPIError as exc:
        failure = {"error": str(exc), "filepath": path, "success": False}
        return json.dumps(failure, indent=2)
    # Prefix the path so the caller can tell which note the text belongs to.
    return truncate_response(f"# File: {path}\n\n{body}", f"file {path}")
@mcp.tool(
    name="obsidian_batch_get_file_contents",
    annotations={
        "title": "Batch Get File Contents",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def batch_get_files(params: BatchGetFilesInput) -> str:
    """Read multiple files at once, concatenated with headers.

    Efficient way to read several related Zettelkasten notes together to understand
    connections and context before creating new atomic notes.

    Args:
        params (BatchGetFilesInput): Contains:
            - filepaths (List[str]): List of file paths to read (max 20)

    Returns:
        str: All file contents concatenated with clear separators

    Example:
        Reads multiple related notes to understand a concept network.
    """
    separator = '=' * 80
    sections = []
    for filepath in params.filepaths:
        # A failed read becomes an inline error entry so the remaining files
        # are still returned.
        try:
            body = await obsidian_client.read_file(filepath)
        except ObsidianAPIError as exc:
            body = f"Error: {str(exc)}"
        sections.append(f"# File: {filepath}\n\n{body}\n\n{separator}\n")
    return truncate_response("\n".join(sections), "batch file read")
@mcp.tool(
    name="obsidian_simple_search",
    annotations={
        "title": "Simple Text Search",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def simple_search(params: SearchInput) -> str:
    """Search for text across all vault files with context.

    Essential for Zettelkasten workflow: find existing notes related to concepts
    before creating new atomic notes. Helps maintain atomic principle and discover
    connections between ideas.

    Args:
        params (SearchInput): Contains:
            - query (str): Text to search for
            - context_length (int): Characters of context around matches (default 100)

    Returns:
        str: Matching files with highlighted context showing where terms appear

    Example:
        Search for "systems thinking" to find related notes before creating a new one.
    """
    try:
        result = await obsidian_client.post(
            "/search/simple/",
            {"query": params.query, "contextLength": params.context_length}
        )
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "query": params.query,
            "success": False
        }, indent=2)

    results = result.get("results", [])
    if not results:
        return f"No results found for query: '{params.query}'"

    output = [f"# Search Results for: '{params.query}'\n"]
    output.append(f"Found {len(results)} matching files\n")
    for item in results:
        filepath = item.get("filename", "unknown")
        matches = item.get("matches", [])
        output.append(f"\n## 📄 {filepath}")
        output.append(f"Matches: {len(matches)}\n")
        for i, match in enumerate(matches[:5], 1):  # Limit to 5 matches per file
            # In the Local REST API each match holds the surrounding text
            # under "context"; "match" is the {start, end} offset pair.
            # Printing "match" showed offsets instead of text.
            context = match.get("context")
            if not isinstance(context, str):
                # Fall back to "match" only when it is already plain text.
                legacy = match.get("match", "")
                context = legacy if isinstance(legacy, str) else ""
            output.append(f"**Match {i}:**")
            output.append(f"```\n{context}\n```\n")
    return truncate_response("\n".join(output), "search results")
@mcp.tool(
    name="obsidian_complex_search",
    annotations={
        "title": "Complex JsonLogic Search",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def complex_search(params: ComplexSearchInput) -> str:
    """Execute complex searches using JsonLogic queries.

    Advanced search for power users: find notes by patterns, tags, or complex criteria.
    Useful for organizing and discovering notes in a mature Zettelkasten system.

    Args:
        params (ComplexSearchInput): Contains:
            - query (Dict): JsonLogic query (e.g., {'glob': ['*.md', {'var': 'path'}]})

    Returns:
        str: List of matching files with their properties

    Example:
        Find all markdown files: {'glob': ['*.md', {'var': 'path'}]}
        Find files with specific tags in frontmatter: more complex JsonLogic expressions
    """
    try:
        result = await obsidian_client.post("/search/", {"query": params.query})
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "success": False
        }, indent=2)

    # NOTE(review): the Local REST API answers this endpoint with a bare JSON
    # list of {"filename", "result"} objects; whether ObsidianClient.post wraps
    # it under "matches" is not visible here, so both shapes are accepted.
    matches = result if isinstance(result, list) else result.get("matches", [])
    if not matches:
        return "No files matched the search criteria."

    output = ["# Complex Search Results\n"]
    output.append(f"Found {len(matches)} matching files\n")
    for entry in matches:
        filepath = entry.get("filename", "unknown") if isinstance(entry, dict) else entry
        output.append(f"- 📄 {filepath}")
    return truncate_response("\n".join(output), "complex search results")
@mcp.tool(
    name="obsidian_write_note",
    annotations={
        "title": "Write Note",
        "readOnlyHint": False,
        # NOTE(review): overwrite mode replaces existing files, so this hint
        # arguably should be True; left unchanged pending confirmation.
        "destructiveHint": False,  # CREATE mode is safe
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def write_note(params: WriteNoteInput) -> str:
    """Create or modify notes with content and optional frontmatter.

    Primary tool for Zettelkasten note creation. Supports multiple modes:
    - CREATE: Only creates new notes (safe, won't overwrite)
    - OVERWRITE: Replaces entire file
    - APPEND: Adds content to end
    - PREPEND: Adds content to beginning

    Args:
        params (WriteNoteInput): Contains:
            - filepath (str): Where to write the note
            - content (str): Note content
            - mode (WriteMode): create/overwrite/append/prepend (default: create)
            - frontmatter (Dict, optional): YAML frontmatter metadata

    Returns:
        str: Success message with note location

    Example:
        Create atomic note: filepath="Zettelkasten/202411061234 Systems Thinking.md",
        content="# Systems Thinking...", frontmatter={'tags': ['zettelkasten', 'concepts']}
    """
    try:
        # Default payload: the new content, wrapped in frontmatter when given.
        final_content = params.content
        if params.frontmatter:
            final_content = obsidian_client.serialize_with_frontmatter(
                params.frontmatter,
                params.content
            )

        if params.mode == WriteMode.CREATE:
            # NOTE(review): any ObsidianAPIError from the probe read is taken
            # to mean "file does not exist"; the error type exposes no detail
            # here to tell a 404 from other failures.
            try:
                await obsidian_client.read_file(params.filepath)
            except ObsidianAPIError:
                pass
            else:
                return json.dumps({
                    "error": f"File already exists: {params.filepath}. Use mode='overwrite' to replace it.",
                    "filepath": params.filepath,
                    "success": False
                }, indent=2)
        elif params.mode in (WriteMode.APPEND, WriteMode.PREPEND):
            # Only the read is allowed to fail quietly (missing file -> plain
            # write). Errors while merging propagate to the outer handler
            # instead of silently overwriting the note with the new content.
            try:
                existing = await obsidian_client.read_file(params.filepath)
            except ObsidianAPIError:
                existing = None

            if existing is not None and params.mode == WriteMode.APPEND:
                if params.frontmatter:
                    # Preserve existing frontmatter and merge
                    fm, body = obsidian_client.parse_frontmatter(existing)
                    fm.update(params.frontmatter)
                    final_content = obsidian_client.serialize_with_frontmatter(
                        fm, body + "\n\n" + params.content
                    )
                else:
                    final_content = existing + "\n\n" + params.content
            elif existing is not None:
                # PREPEND: new text must go below any existing frontmatter;
                # text above the opening "---" would stop it being parsed as
                # frontmatter.
                fm, body = obsidian_client.parse_frontmatter(existing)
                if params.frontmatter:
                    fm.update(params.frontmatter)
                if fm:
                    final_content = obsidian_client.serialize_with_frontmatter(
                        fm, params.content + "\n\n" + body
                    )
                else:
                    final_content = params.content + "\n\n" + existing

        await obsidian_client.write_file(params.filepath, final_content)
        return json.dumps({
            "success": True,
            "message": f"Note written successfully in {params.mode.value} mode",
            "filepath": params.filepath,
            "content_length": len(final_content),
            # Same truthiness test as above, so an empty mapping is reported
            # as "no frontmatter".
            "has_frontmatter": bool(params.frontmatter)
        }, indent=2)
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "filepath": params.filepath,
            "success": False
        }, indent=2)
@mcp.tool(
    name="obsidian_append_content",
    annotations={
        "title": "Append Content to File",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def append_content(params: AppendContentInput) -> str:
    """Append content to the end of an existing file or create new file.

    Quick way to add content to notes. Useful for adding new thoughts, references,
    or connections to existing Zettelkasten notes.

    Args:
        params (AppendContentInput): Contains:
            - filepath (str): Path to file
            - content (str): Content to append

    Returns:
        str: Success message with updated file info

    Example:
        Add a new related concept to an existing note.
    """
    try:
        # The client's return value is not used, so it is not bound.
        await obsidian_client.append_to_file(params.filepath, params.content)
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "filepath": params.filepath,
            "success": False
        }, indent=2)
    return json.dumps({
        "success": True,
        "message": "Content appended successfully",
        "filepath": params.filepath
    }, indent=2)
@mcp.tool(
    name="obsidian_patch_content",
    annotations={
        "title": "Patch Content at Specific Location",
        "readOnlyHint": False,
        # NOTE(review): 'replace' overwrites existing text, so this hint
        # arguably should be True; left unchanged pending confirmation.
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def patch_content(params: PatchContentInput) -> str:
    """Insert content relative to headings, block references, or frontmatter.

    Precise content insertion for structured notes. Insert content at specific
    locations within notes to maintain organization and structure.

    Args:
        params (PatchContentInput): Contains:
            - filepath (str): Path to file
            - target_type (TargetType): 'heading', 'block', or 'frontmatter'
            - target (str): Heading path, block reference, or frontmatter field
            - operation (PatchOperation): 'append', 'prepend', or 'replace'
            - content (str): Content to insert

    Returns:
        str: Success message with patch details

    Example:
        Add content after "## Related Concepts" heading in a Zettelkasten note.
    """
    try:
        current_content = await obsidian_client.read_file(params.filepath)

        if params.target_type == TargetType.FRONTMATTER:
            fm, body = obsidian_client.parse_frontmatter(current_content)
            fm[params.target] = _patched_frontmatter_value(
                fm.get(params.target, ""), params.operation, params.content
            )
            new_content = obsidian_client.serialize_with_frontmatter(fm, body)
        elif params.target_type == TargetType.HEADING:
            position = find_heading_position(current_content, params.target)
            if position is None:
                return json.dumps({
                    "error": f"Heading not found: {params.target}",
                    "filepath": params.filepath,
                    "success": False
                }, indent=2)
            new_content = _patch_at_heading(
                current_content, position, params.operation, params.content
            )
        else:  # TargetType.BLOCK
            position = find_block_position(current_content, params.target)
            if position is None:
                return json.dumps({
                    "error": f"Block reference not found: {params.target}",
                    "filepath": params.filepath,
                    "success": False
                }, indent=2)
            new_content = _patch_at_block(
                current_content, position, params.operation, params.content
            )

        await obsidian_client.write_file(params.filepath, new_content)
        return json.dumps({
            "success": True,
            "message": f"Content patched successfully using {params.operation.value} on {params.target_type.value}",
            "filepath": params.filepath,
            "target": params.target
        }, indent=2)
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "filepath": params.filepath,
            "success": False
        }, indent=2)


def _patched_frontmatter_value(current_value: Any, operation: PatchOperation, content: str) -> Any:
    """Return the new value of one frontmatter field for the given operation."""
    if operation == PatchOperation.REPLACE or not current_value:
        return content
    if isinstance(current_value, list):
        # Keep list-valued fields (tags, aliases, ...) as lists instead of
        # turning them into the text of their Python repr.
        if operation == PatchOperation.APPEND:
            return current_value + [content]
        return [content] + current_value
    if operation == PatchOperation.APPEND:
        return str(current_value) + "\n" + content
    return content + "\n" + str(current_value)


def _next_heading_offset(text: str) -> Optional[int]:
    """Return the offset of the first heading line in ``text`` outside fenced code, or None."""
    offset = 0
    fence = None
    for line in text.split("\n"):
        stripped = line.strip()
        if fence is not None:
            if stripped and set(stripped) == {fence[0]} and len(stripped) >= len(fence):
                fence = None
        else:
            opener = re.match(r'^(`{3,}|~{3,})', stripped)
            if opener:
                fence = opener.group(1)
            elif re.match(r'^#{1,6}\s+', stripped):
                return offset
        offset += len(line) + 1
    return None


def _patch_at_heading(content: str, position: int, operation: PatchOperation, insert: str) -> str:
    """Apply ``operation`` relative to the heading whose line ends just before ``position``.

    ``position`` is the value find_heading_position() returns: the offset
    right after the heading line's newline, or len(content) + 1 when the
    heading is the final line without a newline.
    """
    if operation == PatchOperation.PREPEND:
        # position - 1 is the newline that ends the heading line (or the end
        # of the text); the heading starts after the newline before that.
        heading_start = content.rfind("\n", 0, position - 1) + 1
        return content[:heading_start] + insert + "\n" + content[heading_start:]

    head = content[:position]
    tail = content[position:]
    if not head.endswith("\n"):
        head += "\n"  # heading was the last line without a newline

    if operation == PatchOperation.APPEND:
        # Terminate the inserted text so it is not glued to the next line.
        return head + "\n" + insert + "\n" + tail

    # REPLACE: swap everything up to the next heading (of any level) or EOF.
    next_offset = _next_heading_offset(tail)
    if next_offset is None:
        return head + "\n" + insert
    # An offset of 0 (a heading directly follows) is a valid hit, hence the
    # explicit None test above rather than a truthiness test.
    return head + "\n" + insert + "\n" + tail[next_offset:]


def _patch_at_block(content: str, position: int, operation: PatchOperation, insert: str) -> str:
    """Apply ``operation`` relative to the line containing the block reference ending at ``position``."""
    line_start = content.rfind("\n", 0, position) + 1  # 0 when on the first line
    line_end = content.find("\n", position)
    if line_end == -1:
        line_end = len(content)

    if operation == PatchOperation.APPEND:
        return content[:line_end] + "\n" + insert + content[line_end:]
    if operation == PatchOperation.PREPEND:
        return content[:line_start] + insert + "\n" + content[line_start:]
    # REPLACE - replace the entire line containing the block reference
    return content[:line_start] + insert + content[line_end:]
@mcp.tool(
    name="obsidian_delete_file",
    annotations={
        "title": "Delete File or Directory",
        "readOnlyHint": False,
        "destructiveHint": True,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def delete_file(params: DeleteFileInput) -> str:
    """Delete a file or directory from the vault.

    DESTRUCTIVE OPERATION. Requires explicit confirmation. Use carefully when
    removing outdated or duplicate notes from your Zettelkasten.

    Args:
        params (DeleteFileInput): Contains:
            - filepath (str): Path to file/directory to delete
            - confirm (bool): Must be True to proceed with deletion

    Returns:
        str: Success or error message

    Example:
        Delete a duplicate note after merging content into another note.
    """
    target = params.filepath
    if params.confirm:
        try:
            await obsidian_client.delete(f"/vault/{target}")
        except ObsidianAPIError as exc:
            outcome = {"error": str(exc), "filepath": target, "success": False}
        else:
            outcome = {
                "success": True,
                "message": f"Successfully deleted: {target}",
                "filepath": target,
            }
    else:
        # Nothing is sent to the API unless the caller opted in explicitly.
        outcome = {
            "error": "Deletion requires explicit confirmation. Set 'confirm' to true.",
            "filepath": target,
            "success": False,
        }
    return json.dumps(outcome, indent=2)
@mcp.tool(
    name="obsidian_get_frontmatter",
    annotations={
        "title": "Get Note Frontmatter",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def get_frontmatter(params: GetFrontmatterInput) -> str:
    """Extract YAML frontmatter metadata from a note.

    Read metadata like tags, creation date, and other properties from Zettelkasten
    notes without loading the full content.

    Args:
        params (GetFrontmatterInput): Contains:
            - filepath (str): Path to file

    Returns:
        str: JSON object containing frontmatter fields

    Example:
        Get tags and metadata from a note to understand its classification.
    """
    try:
        frontmatter = await obsidian_client.get_file_frontmatter(params.filepath)
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "filepath": params.filepath,
            "success": False
        }, indent=2)
    # default=str is deliberate: frontmatter parsed from YAML can hold date or
    # datetime values (e.g. "created: 2024-11-06"), which json.dumps rejects
    # with TypeError; they are emitted as their string form instead.
    return json.dumps({
        "success": True,
        "filepath": params.filepath,
        "frontmatter": frontmatter
    }, indent=2, default=str)
@mcp.tool(
    name="obsidian_update_frontmatter",
    annotations={
        "title": "Update Note Frontmatter",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def update_frontmatter(params: UpdateFrontmatterInput) -> str:
    """Update YAML frontmatter metadata without modifying note content.

    Add or update metadata fields like tags, status, or custom properties in
    Zettelkasten notes while preserving all content.

    Args:
        params (UpdateFrontmatterInput): Contains:
            - filepath (str): Path to file
            - updates (Dict): Frontmatter fields to add/update

    Returns:
        str: Success message with updated frontmatter

    Example:
        Add tags to existing note: updates={'tags': ['zettelkasten', 'systems-thinking']}
    """
    try:
        result = await obsidian_client.update_file_frontmatter(
            params.filepath,
            params.updates
        )
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "filepath": params.filepath,
            "success": False
        }, indent=2)
    # default=str is deliberate: the merged frontmatter can contain date or
    # datetime values from untouched YAML fields, which json.dumps would
    # otherwise reject with TypeError after the update has been written.
    return json.dumps({
        "success": True,
        "message": "Frontmatter updated successfully",
        "filepath": params.filepath,
        "frontmatter": result["frontmatter"]
    }, indent=2, default=str)
@mcp.tool(
    name="obsidian_manage_tags",
    annotations={
        "title": "Manage Note Tags",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False
    }
)
async def manage_tags(params: ManageTagsInput) -> str:
    """Add, remove, or list tags in note frontmatter.

    Manage tags for organizing Zettelkasten notes. Essential for maintaining
    topic clusters and enabling efficient retrieval of related atomic notes.

    Args:
        params (ManageTagsInput): Contains:
            - filepath (str): Path to note
            - action (TagAction): 'add', 'remove', or 'list'
            - tags (List[str], optional): Tags to add/remove (not needed for 'list')

    Returns:
        str: Current tags after operation

    Example:
        Add tags: action='add', tags=['systems-thinking', 'mental-models']
        Remove tag: action='remove', tags=['draft']
        List tags: action='list'
    """
    try:
        # A note without frontmatter may yield None/empty; treat it as {}.
        frontmatter = await obsidian_client.get_file_frontmatter(params.filepath) or {}
        current_tags = frontmatter.get("tags", [])
        # Ensure tags is a list
        if isinstance(current_tags, str):
            current_tags = [current_tags]
        elif not isinstance(current_tags, list):
            current_tags = []

        if params.action == TagAction.LIST:
            return json.dumps({
                "success": True,
                "filepath": params.filepath,
                "tags": current_tags
            }, indent=2)

        if not params.tags:
            return json.dumps({
                "error": f"Tags list required for '{params.action.value}' action",
                "success": False
            }, indent=2)

        # De-duplicate the request while keeping the caller's order.
        requested = []
        for tag in params.tags:
            if tag not in requested:
                requested.append(tag)

        if params.action == TagAction.ADD:
            # Order-preserving merge: a set would reshuffle the note's tags on
            # every call and fail on unhashable values.
            updated_tags = []
            for tag in current_tags + requested:
                if tag not in updated_tags:
                    updated_tags.append(tag)
            changed = [tag for tag in requested if tag not in current_tags]
            verb, key = "Added", "added"
        else:  # TagAction.REMOVE
            updated_tags = [tag for tag in current_tags if tag not in requested]
            changed = [tag for tag in requested if tag in current_tags]
            verb, key = "Removed", "removed"

        await obsidian_client.update_file_frontmatter(
            params.filepath,
            {"tags": updated_tags}
        )
        # Report what actually changed, not merely what was requested.
        return json.dumps({
            "success": True,
            "message": f"{verb} {len(changed)} tag(s)",
            "filepath": params.filepath,
            "tags": updated_tags,
            key: changed
        }, indent=2)
    except ObsidianAPIError as e:
        return json.dumps({
            "error": str(e),
            "filepath": params.filepath,
            "success": False
        }, indent=2)
@mcp.tool(
    name="obsidian_get_notes_info",
    annotations={
        "title": "Get Notes Metadata",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def get_notes_info(params: GetNotesInfoInput) -> str:
    """Get metadata for multiple notes including tags, dates, and sizes.

    Efficient way to get overview information about several Zettelkasten notes
    without reading full content. Useful for analyzing note collections.

    Args:
        params (GetNotesInfoInput): Contains:
            - filepaths (List[str]): Paths to files (max 50)

    Returns:
        str: JSON array with metadata for each file

    Example:
        Get info about all notes in a topic cluster to understand their relationships.
    """
    results = []
    for filepath in params.filepaths:
        try:
            # A note without frontmatter may yield None/empty; treat it as {}.
            frontmatter = await obsidian_client.get_file_frontmatter(filepath) or {}
            # The body is fetched only to measure its length.
            content = await obsidian_client.read_file(filepath)
        except ObsidianAPIError as e:
            # One failing note must not hide the results for the others.
            results.append({
                "filepath": filepath,
                "success": False,
                "error": str(e)
            })
            continue
        results.append({
            "filepath": filepath,
            "success": True,
            "tags": frontmatter.get("tags", []),
            "created": frontmatter.get("created", None),
            "modified": frontmatter.get("modified", None),
            "size_chars": len(content),
            "has_frontmatter": bool(frontmatter)
        })
    # default=str is deliberate: "created"/"modified" parsed from YAML are
    # often date or datetime objects, which json.dumps rejects with TypeError
    # and would otherwise fail the whole batch.
    return json.dumps(results, indent=2, default=str)
# =============================================================================
# SERVER ENTRY POINT
# =============================================================================
def main() -> None:
    """Start the server by calling ``mcp.run()``."""
    mcp.run()


# Only start the server when the module is executed, not when it is imported.
if __name__ == "__main__":
    main()