Skip to main content
Glama

MCP Obsidian

by alexhholmes
mcp_obsidian.py33.3 kB
#!/usr/bin/env python3 import argparse import json import sys import hashlib import os import asyncio from pathlib import Path from typing import List, Dict, Tuple, Optional import time from datetime import datetime, timedelta # Disable ChromaDB telemetry before importing to prevent errors os.environ["ANONYMIZED_TELEMETRY"] = "False" import questionary from questionary import Style import chromadb from chromadb.config import Settings from langchain_text_splitters import RecursiveCharacterTextSplitter from fastmcp import FastMCP from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler, FileSystemEvent CONFIG_DIR = Path.home() / ".mcp-obsidian" CONFIG_FILE = CONFIG_DIR / "config.json" # Create FastMCP server instance mcp = FastMCP("mcp-obsidian") # Global ChromaDB client and collection chroma_client: Optional[chromadb.Client] = None chroma_collection: Optional[chromadb.Collection] = None # Lock for updating global ChromaDB references atomically import threading chroma_ref_lock = threading.Lock() custom_style = Style([ ('qmark', 'fg:#673ab7 bold'), ('question', 'bold'), ('answer', 'fg:#f44336 bold'), ('pointer', 'fg:#673ab7 bold'), ('highlighted', 'fg:#673ab7 bold'), ('selected', 'fg:#cc5454'), ('separator', 'fg:#cc5454'), ('instruction', ''), ('text', ''), ('disabled', 'fg:#858585 italic') ]) def add_vault_path(): """Add a new vault path to the configuration""" existing_config = load_config() vaults = existing_config.get("vaults", []) existing_paths = {v["path"] for v in vaults} while True: # Ask for vault path with path autocomplete vault_path = questionary.path( "Enter Obsidian vault path:", only_directories=True, style=custom_style ).ask() if not vault_path: break path = Path(vault_path).expanduser().resolve() # Check if already configured if str(path) in existing_paths: print(f"⚠️ Vault already configured: {path}\n", file=sys.stderr) continue # Validate path exists if not path.exists(): if not questionary.confirm( f"Path does not exist: {path}\nAdd anyway?", default=False, style=custom_style ).ask(): continue # Ask for vault name vault_name = questionary.text( "Vault name:", default=path.name, style=custom_style ).ask() vaults.append({ "name": vault_name, "path": str(path) }) existing_paths.add(str(path)) print(f"✅ Added: {vault_name} → {path}\n", file=sys.stderr) if not questionary.confirm( "Add another vault?", default=False, style=custom_style ).ask(): break if vaults: save_config(vaults) print(f"\n📚 Total configured vaults: {len(vaults)}", file=sys.stderr) def list_vault_paths(): """List all configured vault paths""" config = load_config() vaults = config.get("vaults", []) if not vaults: print("📭 No vaults configured yet.", file=sys.stderr) print("Use 'Add Vault Path' to configure your first vault.", file=sys.stderr) return print(f"📚 Configured Vaults ({len(vaults)}):\n", file=sys.stderr) for i, vault in enumerate(vaults, 1): print(f" {i}. {vault['name']}", file=sys.stderr) print(f" 📁 {vault['path']}", file=sys.stderr) if i < len(vaults): print(file=sys.stderr) def remove_vault_path(): """Remove a vault path from the configuration""" config = load_config() vaults = config.get("vaults", []) if not vaults: print("📭 No vaults configured yet.", file=sys.stderr) print("Use 'Add Vault Path' to configure your first vault.", file=sys.stderr) return print(f"📚 Select vault to remove:\n", file=sys.stderr) # Create choices list with vault information choices = [] for i, vault in enumerate(vaults, 1): choice_text = f"{vault['name']} → {vault['path']}" choices.append({"name": choice_text, "value": i - 1}) # Add cancel option choices.append({"name": "Cancel", "value": -1}) selected = questionary.select( "Which vault would you like to remove?", choices=choices, style=custom_style ).ask() if selected == -1 or selected is None: print("Removal cancelled.", file=sys.stderr) return # Get the vault to remove vault_to_remove = vaults[selected] # Confirm deletion if questionary.confirm( f"Are you sure you want to remove '{vault_to_remove['name']}'?", default=False, style=custom_style ).ask(): # Remove the vault vaults.pop(selected) # Save updated configuration CONFIG_DIR.mkdir(parents=True, exist_ok=True) config = { "vaults": vaults, "version": "1.0" } with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) print(f"✅ Removed: {vault_to_remove['name']} → {vault_to_remove['path']}", file=sys.stderr) print(f"💾 Configuration updated: {CONFIG_FILE}", file=sys.stderr) else: print("Removal cancelled.", file=sys.stderr) def save_config(vaults): """Save the configuration to file""" CONFIG_DIR.mkdir(parents=True, exist_ok=True) config = { "vaults": vaults, "version": "1.0" } with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) print(f"💾 Configuration saved to: {CONFIG_FILE}", file=sys.stderr) def configure(): """Configure Obsidian vault paths""" print("🗂️ MCP Obsidian Configuration\n", file=sys.stderr) while True: try: choice = questionary.select( "What would you like to do?", choices=[ {"name": "Add Vault Path", "value": "1", "shortcut_key": "1"}, {"name": "List Vault Paths", "value": "2", "shortcut_key": "2"}, {"name": "Remove Vault Path", "value": "3", "shortcut_key": "3"}, {"name": "Exit (^C)", "value": "4", "shortcut_key": "4"} ], style=custom_style, use_shortcuts=True, use_arrow_keys=True, show_selected=True, use_jk_keys=False ).ask() except KeyboardInterrupt: sys.exit(0) # Exit silently on Ctrl+C if choice == "1": add_vault_path() elif choice == "2": list_vault_paths() elif choice == "3": remove_vault_path() elif choice == "4" or choice is None: break print(file=sys.stderr) # Add spacing between operations def get_markdown_files(vault_path: str) -> List[Path]: """Recursively get all markdown files from a vault""" vault = Path(vault_path) if not vault.exists(): return [] markdown_files = [] # Use rglob for recursive search for file_path in vault.rglob("*.md"): # Skip hidden directories and files if any(part.startswith('.') for part in file_path.parts): continue markdown_files.append(file_path) return markdown_files def get_line_boundaries(text: str, chunk_start: int, chunk_end: int) -> Tuple[int, int]: """Calculate line numbers for a chunk within the original text""" lines_before_chunk = text[:chunk_start].count('\n') lines_in_chunk = text[chunk_start:chunk_end].count('\n') start_line = lines_before_chunk + 1 end_line = start_line + lines_in_chunk return start_line, end_line def chunk_markdown_content(file_path: Path, vault_path: Path, vault_name: str) -> List[Dict]: """Read and chunk a markdown file with enhanced metadata""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Get file metadata file_stats = os.stat(file_path) modified_time = int(file_stats.st_mtime) # Create file content hash for change detection file_hash = hashlib.md5(content.encode()).hexdigest() # Get relative path from vault root relative_path = file_path.relative_to(vault_path) # Get file stem (filename without extension) title = file_path.stem # Create text splitter for markdown text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, separators=["\n\n", "\n", ". ", " ", ""], keep_separator=False ) # Split the content and track positions chunks = text_splitter.create_documents([content]) # Create document chunks with metadata documents = [] current_position = 0 for i, chunk in enumerate(chunks): chunk_text = chunk.page_content # Find the position of this chunk in the original content chunk_start = content.find(chunk_text, current_position) chunk_end = chunk_start + len(chunk_text) current_position = chunk_end # Get line boundaries start_line, end_line = get_line_boundaries(content, chunk_start, chunk_end) doc_id = hashlib.md5(f"{file_path}_{i}".encode()).hexdigest() documents.append({ "id": doc_id, "content": chunk_text, "metadata": { "title": title, "source": str(relative_path), "vault": vault_name, "modified": modified_time, "file_hash": file_hash, # Add hash for change detection "start_line": start_line, "end_line": end_line, "file_path": str(file_path), # Keep absolute path for reference "chunk_index": i, "total_chunks": len(chunks) } }) return documents except Exception as e: print(f"Error processing {file_path}: {e}", file=sys.stderr) return [] def get_files_to_update(collection: chromadb.Collection, vault_files: List[Path]) -> Tuple[List[Path], List[str]]: """Determine which files need to be updated based on modification time""" files_to_update = [] ids_to_delete = [] # Get all existing documents metadata try: existing_docs = collection.get(include=['metadatas']) if not existing_docs['ids']: # Empty collection, all files need indexing return vault_files, [] except: # Collection doesn't exist or is empty return vault_files, [] # Create a map of file paths to their metadata existing_files = {} file_to_ids = {} # Map file paths to their document IDs for i, doc_id in enumerate(existing_docs['ids']): metadata = existing_docs['metadatas'][i] file_path = metadata.get('file_path') if file_path: if file_path not in file_to_ids: file_to_ids[file_path] = [] file_to_ids[file_path].append(doc_id) if file_path not in existing_files: existing_files[file_path] = metadata # Check each file in the vault vault_file_paths = set(str(f) for f in vault_files) for file_path in vault_files: file_path_str = str(file_path) if file_path_str not in existing_files: # New file, needs to be indexed files_to_update.append(file_path) else: # Check if file has been modified try: file_stats = os.stat(file_path) current_mtime = int(file_stats.st_mtime) stored_mtime = existing_files[file_path_str].get('modified', 0) if current_mtime > stored_mtime: # File has been modified files_to_update.append(file_path) # Mark old chunks for deletion if file_path_str in file_to_ids: ids_to_delete.extend(file_to_ids[file_path_str]) except FileNotFoundError: # File no longer exists, will be handled below pass # Find files that exist in DB but not on disk (deleted files) for file_path_str in existing_files: if file_path_str not in vault_file_paths: # File has been deleted, remove its chunks if file_path_str in file_to_ids: ids_to_delete.extend(file_to_ids[file_path_str]) return files_to_update, ids_to_delete def initialize_vector_store(vaults: List[Dict]) -> Tuple[chromadb.Client, chromadb.Collection]: """Initialize ChromaDB and incrementally index vault content""" print("🔄 Initializing vector store...", file=sys.stderr) # Disable ChromaDB telemetry to avoid errors os.environ["ANONYMIZED_TELEMETRY"] = "False" # Initialize ChromaDB with persistent storage db_path = CONFIG_DIR / "chroma_db" db_path.mkdir(parents=True, exist_ok=True) client = chromadb.PersistentClient( path=str(db_path), settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) # Get or create collection (don't delete existing) collection = client.get_or_create_collection( name="obsidian_notes", metadata={"description": "Obsidian vault notes"} ) print(f"📚 Checking {len(vaults)} vault(s) for updates...", file=sys.stderr) total_files_updated = 0 total_files_deleted = 0 total_chunks_added = 0 all_ids_to_delete = [] for vault in vaults: vault_path = Path(vault["path"]) vault_name = vault["name"] print(f"\n🗂️ Checking vault: {vault_name}", file=sys.stderr) print(f" Path: {vault_path}", file=sys.stderr) # Get all markdown files recursively markdown_files = get_markdown_files(str(vault_path)) print(f" Found {len(markdown_files)} markdown files", file=sys.stderr) # Determine which files need updating files_to_update, ids_to_delete = get_files_to_update(collection, markdown_files) all_ids_to_delete.extend(ids_to_delete) if not files_to_update and not ids_to_delete: print(f" ✅ Vault is up to date", file=sys.stderr) continue if ids_to_delete: print(f" Removing {len(set(f.split('_')[0] for f in ids_to_delete if '_' in f))} outdated/deleted files", file=sys.stderr) total_files_deleted += len(set(f.split('_')[0] for f in ids_to_delete if '_' in f)) if files_to_update: print(f" Updating {len(files_to_update)} changed/new files", file=sys.stderr) # Process files that need updating for idx, file_path in enumerate(files_to_update, 1): print(f" Processing file {idx}/{len(files_to_update)}: {file_path.name}", end="\r", file=sys.stderr) documents = chunk_markdown_content(file_path, vault_path, vault_name) if documents: # Prepare data for ChromaDB ids = [doc["id"] for doc in documents] contents = [doc["content"] for doc in documents] metadatas = [doc["metadata"] for doc in documents] # Add to collection collection.add( ids=ids, documents=contents, metadatas=metadatas ) total_files_updated += 1 total_chunks_added += len(documents) # Clear the progress line and show completion print(f" ✅ Updated {len(files_to_update)} files from {vault_name} ", file=sys.stderr) # Delete outdated chunks if any if all_ids_to_delete: print(f"\n🗑️ Removing {len(all_ids_to_delete)} outdated chunks...", file=sys.stderr) try: collection.delete(ids=all_ids_to_delete) except Exception as e: print(f" Warning: Could not delete some chunks: {e}", file=sys.stderr) total_docs = collection.count() print(f"\n✅ Indexing complete!", file=sys.stderr) print(f" Files updated: {total_files_updated}", file=sys.stderr) print(f" Files removed: {total_files_deleted}", file=sys.stderr) print(f" Chunks added: {total_chunks_added}", file=sys.stderr) print(f" Total database size: {total_docs} documents", file=sys.stderr) return client, collection @mcp.tool async def semantic_search( query: str, vault_filter: Optional[str] = None, limit: int = 10 ) -> str: """ Search Obsidian vault notes using semantic similarity. Args: query: The search query to find similar content vault_filter: Optional filter results to a specific vault name limit: Maximum number of results to return (default: 10, max: 20) """ global chroma_collection if not chroma_collection: return "Vector store not initialized. Please restart the server." # Cap limit at 20 limit = min(limit, 20) # Build where clause for filtering where_clause = None if vault_filter: where_clause = {"vault": vault_filter} # Query the collection results = chroma_collection.query( query_texts=[query], n_results=limit, where=where_clause ) # Format the results formatted_results = [] if results["documents"] and results["documents"][0]: for i, doc in enumerate(results["documents"][0]): metadata = results["metadatas"][0][i] if results["metadatas"] else {} distance = results["distances"][0][i] if results["distances"] else None result_text = ( f"**{metadata.get('title', 'Untitled')}** ({metadata.get('vault', 'Unknown vault')})\n" f"Source: {metadata.get('source', 'Unknown')}\n" f"Lines: {metadata.get('start_line', '?')}-{metadata.get('end_line', '?')}\n" f"Score: {1 - distance if distance else 'N/A'}\n\n" f"{doc[:500]}{'...' if len(doc) > 500 else ''}" ) formatted_results.append(result_text) if not formatted_results: return "No results found for your query." # Return results as a single formatted string return "\n\n---\n\n".join(formatted_results) if formatted_results else "No results found." @mcp.tool async def temporal_search( since_date: Optional[str] = None, until_date: Optional[str] = None, query: Optional[str] = None, vault_filter: Optional[str] = None, limit: int = 10 ) -> str: """ Search Obsidian vault notes based on modification dates with optional semantic search. Args: since_date: Start date in YYYY-MM-DD format (inclusive) until_date: End date in YYYY-MM-DD format (inclusive) query: Optional semantic search query to filter results within date range vault_filter: Optional filter results to a specific vault name limit: Maximum number of results to return (default: 10, max: 20) Examples: - Find recently modified notes: since_date="2024-01-01" - Find notes from a specific period: since_date="2024-01-01", until_date="2024-01-31" - Find notes about "LSM trees" modified this month: query="LSM trees", since_date="2024-01-01" """ if not chroma_collection: return "Vector store not initialized. Please restart the server." # Cap limit at 20 limit = min(limit, 20) # Parse dates and convert to timestamps since_timestamp = None until_timestamp = None try: if since_date: since_dt = datetime.strptime(since_date, "%Y-%m-%d") since_timestamp = int(since_dt.timestamp()) if until_date: # Add 23:59:59 to include the entire day until_dt = datetime.strptime(until_date, "%Y-%m-%d") + timedelta(days=1) - timedelta(seconds=1) until_timestamp = int(until_dt.timestamp()) except ValueError as e: return f"Invalid date format. Please use YYYY-MM-DD. Error: {e}" # Build where clause where_conditions = [] # Add date range filters if since_timestamp and until_timestamp: where_conditions.append({'modified': {'$gte': since_timestamp}}) where_conditions.append({'modified': {'$lte': until_timestamp}}) elif since_timestamp: where_conditions.append({'modified': {'$gte': since_timestamp}}) elif until_timestamp: where_conditions.append({'modified': {'$lte': until_timestamp}}) # Add vault filter if specified if vault_filter: where_conditions.append({'vault': vault_filter}) # Combine conditions where_clause = None if len(where_conditions) > 1: where_clause = {'$and': where_conditions} elif where_conditions: where_clause = where_conditions[0] # Execute query if query: # Semantic search within date range results = chroma_collection.query( query_texts=[query], n_results=limit, where=where_clause ) else: # Get all documents in date range (no semantic search) results = chroma_collection.get( limit=limit, where=where_clause, include=['documents', 'metadatas'] ) # Restructure to match query() format if results['documents']: results = { 'documents': [results['documents']], 'metadatas': [results['metadatas']], 'distances': [[None] * len(results['documents'])] # No distances for get() } else: results = {'documents': [[]], 'metadatas': [[]], 'distances': [[]]} # Format the results with enhanced date information formatted_results = [] now = datetime.now() if results["documents"] and results["documents"][0]: # Sort by modification time (most recent first) docs_with_metadata = [] for i, doc in enumerate(results["documents"][0]): metadata = results["metadatas"][0][i] if results["metadatas"] else {} distance = results["distances"][0][i] if results["distances"] and results["distances"][0] else None docs_with_metadata.append((doc, metadata, distance)) # Sort by modified timestamp (most recent first) docs_with_metadata.sort(key=lambda x: x[1].get('modified', 0), reverse=True) for doc, metadata, distance in docs_with_metadata: # Format modification date modified_timestamp = metadata.get('modified', 0) if modified_timestamp: modified_dt = datetime.fromtimestamp(modified_timestamp) modified_str = modified_dt.strftime("%Y-%m-%d %H:%M") # Calculate relative time time_diff = now - modified_dt if time_diff.days == 0: if time_diff.seconds < 3600: relative_time = f"{time_diff.seconds // 60} minutes ago" else: relative_time = f"{time_diff.seconds // 3600} hours ago" elif time_diff.days == 1: relative_time = "Yesterday" elif time_diff.days < 7: relative_time = f"{time_diff.days} days ago" elif time_diff.days < 30: relative_time = f"{time_diff.days // 7} weeks ago" else: relative_time = f"{time_diff.days // 30} months ago" else: modified_str = "Unknown" relative_time = "" result_text = ( f"**{metadata.get('title', 'Untitled')}** ({metadata.get('vault', 'Unknown vault')})\n" f"Modified: {modified_str} ({relative_time})\n" f"Source: {metadata.get('source', 'Unknown')}\n" f"Lines: {metadata.get('start_line', '?')}-{metadata.get('end_line', '?')}\n" ) if distance is not None: result_text += f"Score: {1 - distance}\n" result_text += f"\n{doc[:500]}{'...' if len(doc) > 500 else ''}" formatted_results.append(result_text) if not formatted_results: date_desc = [] if since_date: date_desc.append(f"since {since_date}") if until_date: date_desc.append(f"until {until_date}") date_str = " and ".join(date_desc) if date_desc else "" if query: return f"No results found for '{query}' {date_str}.".strip() else: return f"No documents found {date_str}.".strip() if date_str else "No documents found." # Return results as a single formatted string return "\n\n---\n\n".join(formatted_results) @mcp.tool async def reindex_vaults() -> str: """ Manually trigger a re-index of all configured Obsidian vaults. This will: - Check all configured vaults for new, modified, or deleted files - Update the vector database with any changes - Return a summary of the indexing operation """ global chroma_client, chroma_collection config = load_config() vaults = config.get("vaults", []) if not vaults: return "No vaults configured. Please configure vaults first using 'mcp-obsidian configure'." try: # Run re-indexing start_time = time.time() # Run the update in a thread to avoid blocking the async context loop = asyncio.get_event_loop() new_client, new_collection = await loop.run_in_executor( None, initialize_vector_store, vaults ) # Update global references atomically with chroma_ref_lock: chroma_client = new_client chroma_collection = new_collection elapsed_time = time.time() - start_time # Get statistics total_docs = new_collection.count() return ( f"✅ Manual re-indexing completed successfully!\n\n" f"📊 Statistics:\n" f"• Vaults indexed: {len(vaults)}\n" f"• Total documents in database: {total_docs}\n" f"• Time taken: {elapsed_time:.2f} seconds\n\n" f"The vector store has been updated with the latest changes from your Obsidian vaults." ) except Exception as e: return f"❌ Re-indexing failed with error: {str(e)}\n\nPlease check your vault configurations and try again." class VaultChangeHandler(FileSystemEventHandler): """Handle file system changes in Obsidian vaults""" def __init__(self, vaults: List[Dict], update_callback): self.vaults = vaults self.update_callback = update_callback self.last_update = 0 self.pending_update = False self.debounce_seconds = 30 # Wait 30 seconds after last change before updating def on_any_event(self, event: FileSystemEvent): """Handle any file system event""" # Only care about markdown files if not event.src_path.endswith('.md'): return # Skip hidden files/directories if any(part.startswith('.') for part in Path(event.src_path).parts): return # Mark that we have a pending update self.pending_update = True self.last_update = time.time() # Log the change event_type = event.event_type file_name = Path(event.src_path).name print(f"\n📝 Detected {event_type}: {file_name}", file=sys.stderr) async def monitor_vaults(vaults: List[Dict], update_interval: int = 10): """Monitor vaults for changes and trigger re-indexing""" global chroma_client, chroma_collection # Create file system event handler handler = VaultChangeHandler(vaults, lambda: initialize_vector_store(vaults)) # Set up observers for each vault observers = [] for vault in vaults: vault_path = Path(vault["path"]) if vault_path.exists(): observer = Observer() observer.schedule(handler, str(vault_path), recursive=True) observer.start() observers.append(observer) print(f"👁️ Monitoring vault: {vault['name']}", file=sys.stderr) try: while True: await asyncio.sleep(update_interval) # Check if we have pending updates and enough time has passed if handler.pending_update: time_since_last = time.time() - handler.last_update if time_since_last >= handler.debounce_seconds: print(f"\n🔄 Re-indexing vaults after changes...", file=sys.stderr) handler.pending_update = False # Run the update in a thread to avoid blocking loop = asyncio.get_event_loop() new_client, new_collection = await loop.run_in_executor( None, initialize_vector_store, vaults ) # Update global references atomically with chroma_ref_lock: chroma_client = new_client chroma_collection = new_collection print("✅ Re-indexing complete! Ready for queries.", file=sys.stderr) except asyncio.CancelledError: # Clean shutdown for observer in observers: observer.stop() observer.join() raise def serve(): """Run the MCP server.""" global chroma_client, chroma_collection config = load_config() vaults = config.get("vaults", []) if not vaults: print("No vaults configured. Run with 'configure' first.", file=sys.stderr) sys.exit(1) print("🚀 Starting MCP Obsidian server...", file=sys.stderr) # Initialize vector store chroma_client, chroma_collection = initialize_vector_store(vaults) print("\n📡 MCP server ready!", file=sys.stderr) print("Vector store initialized and ready for queries.", file=sys.stderr) # Start the file system monitor in a background thread # Since FastMCP doesn't support startup/shutdown hooks, we'll use a simpler approach import threading def start_monitor(): """Start monitoring in a separate thread""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(monitor_vaults(vaults)) except KeyboardInterrupt: pass # Start monitoring in a background thread monitor_thread = threading.Thread(target=start_monitor, daemon=True) monitor_thread.start() print("👁️ File system monitoring started", file=sys.stderr) # Run the FastMCP server (it handles its own event loop) mcp.run() def index_vaults(): """Rebuild search index for all configured vaults""" global chroma_client, chroma_collection config = load_config() vaults = config.get("vaults", []) if not vaults: print("No vaults configured. Run 'mcp-obsidian configure' first.", file=sys.stderr) sys.exit(1) print("🔄 Rebuilding search index for all configured vaults...", file=sys.stderr) print(f"Found {len(vaults)} vault(s) to index.\n", file=sys.stderr) # Initialize vector store (this will rebuild the entire index) chroma_client, chroma_collection = initialize_vector_store(vaults) # Show results if chroma_collection: doc_count = chroma_collection.count() print(f"\n✅ Index rebuilt successfully!", file=sys.stderr) print(f"📊 Total documents indexed: {doc_count}", file=sys.stderr) # Show vaults that were indexed print("\n📁 Vaults indexed:", file=sys.stderr) for vault in vaults: print(f" - {vault['name']} ({vault['path']})", file=sys.stderr) else: print("❌ Failed to rebuild index.", file=sys.stderr) sys.exit(1) def load_config(): """Load configuration""" if not CONFIG_FILE.exists(): return {"vaults": []} with open(CONFIG_FILE, 'r') as f: return json.load(f) def main(): """Main entry point for the MCP Obsidian server.""" parser = argparse.ArgumentParser(description="MCP Obsidian Server") subparsers = parser.add_subparsers(dest="command", help="Commands") # Configure subcommand subparsers.add_parser("configure", help="Configure vault paths") # Index subcommand subparsers.add_parser("index", help="Rebuild search index for all configured vaults") # Parse arguments args = parser.parse_args() if args.command == "configure": configure() elif args.command == "index": index_vaults() else: # Default to server mode serve() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/alexhholmes/mcp-obsidian'

If you have feedback or need assistance with the MCP directory API, please join our Discord server