Obsidian MCP Server

by suhailnajeeb
persistent_index.py • 28.6 kB
"""Persistent search index using SQLite for Obsidian vault.""" import os import hashlib import json import asyncio import aiosqlite from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime import logging logger = logging.getLogger(__name__) class PersistentSearchIndex: """SQLite-based persistent search index for efficient vault searching.""" def __init__(self, vault_path: Path, index_path: Optional[Path] = None): """ Initialize persistent search index. Args: vault_path: Path to the Obsidian vault index_path: Path to store the SQLite database (defaults to vault/.obsidian/search-index.db) """ self.vault_path = vault_path # Default index location in .obsidian folder if index_path is None: obsidian_dir = vault_path / ".obsidian" obsidian_dir.mkdir(exist_ok=True) self.index_path = obsidian_dir / "mcp-search-index.db" else: self.index_path = index_path self.db: Optional[aiosqlite.Connection] = None self._lock = asyncio.Lock() async def initialize(self): """Initialize database connection and create tables if needed.""" self.db = await aiosqlite.connect(str(self.index_path)) # Enable WAL mode for better concurrent access await self.db.execute("PRAGMA journal_mode=WAL") # Create tables await self.db.execute(""" CREATE TABLE IF NOT EXISTS file_index ( filepath TEXT PRIMARY KEY, content TEXT NOT NULL, content_lower TEXT NOT NULL, mtime REAL NOT NULL, size INTEGER NOT NULL, content_hash TEXT NOT NULL, last_indexed REAL NOT NULL, metadata TEXT, line_offsets TEXT ) """) # Check if we need to add line_offsets column (for existing databases) cursor = await self.db.execute("PRAGMA table_info(file_index)") columns = await cursor.fetchall() column_names = [col[1] for col in columns] if 'line_offsets' not in column_names: await self.db.execute("ALTER TABLE file_index ADD COLUMN line_offsets TEXT") logger.info("Added line_offsets column to existing database") # Create properties table for efficient property searches await self.db.execute(""" CREATE TABLE IF NOT EXISTS file_properties ( filepath TEXT NOT NULL, property_name TEXT NOT NULL, property_value TEXT, property_type TEXT, PRIMARY KEY (filepath, property_name), FOREIGN KEY (filepath) REFERENCES file_index(filepath) ON DELETE CASCADE ) """) # Create indexes for faster searching await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_mtime ON file_index(mtime) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_size ON file_index(size) """) # Create indexes for property searches await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_property_name ON file_properties(property_name) """) await self.db.execute(""" CREATE INDEX IF NOT EXISTS idx_property_value ON file_properties(property_value) """) # Create FTS5 virtual table for full-text search await self.db.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS file_search USING fts5( filepath UNINDEXED, content, content_lower, tokenize='porter unicode61' ) """) await self.db.commit() async def close(self): """Close database connection.""" if self.db: await self.db.close() self.db = None async def get_file_info(self, filepath: str) -> Optional[Dict[str, Any]]: """Get cached file information.""" async with self._lock: cursor = await self.db.execute( "SELECT mtime, size, content_hash, last_indexed FROM file_index WHERE filepath = ?", (filepath,) ) row = await cursor.fetchone() if row: return { "mtime": row[0], "size": row[1], "content_hash": row[2], "last_indexed": row[3] } return None async def needs_update(self, filepath: str, current_mtime: float, 
                           current_size: int) -> bool:
        """Check if a file needs to be re-indexed."""
        file_info = await self.get_file_info(filepath)

        if not file_info:
            return True

        # Check if file has been modified
        if file_info["mtime"] != current_mtime or file_info["size"] != current_size:
            return True

        return False

    def _compute_hash(self, content: str) -> str:
        """Compute SHA256 hash of content."""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def _determine_property_type(self, value: Any) -> str:
        """Determine the type of a property value."""
        if isinstance(value, bool):
            return 'boolean'
        elif isinstance(value, int):
            return 'number'
        elif isinstance(value, float):
            return 'number'
        elif isinstance(value, list):
            return 'list'
        elif isinstance(value, dict):
            return 'object'
        elif isinstance(value, str):
            # Check if it's a date-like string
            try:
                datetime.fromisoformat(value.replace('Z', '+00:00'))
                return 'date'
            except:
                return 'text'
        else:
            return 'text'

    def _calculate_line_offsets(self, content: str) -> List[int]:
        """Calculate character offsets of each line start."""
        line_offsets = [0]
        for i, char in enumerate(content):
            if char == '\n':
                line_offsets.append(i + 1)
        return line_offsets

    async def index_file(self, filepath: str, content: str, mtime: float, size: int,
                         metadata: Optional[Dict] = None):
        """Index a single file with its content and properties."""
        content_hash = self._compute_hash(content)
        content_lower = content.lower()
        metadata_json = json.dumps(metadata) if metadata else None

        # Calculate line offsets for efficient line number lookups
        line_offsets = self._calculate_line_offsets(content)
        line_offsets_json = json.dumps(line_offsets)

        now = datetime.now().timestamp()

        async with self._lock:
            # Update main index
            await self.db.execute("""
                INSERT OR REPLACE INTO file_index
                (filepath, content, content_lower, mtime, size, content_hash, last_indexed, metadata, line_offsets)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (filepath, content, content_lower, mtime, size, content_hash, now, metadata_json, line_offsets_json))

            # Update FTS index
            await self.db.execute(
                "DELETE FROM file_search WHERE filepath = ?",
                (filepath,)
            )
            await self.db.execute(
                "INSERT INTO file_search (filepath, content, content_lower) VALUES (?, ?, ?)",
                (filepath, content, content_lower)
            )

            # Update properties if metadata contains frontmatter
            if metadata and 'frontmatter' in metadata:
                # Clear existing properties for this file
                await self.db.execute(
                    "DELETE FROM file_properties WHERE filepath = ?",
                    (filepath,)
                )

                # Insert new properties
                frontmatter = metadata['frontmatter']
                for prop_name, prop_value in frontmatter.items():
                    # Determine property type
                    prop_type = self._determine_property_type(prop_value)

                    # Convert value to string for storage
                    if isinstance(prop_value, (list, dict)):
                        prop_value_str = json.dumps(prop_value)
                    else:
                        prop_value_str = str(prop_value)

                    await self.db.execute("""
                        INSERT INTO file_properties (filepath, property_name, property_value, property_type)
                        VALUES (?, ?, ?, ?)
                    """, (filepath, prop_name, prop_value_str, prop_type))

            await self.db.commit()

    async def remove_file(self, filepath: str):
        """Remove a file from the index."""
        async with self._lock:
            await self.db.execute("DELETE FROM file_index WHERE filepath = ?", (filepath,))
            await self.db.execute("DELETE FROM file_search WHERE filepath = ?", (filepath,))
            await self.db.commit()

    async def search_content(self, query: str, limit: int = 50) -> List[Tuple[str, str]]:
        """
        Search for content using FTS5.

        Returns list of (filepath, snippet) tuples.
""" # Use FTS5 for efficient full-text search cursor = await self.db.execute(""" SELECT filepath, snippet(file_search, 1, '<b>', '</b>', '...', 32) FROM file_search WHERE file_search MATCH ? ORDER BY rank LIMIT ? """, (query, limit)) results = await cursor.fetchall() return results async def search_simple(self, query: str, limit: int = 50) -> Dict[str, Any]: """ Simple substring search with total count. Returns dictionary with results and metadata. """ query_lower = query.lower() search_pattern = f"%{query_lower}%" # First get total count count_cursor = await self.db.execute(""" SELECT COUNT(*) FROM file_index WHERE content_lower LIKE ? """, (search_pattern,)) total_count = (await count_cursor.fetchone())[0] # Then get limited results cursor = await self.db.execute(""" SELECT filepath, content, mtime, size FROM file_index WHERE content_lower LIKE ? LIMIT ? """, (search_pattern, limit)) results = [] async for row in cursor: results.append({ "filepath": row[0], "content": row[1], "mtime": row[2], "size": row[3] }) return { "results": results, "total_count": total_count, "limit": limit, "truncated": len(results) < total_count } async def search_regex(self, pattern: str, flags: int = 0, limit: int = 50, context_length: int = 100, max_parallel: int = 10) -> List[Dict[str, Any]]: """ Search using regular expressions with efficient streaming and parallel processing. Args: pattern: Regular expression pattern flags: Regex flags (e.g., re.IGNORECASE) limit: Maximum number of results context_length: Characters to show around match max_parallel: Maximum number of files to process in parallel Returns: List of search results with matches and context """ import re # Compile regex pattern try: regex = re.compile(pattern, flags) except re.error as e: raise ValueError(f"Invalid regex pattern: {e}") # Check if we can use FTS5 pre-filtering literal_prefix = self._extract_literal_prefix(pattern) # Build query - order by size for faster initial results if literal_prefix and len(literal_prefix) >= 3: # Escape special characters for FTS5 # FTS5 requires double quotes around phrases with special characters fts5_query = f'"{literal_prefix}"' # Use FTS5 to pre-filter files containing the literal prefix query = """ SELECT f.filepath, f.content, f.mtime, f.size, f.line_offsets FROM file_index f JOIN file_search s ON f.filepath = s.filepath WHERE file_search MATCH ? 
                ORDER BY f.size ASC, f.mtime DESC
            """
            params = (fts5_query,)
        else:
            # Full scan, but ordered by size
            query = """
                SELECT filepath, content, mtime, size, line_offsets
                FROM file_index
                ORDER BY size ASC, mtime DESC
            """
            params = ()

        cursor = await self.db.execute(query, params)
        rows = await cursor.fetchall()

        # Process files in batches for parallel execution
        results = []
        total_results = 0
        batch_size = max_parallel

        for i in range(0, len(rows), batch_size):
            if total_results >= limit:
                break

            batch = rows[i:i + batch_size]

            # Process batch in parallel
            batch_tasks = []
            for row in batch:
                if total_results >= limit:
                    break

                filepath, content, mtime, size, line_offsets_json = row

                # Create task for processing this file
                task = self._process_file_regex(
                    filepath, content, size, line_offsets_json,
                    regex, context_length
                )
                batch_tasks.append(task)

            # Wait for batch to complete
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

            # Collect results
            for result in batch_results:
                if isinstance(result, Exception):
                    logger.error(f"Error processing file: {result}")
                    continue

                if result and result['match_contexts']:
                    results.append({
                        "filepath": result['filepath'],
                        "match_count": len(result['match_contexts']),
                        "matches": result['match_contexts'],
                        "score": min(len(result['match_contexts']) / 5.0 + 1.0, 5.0)
                    })
                    total_results += 1

                    if total_results >= limit:
                        break

        return results[:limit]

    async def _process_file_regex(self, filepath: str, content: str, size: int,
                                  line_offsets_json: Optional[str], regex,
                                  context_length: int, max_matches: int = 5) -> Dict[str, Any]:
        """Process a single file for regex matches (for parallel execution)."""
        # Parse line offsets if available
        try:
            line_offsets = json.loads(line_offsets_json) if line_offsets_json else None
        except:
            line_offsets = None

        # For large files, use streaming approach
        if size > 1024 * 1024:  # 1MB threshold
            match_contexts = await self._search_large_file_streaming(
                content, regex, line_offsets, context_length, max_matches
            )
        else:
            # For smaller files, use the optimized standard approach
            match_contexts = self._search_file_content(
                content, regex, line_offsets, context_length, max_matches
            )

        return {
            'filepath': filepath,
            'match_contexts': match_contexts
        }

    def _extract_literal_prefix(self, pattern: str) -> Optional[str]:
        """Extract a literal prefix from regex pattern for FTS5 pre-filtering."""
        # Simple extraction - look for literal characters at the start
        literal = ""
        escaped = False

        for char in pattern:
            if escaped:
                # If it's a regex escape sequence, stop extraction
                if char in 'dDwWsSbBAZ':  # Common regex escape sequences
                    break
                elif char in 'nrtfv':  # Special character escapes
                    break
                elif char in '.^$*+?{[|(\\':  # Escaped metacharacters
                    literal += char
                else:
                    # Unknown escape, stop extraction to be safe
                    break
                escaped = False
            elif char == '\\':
                escaped = True
            elif char in '.^$*+?{[|(':
                break  # Regex metacharacter
            else:
                literal += char

        return literal if len(literal) >= 3 else None

    def _search_file_content(self, content: str, regex, line_offsets: Optional[List[int]],
                             context_length: int, max_matches: int = 5) -> List[Dict[str, Any]]:
        """Search content and return match contexts."""
        match_contexts = []
        match_count = 0

        # Use finditer for streaming matches
        for match in regex.finditer(content):
            if match_count >= max_matches:
                break

            match_start = match.start()
            match_end = match.end()

            # Find line number efficiently
            if line_offsets:
                line_num = self._find_line_number(line_offsets, match_start)
            else:
                # Fallback: count newlines before match
                line_num = content[:match_start].count('\n') + 1
            # Extract context
            context_start = max(0, match_start - context_length // 2)
            context_end = min(len(content), match_end + context_length // 2)
            context = content[context_start:context_end].strip()

            # Add ellipsis if truncated
            if context_start > 0:
                context = "..." + context
            if context_end < len(content):
                context = context + "..."

            match_contexts.append({
                "match": match.group(0),
                "line": line_num,
                "context": context,
                "groups": match.groups() if match.groups() else None
            })

            match_count += 1

        return match_contexts

    async def _search_large_file_streaming(self, content: str, regex,
                                           line_offsets: Optional[List[int]],
                                           context_length: int,
                                           max_matches: int = 5) -> List[Dict[str, Any]]:
        """Search large files using a streaming approach with chunks."""
        match_contexts = []
        chunk_size = 512 * 1024  # 512KB chunks
        overlap = 1024  # 1KB overlap to catch matches at boundaries

        pos = 0
        while pos < len(content) and len(match_contexts) < max_matches:
            # Extract chunk with overlap
            chunk_start = max(0, pos - overlap)
            chunk_end = min(len(content), pos + chunk_size)
            chunk = content[chunk_start:chunk_end]

            # Find matches in chunk
            for match in regex.finditer(chunk):
                # Adjust match position to full content coordinates
                match_start = chunk_start + match.start()
                match_end = chunk_start + match.end()

                # Skip if this match was already found in overlap
                if match_contexts and match_start <= match_contexts[-1]["match_start"]:
                    continue

                # Find line number
                if line_offsets:
                    line_num = self._find_line_number(line_offsets, match_start)
                else:
                    line_num = content[:match_start].count('\n') + 1

                # Extract context from full content
                context_start = max(0, match_start - context_length // 2)
                context_end = min(len(content), match_end + context_length // 2)
                context = content[context_start:context_end].strip()

                # Add ellipsis if truncated
                if context_start > 0:
                    context = "..." + context
                if context_end < len(content):
                    context = context + "..."
                match_contexts.append({
                    "match": match.group(0),
                    "line": line_num,
                    "context": context,
                    "groups": match.groups() if match.groups() else None,
                    "match_start": match_start  # For duplicate detection
                })

                if len(match_contexts) >= max_matches:
                    break

            # Move to next chunk
            pos += chunk_size

        # Remove the match_start field used for duplicate detection
        for match_ctx in match_contexts:
            match_ctx.pop("match_start", None)

        return match_contexts

    def _find_line_number(self, line_starts: List[int], position: int) -> int:
        """Find line number using binary search."""
        left, right = 0, len(line_starts) - 1

        while left <= right:
            mid = (left + right) // 2
            if mid + 1 < len(line_starts):
                if line_starts[mid] <= position < line_starts[mid + 1]:
                    return mid + 1
                elif position < line_starts[mid]:
                    right = mid - 1
                else:
                    left = mid + 1
            else:
                return mid + 1

        return len(line_starts)

    async def get_all_files(self) -> List[str]:
        """Get list of all indexed files."""
        cursor = await self.db.execute("SELECT filepath FROM file_index")
        files = [row[0] for row in await cursor.fetchall()]
        return files

    async def get_stats(self) -> Dict[str, Any]:
        """Get index statistics."""
        cursor = await self.db.execute("SELECT COUNT(*), SUM(size), MAX(last_indexed) FROM file_index")
        row = await cursor.fetchone()

        return {
            "total_files": row[0] or 0,
            "total_size": row[1] or 0,
            "last_update": datetime.fromtimestamp(row[2]) if row[2] else None
        }

    async def clear_orphaned_entries(self, existing_files: set):
        """Remove index entries for files that no longer exist."""
        async with self._lock:
            cursor = await self.db.execute("SELECT filepath FROM file_index")
            indexed_files = {row[0] for row in await cursor.fetchall()}

        # remove_file() acquires self._lock itself (asyncio.Lock is not reentrant),
        # so delete orphaned entries after releasing the lock to avoid a deadlock
        orphaned = indexed_files - existing_files
        for filepath in orphaned:
            await self.remove_file(filepath)
            logger.info(f"Removed orphaned index entry: {filepath}")

    async def search_by_property(self, property_name: str, operator: str,
                                 value: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
        """
        Search for files by property values.

        Args:
            property_name: Name of the property to search
            operator: Comparison operator (=, !=, >, <, >=, <=, contains, exists)
            value: Value to compare against (optional for 'exists')
            limit: Maximum number of results

        Returns:
            List of file info with matching properties
        """
        # Build SQL query based on operator
        if operator == 'exists':
            sql = """
                SELECT DISTINCT f.filepath, f.content, p.property_value, p.property_type
                FROM file_index f
                JOIN file_properties p ON f.filepath = p.filepath
                WHERE p.property_name = ?
                ORDER BY f.mtime DESC
                LIMIT ?
            """
            params = (property_name, limit)
        elif operator == 'contains':
            sql = """
                SELECT DISTINCT f.filepath, f.content, p.property_value, p.property_type
                FROM file_index f
                JOIN file_properties p ON f.filepath = p.filepath
                WHERE p.property_name = ? AND p.property_value LIKE ?
                ORDER BY f.mtime DESC
                LIMIT ?
            """
            params = (property_name, f"%{value}%", limit)
        elif operator == '!=':
            # For not equal, we need to include files without the property
            sql = """
                SELECT DISTINCT f.filepath, f.content,
                       COALESCE(p.property_value, '') as property_value,
                       COALESCE(p.property_type, '') as property_type
                FROM file_index f
                LEFT JOIN file_properties p ON f.filepath = p.filepath AND p.property_name = ?
                WHERE p.property_value IS NULL OR p.property_value != ?
                ORDER BY f.mtime DESC
                LIMIT ?
""" params = (property_name, value, limit) else: # =, >, <, >=, <= # For numeric comparisons, we need to handle type conversion if operator in ['>', '<', '>=', '<=']: sql = f""" SELECT DISTINCT f.filepath, f.content, p.property_value, p.property_type FROM file_index f JOIN file_properties p ON f.filepath = p.filepath WHERE p.property_name = ? AND ((p.property_type = 'number' AND CAST(p.property_value AS REAL) {operator} CAST(? AS REAL)) OR (p.property_type != 'number' AND p.property_value {operator} ?)) ORDER BY f.mtime DESC LIMIT ? """ params = (property_name, value, value, limit) else: # = operator sql = """ SELECT DISTINCT f.filepath, f.content, p.property_value, p.property_type FROM file_index f JOIN file_properties p ON f.filepath = p.filepath WHERE p.property_name = ? AND LOWER(p.property_value) = LOWER(?) ORDER BY f.mtime DESC LIMIT ? """ params = (property_name, value, limit) cursor = await self.db.execute(sql, params) results = [] async for row in cursor: results.append({ "filepath": row[0], "content": row[1], "property_value": row[2], "property_type": row[3] if len(row) > 3 else None }) return results async def get_all_property_names(self) -> List[str]: """Get a list of all unique property names in the index.""" cursor = await self.db.execute(""" SELECT DISTINCT property_name FROM file_properties ORDER BY property_name """) return [row[0] for row in await cursor.fetchall()] async def get_property_values(self, property_name: str) -> List[Tuple[str, int]]: """Get all unique values for a property with counts.""" cursor = await self.db.execute(""" SELECT property_value, COUNT(*) as count FROM file_properties WHERE property_name = ? GROUP BY property_value ORDER BY count DESC, property_value """, (property_name,)) return await cursor.fetchall()
