Personal Assistant MCP Server

by lemannrus
obsidian.py (26.5 kB)
import errno
import json
from pathlib import Path
from typing import List, Optional, Tuple

import numpy as np
from obsidiantools.api import Vault
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import GPT2TokenizerFast

from config import (
    CHUNK_SIZE_TOKENS,
    EMBEDDINGS_PATH,
    MODEL_TOKEN_LIMIT,
    OBSIDIAN_DEFAULT_FOLDER,
    OBSIDIAN_VAULT_PATH,
    SEMANTIC_SEARCH_ENABLED,
    SIMILARITY_THRESHOLD,
)
from services.logger import logger

_cached_vault: Optional[Vault] = None
_cached_embeddings: Optional[np.ndarray] = None
_cached_paths: Optional[List[str]] = None
_semantic_model: Optional[SentenceTransformer] = None
_tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")  # Keep tokenizer global


def _get_vault() -> Vault:
    """
    Initializes and returns a cached ObsidianVault instance.

    Reads OBSIDIAN_VAULT_PATH from config.

    Returns:
        An initialized ObsidianVault instance.

    Raises:
        FileNotFoundError: If the vault path is not a valid directory.
        ImportError: If obsidiantools is not installed.
    """
    global _cached_vault
    try:
        if _cached_vault is None:
            if not OBSIDIAN_VAULT_PATH:
                logger.error("OBSIDIAN_VAULT_PATH is not set in config.")
                raise ValueError("OBSIDIAN_VAULT_PATH is not configured.")
            vault_path = Path(OBSIDIAN_VAULT_PATH)
            if not vault_path.is_dir():
                logger.error(f"Obsidian vault path is not a valid directory: {vault_path}")
                raise FileNotFoundError(f"Obsidian vault not found at {vault_path}")
            try:
                logger.info(f"Initializing ObsidianVault at: {vault_path}")
                # safe_mode=True can prevent execution of dataview/js, often safer for API usage
                _cached_vault = Vault(vault_path)
                logger.info("ObsidianVault initialized.")
            except ImportError:
                logger.error("obsidiantools library not found. Please install it: pip install obsidiantools")
                raise
            except Exception as e:
                logger.error(f"Failed to initialize ObsidianVault: {e}", exc_info=True)
                raise RuntimeError(f"Failed to initialize ObsidianVault: {e}") from e
        return _cached_vault
    except Exception as e:
        logger.error(f"Failed to get ObsidianVault: {e}", exc_info=True)
        raise RuntimeError(f"Failed to get ObsidianVault: {e}") from e


def resolve_note_path(
    title: str,
    folder_name: Optional[str] = None,
    ensure_exists: bool = False,
) -> Tuple[Optional[Path], str]:
    """
    Resolves the absolute path for a note and its display folder name.

    Handles finding existing notes or determining the path for creation.
    Uses vault.md_file_index for efficient lookup of existing notes.

    Args:
        title: The title of the note (filename without extension).
        folder_name: Optional subfolder name within the vault.
        ensure_exists: If True, only returns a path if the note *file* already
            exists. If False, returns the intended path even if it doesn't exist.

    Returns:
        Tuple (note_path, display_folder):
        - note_path: Path object to the note file, or None if not found
          when ensure_exists=True.
        - display_folder: String name of the folder for display purposes.

    Raises:
        ValueError: If title is empty or vault is invalid.
    """
    try:
        vault = _get_vault()
        if not title:
            raise ValueError("Note title cannot be empty.")
        if not isinstance(vault, Vault):
            raise TypeError("Invalid vault object provided.")

        filename = f"{title}.md"
        target_dir: Path
        display_folder: str
        logger.debug(f"Resolving path for title='{title}', folder='{folder_name}', ensure_exists={ensure_exists}")

        if folder_name:
            relative_folder_path = Path(folder_name.strip('/\\'))
            target_dir = vault.dirpath / relative_folder_path
            display_folder = relative_folder_path.as_posix()
        else:
            target_dir = vault.dirpath
            display_folder = OBSIDIAN_DEFAULT_FOLDER

        intended_path = target_dir / filename

        if intended_path.exists() and intended_path.is_file():
            logger.debug(f"Note found at specific path: {intended_path}")
            note_exists_in_index = title in vault.md_file_index
            indexed_path = vault.md_file_index.get(title)
            if note_exists_in_index and indexed_path == intended_path:
                return intended_path, display_folder
            else:
                logger.warning(f"Path {intended_path} exists but is not in vault.md_file_index. Treating as found.")
                return intended_path, display_folder

        if folder_name and ensure_exists:
            logger.debug(f"Note not found at specific path '{intended_path}' and ensure_exists=True.")
            return None, display_folder

        logger.debug(f"Note not at '{intended_path}', searching known markdown notes globally.")
        found_globally: Optional[Path] = None
        for note_path in vault.md_file_index.values():
            if note_path.name == filename:
                found_globally = note_path
                logger.debug(f"Found matching filename in vault.md_file_index: {found_globally}")
                break

        if found_globally:
            actual_path = found_globally
            try:
                actual_relative_path = actual_path.parent.relative_to(vault.dirpath)
                actual_display_folder = actual_relative_path.as_posix()
                if actual_display_folder == '.':
                    actual_display_folder = "vault root"
            except ValueError:
                logger.warning(f"Found path {actual_path} is outside vault root {vault.dirpath}?")
                actual_display_folder = "[external?]"
            logger.debug(f"Note found globally at: {actual_path} (display folder: '{actual_display_folder}')")
            return actual_path, actual_display_folder
        else:
            logger.debug(f"Note '{filename}' not found anywhere in the vault's known markdown notes.")
            if ensure_exists:
                return None, display_folder
            else:
                logger.debug(f"Returning intended path for creation: {intended_path}")
                return intended_path, display_folder
    except Exception as e:
        logger.error(f"Unexpected error resolving note path: {e}", exc_info=True)
        raise RuntimeError(f"Unexpected error resolving note path: {e}") from e


def tokenize_text(text: str) -> List[int]:
    """Tokenizes text using the preloaded tokenizer."""
    logger.debug(f"Tokenizing text of length {len(text)}")
    return _tokenizer.encode(text)


def split_text_into_chunks(text: str, max_tokens: int) -> List[str]:
    """Splits text into chunks based on word boundaries and a maximum token count."""
    logger.debug(f"Splitting text into chunks with max_tokens={max_tokens}")
    words = text.split()
    chunks, current_chunk_words = [], []
    current_token_count = 0
    for word in words:
        word_token_count = len(_tokenizer.encode(word))
        # The +1 accounts for the joining space when the chunk is non-empty.
        if current_token_count + word_token_count + (1 if current_chunk_words else 0) > max_tokens:
            if current_chunk_words:
                chunks.append(" ".join(current_chunk_words))
            current_chunk_words = [word]
            current_token_count = word_token_count
            if word_token_count > max_tokens:
                logger.warning(f"Word '{word[:30]}...' exceeds max_tokens ({max_tokens}), chunk may be oversized.")
                chunks.append(" ".join(current_chunk_words))
                current_chunk_words = []
                current_token_count = 0
        else:
            current_chunk_words.append(word)
            current_token_count += word_token_count + (1 if len(current_chunk_words) > 1 else 0)
    if current_chunk_words:
        chunks.append(" ".join(current_chunk_words))
    logger.debug(f"Split into {len(chunks)} chunks")
    return chunks


def summarize_chunk(text_chunk: str) -> str:
    """Creates a simple summary of a text chunk (first 200 chars)."""
    logger.debug(f"Summarizing chunk with {len(text_chunk)} characters")
    return f"Summary: {text_chunk[:200]}..."


def create_note(
    title: str,
    folder_name: Optional[str] = None,
    content: Optional[str] = None
) -> str:
    """Creates a new markdown note."""
    note_content = content if content is not None else ""
    logger.debug(f"Creating note: title='{title}', folder_name='{folder_name}'")
    try:
        note_path, display_folder = resolve_note_path(title, folder_name, ensure_exists=False)
        if note_path.exists():
            logger.warning(f"Note '{note_path}' already exists.")
            return f"Note '{title}' already exists in '{display_folder}'."
        logger.debug(f"Ensuring directory exists: '{note_path.parent}'")
        note_path.parent.mkdir(parents=True, exist_ok=True)
        note_path.write_text(note_content, encoding="utf-8")
        logger.info(f"Successfully created note at '{note_path}'")
        return f"Note '{title}' created successfully in '{display_folder}'."
    except (FileNotFoundError, ValueError, TypeError, RuntimeError, OSError) as e:
        logger.error(f"Failed to create note '{title}': {e}", exc_info=isinstance(e, (RuntimeError, OSError)))
        return f"Failed to create note '{title}'. Error: {e}"
    except Exception as e:
        logger.error(f"Unexpected error creating note '{title}': {e}", exc_info=True)
        return f"Failed to create note '{title}'. Unexpected error: {e}"


def read_note(title: str, folder_name: Optional[str] = None) -> str:
    """Reads note content, summarizes if too large based on MODEL_TOKEN_LIMIT."""
    logger.debug(f"Reading note: title='{title}', folder_name='{folder_name}'")
    try:
        note_path, display_folder = resolve_note_path(title, folder_name, ensure_exists=True)
        if note_path is None:
            folder_info = f" in folder '{display_folder}'" if folder_name else ""
            logger.warning(f"Note '{title}'{folder_info} not found.")
            return f"Note '{title}'{folder_info} not found."
        logger.debug(f"Reading content from: {note_path}")
        full_text = note_path.read_text(encoding="utf-8")
        num_tokens = len(tokenize_text(full_text))
        if num_tokens <= MODEL_TOKEN_LIMIT:
            return full_text
        else:
            chunks = split_text_into_chunks(full_text, CHUNK_SIZE_TOKENS)
            logger.info(
                f"Note '{title}' is too large ({num_tokens} tokens > {MODEL_TOKEN_LIMIT}). Returning summary."
            )
            summarized_chunks = [summarize_chunk(chunk) for chunk in chunks]
            full_summary = "\n\n".join(summarized_chunks)
            return f"Note '{title}' in '{display_folder}' is too large, providing summarized version:\n\n{full_summary}"
    except (FileNotFoundError, ValueError, TypeError, RuntimeError, OSError) as e:
        logger.error(f"Failed to read note '{title}': {e}", exc_info=isinstance(e, (RuntimeError, OSError)))
        return f"Failed to read note '{title}'. Error: {e}"
    except Exception as e:
        logger.error(f"Unexpected error reading note '{title}': {e}", exc_info=True)
        return f"Failed to read note '{title}'. Unexpected error: {e}"


def update_note(title: str, new_content: str, folder_name: Optional[str] = None) -> str:
    """Updates the content of an existing note."""
    logger.debug(f"Updating note: title='{title}', folder_name='{folder_name}'")
    try:
        note_path, display_folder = resolve_note_path(title, folder_name, ensure_exists=True)
        if note_path is None:
            folder_info = f" in folder '{display_folder}'" if folder_name else ""
            logger.warning(f"Note '{title}'{folder_info} not found for update.")
            return f"Note '{title}'{folder_info} not found."
        logger.debug(f"Writing update to: {note_path}")
        note_path.write_text(new_content, encoding="utf-8")
        return f"Note '{title}' in '{display_folder}' updated."
    except (FileNotFoundError, ValueError, TypeError, RuntimeError, OSError) as e:
        logger.error(f"Failed to update note '{title}': {e}", exc_info=isinstance(e, (RuntimeError, OSError)))
        return f"Failed to update note '{title}'. Error: {e}"
    except Exception as e:
        logger.error(f"Unexpected error updating note '{title}': {e}", exc_info=True)
        return f"Failed to update note '{title}'. Unexpected error: {e}"


def delete_note(title: str, folder_name: Optional[str] = None) -> str:
    """Deletes an existing note."""
    logger.debug(f"Deleting note: title='{title}', folder_name='{folder_name}'")
    try:
        note_path, display_folder = resolve_note_path(title, folder_name, ensure_exists=True)
        if note_path is None:
            folder_info = f" in folder '{display_folder}'" if folder_name else ""
            logger.warning(f"Note '{title}'{folder_info} not found for deletion.")
            return f"Note '{title}'{folder_info} not found."
        logger.debug(f"Deleting file: {note_path}")
        note_path.unlink()
        return f"Note '{title}' in '{display_folder}' deleted."
    except (FileNotFoundError, ValueError, TypeError, RuntimeError, OSError) as e:
        logger.error(f"Failed to delete note '{title}': {e}", exc_info=isinstance(e, (RuntimeError, OSError)))
        return f"Failed to delete note '{title}'. Error: {e}"
    except Exception as e:
        logger.error(f"Unexpected error deleting note '{title}': {e}", exc_info=True)
        return f"Failed to delete note '{title}'. Unexpected error: {e}"


def load_vectors(json_path: Path) -> Tuple[np.ndarray, List[str]]:
    """Loads embeddings and paths from JSON."""
    logger.debug(f"Loading vectors from: {json_path}")
    if not json_path.is_file():
        logger.error(f"Embeddings JSON file not found: {json_path}")
        raise FileNotFoundError(f"Embeddings JSON file not found: {json_path}")
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if "vectors" not in data or not isinstance(data["vectors"], list):
            raise ValueError("Invalid format in embeddings JSON file: 'vectors' key missing or not a list.")
        embeddings = []
        paths = []
        for item in data["vectors"]:
            if "embedding" in item and "path" in item:
                embeddings.append(item["embedding"])
                paths.append(item["path"])
            else:
                logger.warning("Skipping invalid item in embeddings JSON.")
        if not embeddings:
            raise ValueError("No valid embeddings found in JSON file.")
        logger.info(f"Loaded {len(embeddings)} vectors from {json_path}")
        return np.array(embeddings, dtype=np.float32), paths
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {json_path}: {e}")
        raise ValueError(f"Invalid JSON in embeddings file: {e}") from e
    except Exception as e:
        logger.error(f"Failed to load vectors from {json_path}: {e}", exc_info=True)
        raise


def initialize_semantic_search(
    model_name: str = "nomic-ai/nomic-embed-text-v1.5",  # Or get from config
    force_reload: bool = False
) -> bool:
    """Initializes semantic search model and loads vectors."""
    global _cached_embeddings, _cached_paths, _semantic_model
    logger.debug(f"Initializing semantic search with model '{model_name}'")
    # OBSIDIAN_VAULT_PATH may be a plain string in config; wrap in Path so the join works.
    vector_json_path = Path(OBSIDIAN_VAULT_PATH) / EMBEDDINGS_PATH
    logger.info(f"Embeddings path: {vector_json_path}")
    if not force_reload and _cached_embeddings is not None and _cached_paths is not None and _semantic_model is not None:
        logger.info("Semantic search components already initialized.")
        return True
    try:
        logger.info("Loading embeddings...")
        _cached_embeddings, _cached_paths = load_vectors(vector_json_path)
        logger.info(f"Loading sentence transformer model: {model_name}...")
        _semantic_model = SentenceTransformer(model_name, trust_remote_code=True)
        logger.info(
            f"Semantic search initialized: {_cached_embeddings.shape[0]} embeddings loaded."
        )
        return True
    except (FileNotFoundError, ValueError, ImportError) as e:
        logger.error(f"Semantic search initialization failed: {e}")
        _cached_embeddings, _cached_paths, _semantic_model = None, None, None
        return False
    except Exception as e:
        logger.error(f"Unexpected error during semantic search initialization: {e}", exc_info=True)
        _cached_embeddings, _cached_paths, _semantic_model = None, None, None
        return False


def recursive_filename_search(query: str, vault: Vault) -> List[str]:
    """Searches markdown note filenames within the vault."""
    logger.debug(f"Recursively searching filenames for query: '{query}'")
    matches = []
    query_lower = query.lower()
    try:
        for note_path in vault.dirpath.rglob("*.md"):
            if query_lower in note_path.name.lower():
                matches.append(str(note_path))
    except Exception as e:
        logger.error(f"Error during recursive filename search: {e}", exc_info=True)
    return matches


def semantic_search(query: str, top_k: int = 5) -> List[str]:
    """Performs semantic search using loaded model and embeddings."""
    global _cached_embeddings, _cached_paths, _semantic_model
    logger.debug(f"Performing semantic search for query: '{query}'")
    if not SEMANTIC_SEARCH_ENABLED:
        logger.warning("Semantic search is disabled in config.")
        return []
    # Ensure initialization
    if _cached_embeddings is None or _cached_paths is None or _semantic_model is None:
        logger.warning("Semantic search not initialized. Attempting initialization...")
        if not initialize_semantic_search():
            logger.error("Cannot perform semantic search: Initialization failed.")
            return []
    try:
        vault = _get_vault()
        query_vec = _semantic_model.encode([query], convert_to_numpy=True)
        similarities = cosine_similarity(query_vec, _cached_embeddings)[0]
        effective_top_k = min(top_k, len(_cached_paths))
        if effective_top_k <= 0:
            return []
        top_indices = np.argsort(similarities)[-effective_top_k:][::-1]
        top_scores = similarities[top_indices]
        logger.info(f"Top {effective_top_k} semantic matches scores: {top_scores}")
        strong_match_indices = [idx for idx, score in zip(top_indices, top_scores) if score >= SIMILARITY_THRESHOLD]
        if not strong_match_indices:
            logger.info(
                f"No strong semantic matches found (threshold={SIMILARITY_THRESHOLD}). Falling back to filename search."
            )
            matched_paths = recursive_filename_search(query, vault)
            return matched_paths[:top_k]
        else:
            logger.info(f"Found {len(strong_match_indices)} strong semantic matches.")
            return [_cached_paths[i] for i in strong_match_indices]
    except Exception as e:
        logger.error(f"Error during semantic search query: {e}", exc_info=True)
        return []


def simple_search_by_keyword(keyword: str) -> List[str]:
    """Performs keyword search in markdown note filenames and content."""
    logger.debug(f"Simple keyword search for: '{keyword}'")
    matching_notes_relative: List[str] = []
    try:
        vault = _get_vault()
        keyword_lower = keyword.lower()
        for note_path in vault.dirpath.rglob("*.md"):
            try:
                match_found = False
                if keyword_lower in note_path.name.lower():
                    match_found = True
                if not match_found:
                    content = note_path.read_text(encoding="utf-8")
                    if keyword_lower in content.lower():
                        match_found = True
                if match_found:
                    relative_path = str(note_path.relative_to(vault.dirpath))
                    matching_notes_relative.append(relative_path)
                    logger.debug(f"Keyword found in: {relative_path}")
            except Exception as e:
                logger.warning(f"Failed to process file {note_path} during keyword search: {e}")
    except (FileNotFoundError, ValueError, RuntimeError) as e:
        logger.error(f"Failed to perform keyword search: {e}")
    except Exception as e:
        logger.error(f"Unexpected error during keyword search: {e}", exc_info=True)
    return sorted(set(matching_notes_relative))


def search_notes_by_content(query: str, top_k: int = 5) -> List[str]:
    """Searches notes by content/keyword. Uses semantic or simple search based on config."""
    logger.debug(f"Unified search for: '{query}', top_k={top_k}")
    if SEMANTIC_SEARCH_ENABLED:
        logger.info("Using semantic search.")
        results = semantic_search(query, top_k)
        logger.info(f"Semantic search returned {len(results)} results.")
        return results
    else:
        logger.info("Using simple keyword search.")
        results = simple_search_by_keyword(query)
        logger.info(f"Simple keyword search returned {len(results)} results.")
        return results[:top_k]


def create_folder(folder_name: str) -> str:
    """Creates a folder within the vault."""
    logger.debug(f"Creating folder: '{folder_name}'")
    if not folder_name:
        return "Folder name cannot be empty."
    try:
        vault = _get_vault()
        relative_folder_path = Path(folder_name.strip('/\\'))
        folder_path = vault.dirpath / relative_folder_path
        display_name = relative_folder_path.as_posix()
        if folder_path.exists():
            if folder_path.is_dir():
                logger.warning(f"Folder '{display_name}' already exists.")
                return f"Folder '{display_name}' already exists."
            else:
                logger.error(f"Path '{display_name}' exists but is not a folder.")
                return f"Path '{display_name}' exists but is not a folder."
        logger.debug(f"Creating directory: {folder_path}")
        folder_path.mkdir(parents=True, exist_ok=True)
        return f"Folder '{display_name}' created."
    except (FileNotFoundError, ValueError, RuntimeError, OSError) as e:
        logger.error(f"Failed to create folder '{folder_name}': {e}", exc_info=isinstance(e, (RuntimeError, OSError)))
        return f"Failed to create folder '{folder_name}'. Error: {e}"
    except Exception as e:
        logger.error(f"Unexpected error creating folder '{folder_name}': {e}", exc_info=True)
        return f"Failed to create folder '{folder_name}'. Unexpected error: {e}"


def delete_folder(folder_name: str) -> str:
    """Deletes an *empty* folder from the vault."""
    logger.debug(f"Deleting folder: '{folder_name}'")
    display_name = ""
    if not folder_name:
        return "Folder name cannot be empty."
    try:
        vault = _get_vault()
        relative_folder_path = Path(folder_name.strip('/\\'))
        folder_path = vault.dirpath / relative_folder_path
        display_name = relative_folder_path.as_posix()
        if not folder_path.exists():
            logger.warning(f"Folder '{display_name}' not found for deletion.")
            return f"Folder '{display_name}' not found."
        if not folder_path.is_dir():
            logger.warning(f"Path '{display_name}' is not a folder.")
            return f"Path '{display_name}' is not a folder."
        logger.debug(f"Attempting to delete directory: {folder_path}")
        folder_path.rmdir()
        return f"Folder '{display_name}' deleted."
    except OSError as e:
        # errno.ENOTEMPTY is portable; matching the message string is locale- and platform-dependent.
        if e.errno == errno.ENOTEMPTY:
            logger.warning(f"Folder '{display_name}' is not empty.")
            return f"Folder '{display_name}' is not empty. Delete content first."
        logger.error(f"OS Error deleting folder '{display_name}': {e}", exc_info=True)
        return f"Failed to delete folder '{display_name}'. OS Error: {e}"
    except (FileNotFoundError, ValueError, RuntimeError) as e:
        logger.error(f"Failed to delete folder '{display_name}': {e}", exc_info=isinstance(e, RuntimeError))
        return f"Failed to delete folder '{display_name}'. Error: {e}"
    except Exception as e:
        logger.error(f"Unexpected error deleting folder '{display_name}': {e}", exc_info=True)
        return f"Failed to delete folder '{display_name}'. Unexpected error: {e}"


def search_folders(keyword: str) -> List[str]:
    """Searches folder names within the vault."""
    logger.debug(f"Searching folders with keyword: '{keyword}'")
    matching_folders_relative: List[str] = []
    try:
        vault = _get_vault()
        keyword_lower = keyword.lower()
        for item in vault.dirpath.rglob("*"):
            if item.is_dir() and keyword_lower in item.name.lower():
                relative_path = str(item.relative_to(vault.dirpath))
                matching_folders_relative.append(relative_path)
    except (FileNotFoundError, ValueError, RuntimeError) as e:
        logger.error(f"Failed to search folders: {e}")
    except Exception as e:
        logger.error(f"Unexpected error searching folders: {e}", exc_info=True)
    return sorted(set(matching_folders_relative))


def list_folders() -> List[str]:
    """Lists all folders within the Obsidian vault."""
    logger.debug("Listing all folders")
    folders_relative: List[str] = []
    try:
        vault = _get_vault()
        for item in vault.dirpath.rglob("*"):
            if item.is_dir():
                relative_path = str(item.relative_to(vault.dirpath))
                folders_relative.append(relative_path)
    except (FileNotFoundError, ValueError, RuntimeError) as e:
        logger.error(f"Failed to list folders: {e}")
    except Exception as e:
        logger.error(f"Unexpected error listing folders: {e}", exc_info=True)
    return sorted(f for f in set(folders_relative) if f != '.')


if SEMANTIC_SEARCH_ENABLED:
    initialize_semantic_search()
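For orientation, a minimal usage sketch follows. It is hypothetical and not part of the repository: the import path services.obsidian, the note titles, and the folder names are assumptions, and it presumes config points OBSIDIAN_VAULT_PATH at a real vault. For semantic search, load_vectors expects the embeddings JSON at EMBEDDINGS_PATH to look like {"vectors": [{"path": "note.md", "embedding": [0.1, ...]}, ...]}.

# Hypothetical usage sketch (assumed import path; adjust to the project layout).
from services.obsidian import (
    create_note,
    delete_note,
    read_note,
    search_notes_by_content,
    update_note,
)

print(create_note("Meeting Notes", folder_name="Work", content="# Agenda\n- item one"))
print(read_note("Meeting Notes", folder_name="Work"))  # Full text, or a chunked summary if over MODEL_TOKEN_LIMIT
print(update_note("Meeting Notes", "# Agenda\n- revised item", folder_name="Work"))

# Semantic search when SEMANTIC_SEARCH_ENABLED is set, keyword search otherwise.
for path in search_notes_by_content("agenda", top_k=3):
    print(path)

print(delete_note("Meeting Notes", folder_name="Work"))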

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lemannrus/personal-assistant-mcp'
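The same lookup from Python, as a rough sketch using the requests library; only the URL from the curl example above is assumed, and the JSON is printed as-is rather than presuming any particular response fields:

import requests

# Sketch: fetch this server's MCP directory entry and print the raw JSON.
resp = requests.get("https://glama.ai/api/mcp/v1/servers/lemannrus/personal-assistant-mcp")
resp.raise_for_status()
print(resp.json())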

If you have feedback or need assistance with the MCP directory API, please join our Discord server.