
MCP Obsidian

by alexhholmes
mcp_obsidian.py (50.2 kB)
#!/usr/bin/env python3
import argparse
import asyncio
import hashlib
import json
import os
import re
import sys
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Disable ChromaDB telemetry before importing to prevent errors
os.environ["ANONYMIZED_TELEMETRY"] = "False"

import chromadb
import questionary
from chromadb.config import Settings
from fastmcp import FastMCP
from langchain_text_splitters import RecursiveCharacterTextSplitter
from questionary import Style
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer


class Log:
    """Simple logger for consistent output formatting."""

    @staticmethod
    def info(msg):
        print(f"ℹ️ {msg}", file=sys.stderr)

    @staticmethod
    def success(msg):
        print(f"✅ {msg}", file=sys.stderr)

    @staticmethod
    def error(msg):
        print(f"❌ {msg}", file=sys.stderr)

    @staticmethod
    def warning(msg):
        print(f"⚠️ {msg}", file=sys.stderr)


# Configuration paths
CONFIG_DIR = Path.home() / ".mcp-obsidian"
CONFIG_FILE = CONFIG_DIR / "config.json"

# ChromaDB settings
CHROMA_DB_DIR = "chroma_db"
CHROMA_COLLECTION_NAME = "obsidian_notes"
CHROMA_COLLECTION_DESC = "Obsidian vault notes"

# Text chunking settings
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
CHUNK_SEPARATORS = ["\n\n", "\n", ". ", " ", ""]
MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024  # 10 MB

# Search settings
DEFAULT_SEARCH_LIMIT = 10
MAX_SEARCH_LIMIT = 20
PREVIEW_LENGTH = 500

# File monitoring settings
MONITOR_UPDATE_INTERVAL = 10  # seconds
MONITOR_DEBOUNCE_SECONDS = 30  # seconds

# Create FastMCP server instance
mcp = FastMCP("mcp-obsidian")

# Global ChromaDB client and collection
chroma_client: Optional[chromadb.Client] = None
chroma_collection: Optional[chromadb.Collection] = None

# Lock for updating global ChromaDB references atomically
chroma_ref_lock = threading.Lock()

custom_style = Style([
    ('qmark', 'fg:#673ab7 bold'),
    ('question', 'bold'),
    ('answer', 'fg:#f44336 bold'),
    ('pointer', 'fg:#673ab7 bold'),
    ('highlighted', 'fg:#673ab7 bold'),
    ('selected', 'fg:#cc5454'),
    ('separator', 'fg:#cc5454'),
    ('instruction', ''),
    ('text', ''),
    ('disabled', 'fg:#858585 italic')
])


def check_vaults_configured(vaults: List, message: str = "No vaults configured yet.") -> bool:
    """Check if vaults are configured, print message if not."""
    if not vaults:
        print(f"📭 {message}", file=sys.stderr)
        print("Use 'Add Vault Path' to configure your first vault.", file=sys.stderr)
        return False
    return True


def add_vault_path():
    """Add a new vault path to the configuration"""
    existing_config = load_config()
    vaults = existing_config.get("vaults", [])
    existing_paths = {v["path"] for v in vaults}

    while True:
        # Ask for vault path with path autocomplete
        vault_path = questionary.path(
            "Enter Obsidian vault path:",
            only_directories=True,
            style=custom_style
        ).ask()

        if not vault_path:
            break

        path = Path(vault_path).expanduser().resolve()

        # Check if already configured
        if str(path) in existing_paths:
            Log.warning(f"Vault already configured: {path}\n")
            continue

        # Validate path exists
        if not path.exists():
            if not questionary.confirm(
                f"Path does not exist: {path}\nAdd anyway?",
                default=False,
                style=custom_style
            ).ask():
                continue

        # Ask for vault name
        vault_name = questionary.text(
            "Vault name:",
            default=path.name,
            style=custom_style
        ).ask()

        vaults.append({
            "name": vault_name,
            "path": str(path)
        })
        existing_paths.add(str(path))
        Log.success(f"Added: {vault_name} → {path}\n")

        if not questionary.confirm(
            "Add another vault?",
            default=False,
            style=custom_style
        ).ask():
            break

    if vaults:
        save_config(vaults)
        print(f"\n📚 Total configured vaults: {len(vaults)}", file=sys.stderr)


def list_vault_paths():
    """List all configured vault paths"""
    config = load_config()
    vaults = config.get("vaults", [])

    if not check_vaults_configured(vaults):
        return

    print(f"📚 Configured Vaults ({len(vaults)}):\n", file=sys.stderr)
    for i, vault in enumerate(vaults, 1):
        print(f" {i}. {vault['name']}", file=sys.stderr)
        print(f" 📁 {vault['path']}", file=sys.stderr)
        if i < len(vaults):
            print(file=sys.stderr)


def remove_vault_path():
    """Remove a vault path from the configuration"""
    config = load_config()
    vaults = config.get("vaults", [])

    if not check_vaults_configured(vaults):
        return

    print(f"📚 Select vault to remove:\n", file=sys.stderr)

    # Create choices list with vault information
    choices = []
    for i, vault in enumerate(vaults, 1):
        choice_text = f"{vault['name']} → {vault['path']}"
        choices.append({"name": choice_text, "value": i - 1})

    # Add cancel option
    choices.append({"name": "Cancel", "value": -1})

    selected = questionary.select(
        "Which vault would you like to remove?",
        choices=choices,
        style=custom_style
    ).ask()

    if selected == -1 or selected is None:
        print("Removal cancelled.", file=sys.stderr)
        return

    # Get the vault to remove
    vault_to_remove = vaults[selected]

    # Confirm deletion
    if questionary.confirm(
        f"Are you sure you want to remove '{vault_to_remove['name']}'?",
        default=False,
        style=custom_style
    ).ask():
        # Remove the vault
        vaults.pop(selected)
        save_config(vaults)
        Log.success(f"Removed: {vault_to_remove['name']} → {vault_to_remove['path']}")
    else:
        print("Removal cancelled.", file=sys.stderr)


def configure_claude_desktop():
    """Configure Claude Desktop to use mcp-obsidian"""
    import platform

    # Determine the Claude Desktop config path based on platform
    system = platform.system()
    if system == "Darwin":  # macOS
        claude_config_path = Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json"
    elif system == "Windows":
        claude_config_path = Path.home() / "AppData" / "Roaming" / "Claude" / "claude_desktop_config.json"
    else:  # Linux
        claude_config_path = Path.home() / ".config" / "Claude" / "claude_desktop_config.json"

    print("🤖 Configuring Claude Desktop for mcp-obsidian\n", file=sys.stderr)

    # Check if Claude config file exists
    if not claude_config_path.exists():
        print(f"📝 Creating Claude Desktop configuration at:\n {claude_config_path}\n", file=sys.stderr)
        claude_config_path.parent.mkdir(parents=True, exist_ok=True)
        claude_config = {}
    else:
        print(f"📝 Found Claude Desktop configuration at:\n {claude_config_path}\n", file=sys.stderr)
        try:
            with open(claude_config_path, 'r') as f:
                claude_config = json.load(f)
        except json.JSONDecodeError:
            Log.warning("Existing configuration is invalid. Creating new configuration.\n")
            claude_config = {}

    # Ensure mcpServers section exists
    if "mcpServers" not in claude_config:
        claude_config["mcpServers"] = {}

    # Check if obsidian is already configured
    if "obsidian" in claude_config["mcpServers"]:
        existing = claude_config["mcpServers"]["obsidian"]
        Log.warning("MCP Obsidian is already configured in Claude Desktop:")
        print(f" Command: {existing.get('command', 'Not set')}", file=sys.stderr)

        if not questionary.confirm(
            "\nDo you want to update the existing configuration?",
            default=True,
            style=custom_style
        ).ask():
            print("Configuration cancelled.", file=sys.stderr)
            return

    # Add or update the obsidian configuration
    claude_config["mcpServers"]["obsidian"] = {
        "command": "mcp-obsidian"
    }

    # Save the updated configuration
    try:
        with open(claude_config_path, 'w') as f:
            json.dump(claude_config, f, indent=2)

        Log.success("\nClaude Desktop configuration updated successfully!")
        print("\n📋 Configuration added:", file=sys.stderr)
        print(' "obsidian": {', file=sys.stderr)
        print(' "command": "mcp-obsidian"', file=sys.stderr)
        print(' }', file=sys.stderr)
        print("\n🔄 Please restart Claude Desktop for the changes to take effect.", file=sys.stderr)

        # Check if vaults are configured
        config = load_config()
        vaults = config.get("vaults", [])
        if not vaults:
            Log.warning("\nNo vaults configured yet!")
            print(" Run 'mcp-obsidian configure' and select 'Add Vault Path' to add your Obsidian vaults.", file=sys.stderr)

    except Exception as e:
        print(f"❌ Failed to update Claude Desktop configuration: {e}", file=sys.stderr)
        print("\nYou can manually add the following to your Claude Desktop configuration:", file=sys.stderr)
        print(f"File: {claude_config_path}", file=sys.stderr)
        print('\n{', file=sys.stderr)
        print(' "mcpServers": {', file=sys.stderr)
        print(' "obsidian": {', file=sys.stderr)
        print(' "command": "mcp-obsidian"', file=sys.stderr)
        print(' }', file=sys.stderr)
        print(' }', file=sys.stderr)
        print('}', file=sys.stderr)


def save_config(vaults=None, exclude_patterns=None):
    """Save the configuration to file

    Args:
        vaults: List of vault configurations (if None, keeps existing)
        exclude_patterns: List of exclude patterns (if None, keeps existing)
    """
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)

    # Load existing config to preserve unchanged values
    existing = load_config()

    config = {
        "vaults": vaults if vaults is not None else existing.get("vaults", []),
        "exclude_patterns": exclude_patterns if exclude_patterns is not None else existing.get("exclude_patterns", []),
        "version": "1.0"
    }

    with open(CONFIG_FILE, 'w') as f:
        json.dump(config, f, indent=2)

    print(f"💾 Configuration saved to: {CONFIG_FILE}", file=sys.stderr)


def configure_exclude_paths():
    """Configure paths to exclude from indexing (glob patterns)"""
    config = load_config()
    exclude_patterns = config.get("exclude_patterns", [])

    print("🚫 Configure Excluded Paths\n", file=sys.stderr)
    print("Use glob patterns to exclude files/folders from indexing.", file=sys.stderr)
    print("Examples:", file=sys.stderr)
    print(" • **/Archive/** (all Archive folders)", file=sys.stderr)
    print(" • **/Private/* (Private folder contents)", file=sys.stderr)
    print(" • **/*.tmp.md (temporary markdown files)", file=sys.stderr)
    print(" • **/Trash/** (Trash/Recycle folders)\n", file=sys.stderr)

    while True:
        # Show current patterns if any
        if exclude_patterns:
            print("📋 Current exclude patterns:", file=sys.stderr)
            for i, pattern in enumerate(exclude_patterns, 1):
{pattern}", file=sys.stderr) print(file=sys.stderr) else: print("📭 No exclude patterns configured yet.\n", file=sys.stderr) # Ask what to do action = questionary.select( "What would you like to do?", choices=[ {"name": "Add Pattern", "value": "add"}, {"name": "Remove Pattern", "value": "remove"}, {"name": "Clear All Patterns", "value": "clear"}, {"name": "Back to Main Menu", "value": "back"} ], style=custom_style ).ask() if action == "back" or action is None: break elif action == "add": # Add new pattern pattern = questionary.text( "Enter glob pattern to exclude:", instruction="(e.g., **/Archive/** or **/*.tmp.md)", style=custom_style ).ask() if pattern and pattern not in exclude_patterns: exclude_patterns.append(pattern) Log.success(f"Added pattern: {pattern}") # Save updated config save_config(exclude_patterns=exclude_patterns) elif pattern in exclude_patterns: print(f"⚠️ Pattern already exists: {pattern}", file=sys.stderr) elif action == "remove" and exclude_patterns: # Remove a pattern choices = [{"name": pattern, "value": i} for i, pattern in enumerate(exclude_patterns)] choices.append({"name": "Cancel", "value": -1}) selected = questionary.select( "Select pattern to remove:", choices=choices, style=custom_style ).ask() if selected != -1 and selected is not None: removed_pattern = exclude_patterns.pop(selected) Log.success(f"Removed pattern: {removed_pattern}") # Save updated config save_config(exclude_patterns=exclude_patterns) elif action == "clear" and exclude_patterns: # Clear all patterns if questionary.confirm( f"Remove all {len(exclude_patterns)} exclude patterns?", default=False, style=custom_style ).ask(): exclude_patterns = [] save_config(exclude_patterns=exclude_patterns) Log.success("All exclude patterns cleared") print(file=sys.stderr) # Add spacing def configure(): """Configure Obsidian vault paths""" print("🗂️ MCP Obsidian Configuration\n", file=sys.stderr) while True: try: choice = questionary.select( "What would you like to do?", choices=[ {"name": "Add Vault Path", "value": "1", "shortcut_key": "1"}, {"name": "List Vault Paths", "value": "2", "shortcut_key": "2"}, {"name": "Remove Vault Path", "value": "3", "shortcut_key": "3"}, {"name": "Exclude Paths", "value": "4", "shortcut_key": "4"}, {"name": "Configure Claude Desktop", "value": "5", "shortcut_key": "5"}, {"name": "Exit (^C)", "value": "6", "shortcut_key": "6"} ], style=custom_style, use_shortcuts=True, use_arrow_keys=True, show_selected=True, use_jk_keys=False ).ask() except KeyboardInterrupt: sys.exit(0) # Exit silently on Ctrl+C if choice == "1": add_vault_path() elif choice == "2": list_vault_paths() elif choice == "3": remove_vault_path() elif choice == "4": configure_exclude_paths() elif choice == "5": configure_claude_desktop() elif choice == "6" or choice is None: break print(file=sys.stderr) # Add spacing between operations def is_file_excluded(file_path: Path, vault_path: Path) -> bool: """Check if a file should be excluded based on configured patterns. 
    Supports patterns like:
    - **/Archive/** - any Archive directory
    - **/*.tmp.md - temporary markdown files
    - **/Private/* - Private folder contents
    """
    import fnmatch

    config = load_config()
    exclude_patterns = config.get("exclude_patterns", [])

    if not exclude_patterns:
        return False

    # Get relative path from vault root
    try:
        relative_str = str(file_path.relative_to(vault_path)).replace('\\', '/')
    except ValueError:
        relative_str = str(file_path).replace('\\', '/')

    # Check each pattern
    for pattern in exclude_patterns:
        # Convert ** to fnmatch format
        fnmatch_pattern = pattern.replace('**/', '*/').replace('**', '*')

        # Direct match
        if fnmatch.fnmatch(relative_str, pattern) or fnmatch.fnmatch(relative_str, fnmatch_pattern):
            return True

        # Check if any directory in path matches a directory pattern (e.g., **/Archive/**)
        if '**/' in pattern:
            dir_name = pattern.replace('**/', '').rstrip('/*').rstrip('/**')
            if dir_name in relative_str.split('/'):
                return True

    return False


def get_markdown_files(vault_path: str) -> List[Path]:
    """Recursively get all markdown files from a vault"""
    vault = Path(vault_path)
    if not vault.exists():
        return []

    markdown_files = []
    # Use rglob for recursive search
    for file_path in vault.rglob("*.md"):
        # Skip hidden directories and files
        if any(part.startswith('.') for part in file_path.parts):
            continue
        # Skip files matching exclude patterns
        if is_file_excluded(file_path, vault):
            continue
        markdown_files.append(file_path)

    return markdown_files


def get_line_boundaries(text: str, chunk_start: int, chunk_end: int) -> Tuple[int, int]:
    """Calculate line numbers for a chunk within the original text"""
    lines_before_chunk = text[:chunk_start].count('\n')
    lines_in_chunk = text[chunk_start:chunk_end].count('\n')
    start_line = lines_before_chunk + 1
    end_line = start_line + lines_in_chunk
    return start_line, end_line


def chunk_markdown_content(file_path: Path, vault_path: Path, vault_name: str) -> List[Dict]:
    """Read and chunk a markdown file with enhanced metadata"""
    try:
        # Check file size before loading
        file_size = os.path.getsize(file_path)
        if file_size > MAX_FILE_SIZE_BYTES:
            size_mb = file_size / (1024 * 1024)
            Log.warning(f"Skipping large file ({size_mb:.1f}MB): {file_path.name}")
            return []  # Skip files that are too large

        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Get file metadata
        file_stats = os.stat(file_path)
        modified_time = int(file_stats.st_mtime)

        # Create file content hash for change detection
        file_hash = hashlib.md5(content.encode()).hexdigest()

        # Get relative path from vault root
        relative_path = file_path.relative_to(vault_path)

        # Get file stem (filename without extension)
        title = file_path.stem

        # Create text splitter for markdown
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            separators=CHUNK_SEPARATORS,
            keep_separator=False
        )

        # Split the content and track positions
        chunks = text_splitter.create_documents([content])

        # Create document chunks with metadata
        documents = []
        current_position = 0

        for i, chunk in enumerate(chunks):
            chunk_text = chunk.page_content

            # Find the position of this chunk in the original content
            chunk_start = content.find(chunk_text, current_position)
            chunk_end = chunk_start + len(chunk_text)
            current_position = chunk_end

            # Get line boundaries
            start_line, end_line = get_line_boundaries(content, chunk_start, chunk_end)

            doc_id = hashlib.md5(f"{file_path}_{i}".encode()).hexdigest()

            documents.append({
                "id": doc_id,
                "content": chunk_text,
                "metadata": {
                    "title": title,
                    "source": str(relative_path),
"vault": vault_name, "modified": modified_time, "file_hash": file_hash, # Add hash for change detection "start_line": start_line, "end_line": end_line, "file_path": str(file_path), # Keep absolute path for reference "file_size_bytes": file_size, # Size of the full document "chunk_index": i, "total_chunks": len(chunks) } }) return documents except Exception as e: print(f"Error processing {file_path}: {e}", file=sys.stderr) return [] def get_files_to_update(collection: chromadb.Collection, vault_files: List[Path]) -> Tuple[List[Path], List[str]]: """Determine which files need to be updated based on modification time""" files_to_update = [] ids_to_delete = [] # Get all existing documents metadata try: existing_docs = collection.get(include=['metadatas']) if not existing_docs['ids']: # Empty collection, all files need indexing return vault_files, [] except: # Collection doesn't exist or is empty return vault_files, [] # Create a map of file paths to their metadata existing_files = {} file_to_ids = {} # Map file paths to their document IDs for i, doc_id in enumerate(existing_docs['ids']): metadata = existing_docs['metadatas'][i] file_path = metadata.get('file_path') if file_path: if file_path not in file_to_ids: file_to_ids[file_path] = [] file_to_ids[file_path].append(doc_id) if file_path not in existing_files: existing_files[file_path] = metadata # Check each file in the vault vault_file_paths = set(str(f) for f in vault_files) for file_path in vault_files: file_path_str = str(file_path) if file_path_str not in existing_files: # New file, needs to be indexed files_to_update.append(file_path) else: # Check if file has been modified try: file_stats = os.stat(file_path) current_mtime = int(file_stats.st_mtime) stored_mtime = existing_files[file_path_str].get('modified', 0) if current_mtime > stored_mtime: # File has been modified files_to_update.append(file_path) # Mark old chunks for deletion if file_path_str in file_to_ids: ids_to_delete.extend(file_to_ids[file_path_str]) except FileNotFoundError: # File no longer exists, will be handled below pass # Find files that exist in DB but not on disk (deleted files) for file_path_str in existing_files: if file_path_str not in vault_file_paths: # File has been deleted, remove its chunks if file_path_str in file_to_ids: ids_to_delete.extend(file_to_ids[file_path_str]) return files_to_update, ids_to_delete def initialize_vector_store(vaults: List[Dict], force_rebuild: bool = False) -> Tuple[chromadb.Client, chromadb.Collection]: """Initialize ChromaDB and incrementally index vault content Args: vaults: List of vault configurations force_rebuild: If True, clear existing database before rebuilding """ # Handle force rebuild under lock if force_rebuild: with chroma_ref_lock: print("🔥 Force rebuilding index (clearing existing data)...", file=sys.stderr) # Clear the existing database db_path = CONFIG_DIR / CHROMA_DB_DIR if db_path.exists(): import shutil shutil.rmtree(db_path) print(" Cleared existing index", file=sys.stderr) # Clear global references while rebuilding global chroma_client, chroma_collection chroma_client = None chroma_collection = None print("🔄 Initializing vector store...", file=sys.stderr) # Disable ChromaDB telemetry to avoid errors os.environ["ANONYMIZED_TELEMETRY"] = "False" # Initialize ChromaDB with persistent storage db_path = CONFIG_DIR / CHROMA_DB_DIR db_path.mkdir(parents=True, exist_ok=True) client = chromadb.PersistentClient( path=str(db_path), settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) # Get or create 
    # Get or create collection (don't delete existing)
    collection = client.get_or_create_collection(
        name=CHROMA_COLLECTION_NAME,
        metadata={"description": CHROMA_COLLECTION_DESC}
    )

    print(f"📚 Checking {len(vaults)} vault(s) for updates...", file=sys.stderr)

    total_files_updated = 0
    total_chunks_deleted = 0
    total_chunks_added = 0

    for vault in vaults:
        vault_path = Path(vault["path"])
        vault_name = vault["name"]

        print(f"\n🗂️ Checking vault: {vault_name}", file=sys.stderr)
        print(f" Path: {vault_path}", file=sys.stderr)

        # Get all markdown files recursively
        markdown_files = get_markdown_files(str(vault_path))
        print(f" Found {len(markdown_files)} markdown files", file=sys.stderr)

        # Determine which files need updating
        files_to_update, ids_to_delete = get_files_to_update(collection, markdown_files)

        if not files_to_update and not ids_to_delete:
            Log.success(" Vault is up to date")
            continue

        # Delete outdated chunks first. Chunk IDs are deterministic per file and
        # chunk index, so stale chunks must be removed before re-adding a modified
        # file; otherwise the freshly added chunks would be skipped or deleted.
        if ids_to_delete:
            print(f" Removing {len(ids_to_delete)} chunks from modified/deleted files", file=sys.stderr)
            try:
                with chroma_ref_lock:
                    collection.delete(ids=ids_to_delete)
                total_chunks_deleted += len(ids_to_delete)
            except Exception as e:
                print(f" Warning: Could not delete some chunks: {e}", file=sys.stderr)

        if files_to_update:
            print(f" Updating {len(files_to_update)} changed/new files", file=sys.stderr)

        # Process files that need updating
        for idx, file_path in enumerate(files_to_update, 1):
            print(f" Processing file {idx}/{len(files_to_update)}: {file_path.name}", end="\r", file=sys.stderr)

            documents = chunk_markdown_content(file_path, vault_path, vault_name)

            if documents:
                # Prepare data for ChromaDB
                ids = [doc["id"] for doc in documents]
                contents = [doc["content"] for doc in documents]
                metadatas = [doc["metadata"] for doc in documents]

                # Add to collection under lock protection
                with chroma_ref_lock:
                    collection.add(
                        ids=ids,
                        documents=contents,
                        metadatas=metadatas
                    )

                total_files_updated += 1
                total_chunks_added += len(documents)

        # Clear the progress line and show completion
        Log.success(f" Updated {len(files_to_update)} files from {vault_name} ")

    total_docs = collection.count()

    Log.success("\nIndexing complete!")
    print(f" Files updated: {total_files_updated}", file=sys.stderr)
    print(f" Chunks removed: {total_chunks_deleted}", file=sys.stderr)
    print(f" Chunks added: {total_chunks_added}", file=sys.stderr)
    print(f" Total database size: {total_docs} documents", file=sys.stderr)

    return client, collection


def get_full_path_from_metadata(metadata: Dict) -> str:
    """
    Get the full file path from search result metadata.
    Args:
        metadata: The metadata dictionary from search results

    Returns:
        The full absolute path to the file
    """
    # Try to get full path from stored file_path first
    full_path = metadata.get('file_path')

    # If not available, construct it from vault config
    if not full_path:
        vault_name = metadata.get('vault', 'Unknown vault')
        relative_path = metadata.get('source', 'Unknown')

        if vault_name != 'Unknown vault' and relative_path != 'Unknown':
            config = load_config()
            vaults = config.get("vaults", [])
            for vault in vaults:
                if vault["name"] == vault_name:
                    vault_path = vault["path"]
                    full_path = str(Path(vault_path) / relative_path)
                    break

    # Fallback to relative path if we couldn't construct full path
    if not full_path:
        full_path = metadata.get('source', 'Unknown')

    return full_path


def parse_contains_query(query: str) -> Dict:
    """
    Parse query with quoted phrases into semantic and exact phrase components.

    Supports:
    - "exact phrase" - Must contain this exact phrase
    - -"excluded phrase" - Must not contain this phrase
    - Regular text - For semantic search
    """
    required_phrases = []
    excluded_phrases = []

    # Pattern to find quoted strings with optional negative prefix
    pattern = r'(-?)"([^"]+)"'

    # Extract all quoted phrases
    remaining = query
    for match in re.finditer(pattern, query):
        full_match = match.group(0)
        is_negative = match.group(1) == '-'
        phrase = match.group(2)

        if is_negative:
            excluded_phrases.append(phrase)
        else:
            required_phrases.append(phrase)

        # Remove from query
        remaining = remaining.replace(full_match, '', 1)

    # Clean up remaining semantic query
    semantic_query = ' '.join(remaining.split()).strip()

    return {
        'semantic_query': semantic_query,
        'required_phrases': required_phrases,
        'excluded_phrases': excluded_phrases
    }


@mcp.tool
async def search(
    query: str,
    vault_filter: Optional[str] = None,
    since_date: Optional[str] = None,
    until_date: Optional[str] = None,
    sort_by: Optional[str] = None,
    sort_order: Optional[str] = "desc",
    limit: int = DEFAULT_SEARCH_LIMIT
) -> str:
    """
    Unified search with semantic, exact phrase, and temporal filtering.

    Query syntax:
    - "exact phrase" - Must contain this exact phrase (case-sensitive)
    - -"excluded phrase" - Must not contain this phrase
    - Regular text - Semantic similarity search

    Args:
        query: The search query with optional quoted phrases
        vault_filter: Optional filter results to a specific vault name
        since_date: Optional start date in YYYY-MM-DD format (inclusive)
        until_date: Optional end date in YYYY-MM-DD format (inclusive)
        sort_by: Sort results by "relevance" (default), "date", "title", or "path"
        sort_order: Sort order "desc" (default) or "asc"
        limit: Maximum number of results to return (default: 10, max: 20)

    Note: Results include file size metadata to help determine if retrieving
    the full document is feasible. Files larger than 10MB are not indexed.
    Examples:
    - 'PKM "second brain"' - Semantic search for PKM containing "second brain"
    - 'meeting -"cancelled"' with since_date="2024-01-01" - Recent meeting notes
    - '"daily note"' with sort_by="date" - Daily notes sorted by modification date
    - 'python' with sort_by="title", sort_order="asc" - Python notes sorted A-Z
    """
    global chroma_collection

    # Cap limit at max
    limit = min(limit, MAX_SEARCH_LIMIT)

    # Parse the query for quoted phrases
    parsed = parse_contains_query(query)

    # Parse dates and convert to timestamps if provided
    since_timestamp = None
    until_timestamp = None

    try:
        if since_date:
            since_dt = datetime.strptime(since_date, "%Y-%m-%d")
            since_timestamp = int(since_dt.timestamp())
        if until_date:
            # Add 23:59:59 to include the entire day
            until_dt = datetime.strptime(until_date, "%Y-%m-%d") + timedelta(days=1) - timedelta(seconds=1)
            until_timestamp = int(until_dt.timestamp())
    except ValueError as e:
        return f"Invalid date format. Please use YYYY-MM-DD. Error: {e}"

    # Acquire lock for the entire ChromaDB operation
    with chroma_ref_lock:
        if not chroma_collection:
            return "Vector store not initialized. Please restart the server."

        # Build where clause for metadata filtering
        where_conditions = []

        # Add date range filters if provided
        if since_timestamp and until_timestamp:
            where_conditions.append({'modified': {'$gte': since_timestamp}})
            where_conditions.append({'modified': {'$lte': until_timestamp}})
        elif since_timestamp:
            where_conditions.append({'modified': {'$gte': since_timestamp}})
        elif until_timestamp:
            where_conditions.append({'modified': {'$lte': until_timestamp}})

        # Add vault filter if specified
        if vault_filter:
            where_conditions.append({'vault': vault_filter})

        # Combine metadata conditions
        where_clause = None
        if len(where_conditions) > 1:
            where_clause = {'$and': where_conditions}
        elif where_conditions:
            where_clause = where_conditions[0]

        # Build where_document clause for phrase filtering
        doc_conditions = []
        for phrase in parsed['required_phrases']:
            doc_conditions.append({"$contains": phrase})
        for phrase in parsed['excluded_phrases']:
            doc_conditions.append({"$not_contains": phrase})

        # Combine document conditions
        where_document = None
        if len(doc_conditions) > 1:
            where_document = {"$and": doc_conditions}
        elif doc_conditions:
            where_document = doc_conditions[0]

        # Execute search based on what we have
        if parsed['semantic_query']:
            # Semantic search with optional phrase and temporal filters
            results = chroma_collection.query(
                query_texts=[parsed['semantic_query']],
                n_results=limit,
                where=where_clause,
                where_document=where_document
            )
        elif where_document or where_clause:
            # Pure phrase/temporal search (no semantic component)
            results = chroma_collection.get(
                limit=limit,
                where=where_clause,
                where_document=where_document,
                include=['documents', 'metadatas']
            )
            # Restructure to match query() format
            if results['documents']:
                results = {
                    'documents': [results['documents']],
                    'metadatas': [results['metadatas']],
                    'distances': [[0.5] * len(results['documents'])]  # Default distance
                }
            else:
                results = {'documents': [[]], 'metadatas': [[]], 'distances': [[]]}
        else:
            # No query at all
            return "Please provide a search query."
        # Process and format the results
        formatted_results = []
        has_temporal_filter = since_date or until_date

        if results["documents"] and results["documents"][0]:
            # Collect all docs with metadata
            docs_with_metadata = []
            for i, doc in enumerate(results["documents"][0]):
                metadata = results["metadatas"][0][i] if results["metadatas"] else {}
                distance = results["distances"][0][i] if results["distances"] else None
                docs_with_metadata.append((doc, metadata, distance))

            # Apply sorting based on sort_by parameter (default to relevance)
            if sort_by or sort_order != "desc":
                # If sort parameters were explicitly set
                docs_with_metadata = _sort_results(docs_with_metadata, sort_by or "relevance", sort_order)
            elif not parsed['semantic_query']:
                # No semantic query means no relevance sorting needed
                # Default to date sorting for non-semantic queries
                docs_with_metadata = _sort_results(docs_with_metadata, "date", "desc")
            # else: Keep ChromaDB's relevance order (already sorted by distance)

            # Format the sorted results
            for doc, metadata, distance in docs_with_metadata:
                # Show date if temporal filter is used
                formatted_results.append(format_search_result(doc, metadata, distance, parsed, show_date=has_temporal_filter))

        if not formatted_results:
            error_parts = []
            if parsed['required_phrases']:
                # Join outside the f-string so the quoting also works on Python < 3.12
                quoted_phrases = ', '.join('"' + p + '"' for p in parsed['required_phrases'])
                error_parts.append(f"containing: {quoted_phrases}")
            if since_date or until_date:
                date_parts = []
                if since_date:
                    date_parts.append(f"since {since_date}")
                if until_date:
                    date_parts.append(f"until {until_date}")
                error_parts.append(" and ".join(date_parts))

            if error_parts:
                return f"No results found {' '.join(error_parts)}."
            return "No results found for your query."

        # Return results as a single formatted string
        return "\n\n---\n\n".join(formatted_results)


def _sort_results(docs_with_metadata: List[Tuple], sort_by: str, sort_order: str) -> List[Tuple]:
    """
    Sort search results based on the specified criteria.
    Args:
        docs_with_metadata: List of tuples (doc, metadata, distance)
        sort_by: Sort criteria ("relevance", "date", "title", "path")
        sort_order: Sort order ("asc" or "desc")

    Returns:
        Sorted list of tuples
    """
    # Define sort key functions for each type
    if sort_by == "date":
        # Sort by modification timestamp
        sort_key = lambda x: x[1].get('modified', 0)
    elif sort_by == "title":
        # Sort alphabetically by title (case-insensitive)
        sort_key = lambda x: x[1].get('title', '').lower()
    elif sort_by == "path":
        # Sort by relative path (source)
        sort_key = lambda x: x[1].get('source', '').lower()
    else:  # Default to "relevance"
        # Sort by distance (lower is better)
        # Note: When sorting by relevance, we want LOWER distances first (better matches)
        # So for desc order (default), we actually use ascending sort on distance
        sort_key = lambda x: x[2] if x[2] is not None else float('inf')

    # Apply sort
    reverse = (sort_order == "desc")

    # Special case for relevance: desc means best matches first (lowest distance)
    if sort_by == "relevance" or sort_by is None:
        reverse = not reverse  # Invert for relevance

    return sorted(docs_with_metadata, key=sort_key, reverse=reverse)


def format_size(bytes_size: int) -> str:
    """Format bytes to human readable string."""
    if bytes_size < 1024:
        return f"{bytes_size} bytes"
    elif bytes_size < 1024 * 1024:
        return f"{bytes_size / 1024:.1f} KB"
    else:
        return f"{bytes_size / (1024 * 1024):.1f} MB"


def format_relative_time(timestamp: int) -> Tuple[str, str]:
    """Format timestamp to date string and relative time."""
    dt = datetime.fromtimestamp(timestamp)
    date_str = dt.strftime("%Y-%m-%d %H:%M")

    now = datetime.now()
    diff = now - dt

    if diff.days == 0:
        relative = f"{diff.seconds // 3600} hours ago" if diff.seconds >= 3600 else f"{diff.seconds // 60} minutes ago"
    elif diff.days == 1:
        relative = "Yesterday"
    elif diff.days < 7:
        relative = f"{diff.days} days ago"
    elif diff.days < 30:
        relative = f"{diff.days // 7} weeks ago"
    else:
        relative = f"{diff.days // 30} months ago"

    return date_str, relative


def format_search_result(doc: str, metadata: Dict, distance: Optional[float], parsed: Dict, show_date: bool = False) -> str:
    """Helper function to format a single search result"""
    # Create preview and highlight matched phrases
    preview = doc[:PREVIEW_LENGTH]
    if len(doc) > PREVIEW_LENGTH:
        preview += '...'

    # Bold the required phrases in preview
    for phrase in parsed.get('required_phrases', []):
        pattern = re.compile(re.escape(phrase), re.IGNORECASE)
        preview = pattern.sub(lambda m: f"**{m.group()}**", preview)

    # Get full path
    full_path = get_full_path_from_metadata(metadata)

    # Build result text
    result_text = f"**{metadata.get('title', 'Untitled')}** ({metadata.get('vault', 'Unknown vault')})\n"
    result_text += f"Path: {full_path}\n"

    # Add file size if available
    file_size_bytes = metadata.get('file_size_bytes')
    if file_size_bytes:
        result_text += f"Size: {format_size(file_size_bytes)}\n"

    # Add date info if requested
    if show_date:
        modified_timestamp = metadata.get('modified', 0)
        if modified_timestamp:
            date_str, relative_time = format_relative_time(modified_timestamp)
            result_text += f"Modified: {date_str} ({relative_time})\n"

    result_text += f"Lines: {metadata.get('start_line', '?')}-{metadata.get('end_line', '?')}\n"

    if distance is not None:
        result_text += f"Score: {1 - distance if isinstance(distance, (int, float)) else 'N/A'}\n"

    result_text += f"\n{preview}"

    return result_text


@mcp.tool
async def reindex_vaults() -> str:
    """
    Manually trigger a re-index of all configured Obsidian vaults.
    This will:
    - Check all configured vaults for new, modified, or deleted files
    - Update the vector database with any changes
    - Return a summary of the indexing operation
    """
    global chroma_client, chroma_collection

    config = load_config()
    vaults = config.get("vaults", [])

    if not vaults:
        return "No vaults configured. Please configure vaults first using 'mcp-obsidian configure'."

    try:
        # Run re-indexing
        start_time = time.time()

        # Run the update in a thread to avoid blocking the async context
        loop = asyncio.get_event_loop()
        new_client, new_collection = await loop.run_in_executor(
            None, initialize_vector_store, vaults
        )

        # Update global references atomically
        with chroma_ref_lock:
            chroma_client = new_client
            chroma_collection = new_collection

        elapsed_time = time.time() - start_time

        # Get statistics
        total_docs = new_collection.count()

        return (
            f"✅ Manual re-indexing completed successfully!\n\n"
            f"📊 Statistics:\n"
            f"• Vaults indexed: {len(vaults)}\n"
            f"• Total documents in database: {total_docs}\n"
            f"• Time taken: {elapsed_time:.2f} seconds\n\n"
            f"The vector store has been updated with the latest changes from your Obsidian vaults."
        )

    except Exception as e:
        return f"❌ Re-indexing failed with error: {str(e)}\n\nPlease check your vault configurations and try again."


class VaultChangeHandler(FileSystemEventHandler):
    """Handle file system changes in Obsidian vaults"""

    def __init__(self, vaults: List[Dict], update_callback):
        self.vaults = vaults
        self.update_callback = update_callback
        self.last_update = 0
        self.pending_update = False
        self.debounce_seconds = MONITOR_DEBOUNCE_SECONDS

    def on_any_event(self, event: FileSystemEvent):
        """Handle any file system event"""
        # Only care about markdown files
        if not event.src_path.endswith('.md'):
            return

        # Skip hidden files/directories
        if any(part.startswith('.') for part in Path(event.src_path).parts):
            return

        # Mark that we have a pending update
        self.pending_update = True
        self.last_update = time.time()

        # Log the change
        event_type = event.event_type
        file_name = Path(event.src_path).name
        print(f"\n📝 Detected {event_type}: {file_name}", file=sys.stderr)


async def monitor_vaults(vaults: List[Dict], update_interval: int = MONITOR_UPDATE_INTERVAL):
    """Monitor vaults for changes and trigger re-indexing"""
    global chroma_client, chroma_collection

    # Create file system event handler
    handler = VaultChangeHandler(vaults, lambda: initialize_vector_store(vaults))

    # Set up observers for each vault
    observers = []
    for vault in vaults:
        vault_path = Path(vault["path"])
        if vault_path.exists():
            observer = Observer()
            observer.schedule(handler, str(vault_path), recursive=True)
            observer.start()
            observers.append(observer)
            print(f"👁️ Monitoring vault: {vault['name']}", file=sys.stderr)

    try:
        while True:
            await asyncio.sleep(update_interval)

            # Check if we have pending updates and enough time has passed
            if handler.pending_update:
                time_since_last = time.time() - handler.last_update

                if time_since_last >= handler.debounce_seconds:
                    print(f"\n🔄 Re-indexing vaults after changes...", file=sys.stderr)
                    handler.pending_update = False

                    # Run the update in a thread to avoid blocking
                    loop = asyncio.get_event_loop()
                    new_client, new_collection = await loop.run_in_executor(
                        None, initialize_vector_store, vaults
                    )

                    # Update global references atomically
                    with chroma_ref_lock:
                        chroma_client = new_client
                        chroma_collection = new_collection
Ready for queries.") except asyncio.CancelledError: # Clean shutdown for observer in observers: observer.stop() observer.join() raise def serve(): """Run the MCP server.""" global chroma_client, chroma_collection config = load_config() vaults = config.get("vaults", []) if not vaults: print("No vaults configured. Run with 'configure' first.", file=sys.stderr) sys.exit(1) print("🚀 Starting MCP Obsidian server...", file=sys.stderr) # Initialize vector store new_client, new_collection = initialize_vector_store(vaults) # Update global references atomically with chroma_ref_lock: chroma_client = new_client chroma_collection = new_collection print("\n📡 MCP server ready!", file=sys.stderr) print("Vector store initialized and ready for queries.", file=sys.stderr) # Start the file system monitor in a background thread # Since FastMCP doesn't support startup/shutdown hooks, we'll use a simpler approach import threading def start_monitor(): """Start monitoring in a separate thread""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(monitor_vaults(vaults)) except KeyboardInterrupt: pass # Start monitoring in a background thread monitor_thread = threading.Thread(target=start_monitor, daemon=True) monitor_thread.start() print("👁️ File system monitoring started", file=sys.stderr) # Run the FastMCP server (it handles its own event loop) mcp.run() def index_vaults(force=False): """Rebuild search index for all configured vaults""" global chroma_client, chroma_collection config = load_config() vaults = config.get("vaults", []) if not vaults: print("No vaults configured. Run 'mcp-obsidian configure' first.", file=sys.stderr) sys.exit(1) print("🔄 Rebuilding search index for all configured vaults...", file=sys.stderr) print(f"Found {len(vaults)} vault(s) to index.\n", file=sys.stderr) # Initialize vector store with force flag (handles locking internally) new_client, new_collection = initialize_vector_store(vaults, force_rebuild=force) # Update global references atomically with chroma_ref_lock: chroma_client = new_client chroma_collection = new_collection # Show results if new_collection: doc_count = new_collection.count() Log.success("\nIndex rebuilt successfully!") print(f"📊 Total documents indexed: {doc_count}", file=sys.stderr) # Show vaults that were indexed print("\n📁 Vaults indexed:", file=sys.stderr) for vault in vaults: print(f" - {vault['name']} ({vault['path']})", file=sys.stderr) else: Log.error("Failed to rebuild index.") sys.exit(1) def load_config(): """Load configuration""" if not CONFIG_FILE.exists(): return {"vaults": []} with open(CONFIG_FILE, 'r') as f: return json.load(f) def main(): """Main entry point for the MCP Obsidian server.""" parser = argparse.ArgumentParser(description="MCP Obsidian Server") subparsers = parser.add_subparsers(dest="command", help="Commands") subparsers.add_parser("configure", help="Configure vault paths") index_parser = subparsers.add_parser("index", help="Rebuild search index for all configured vaults") index_parser.add_argument("-f", "--force", action="store_true", help="Force complete rebuild by clearing existing index") # Parse arguments args = parser.parse_args() commands = { "configure": configure, "index": lambda: index_vaults(args.force), None: serve } commands.get(args.command, serve)() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/alexhholmes/mcp-obsidian'

If you have feedback or need assistance with the MCP directory API, please join our Discord server