
Obsidian MCP Server

by pmmvr
search.py (16.5 kB)
import math
import os
import re
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

# Make the package importable when this file is run from inside the package directory
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from obsidian_mcp.client import ObsidianClient
from obsidian_mcp.utils import format_timestamp, parse_date_filter


class SearchProcessor:
    """
    Processes search queries against an Obsidian vault, handling various
    filters, pagination, and result formatting.
    """

    def __init__(self, client: ObsidianClient):
        self.client = client

    async def _get_file_metadata(self, file_path: str,
                                 include_content_for_tags: bool = False) -> Optional[Dict[str, Any]]:
        try:
            note_metadata = await self.client.get_note_metadata(file_path)
            result = {
                "mtime": note_metadata["stat"]["mtime"],
                "ctime": note_metadata["stat"]["ctime"]
            }
            # Include content and tags if needed for tag filtering
            if include_content_for_tags:
                result["content"] = note_metadata.get("content", "")
                result["tags"] = note_metadata.get("frontmatter", {}).get("tags", [])
            return result
        except Exception:
            return None

    def _apply_filters(self, file_path: str, metadata: Dict[str, Any],
                       search_path_prefix: str,
                       title_contains: Optional[Union[str, List[str]]],
                       title_match_mode: str,
                       tag: Optional[Union[str, List[str]]],
                       tag_match_mode: str,
                       since_date: Optional[datetime],
                       until_date: Optional[datetime],
                       created_since_date: Optional[datetime],
                       created_until_date: Optional[datetime]) -> bool:
        """
        Applies various filters to a file based on its path, metadata, and
        specified criteria. Returns True if the file passes all filters,
        False otherwise.
        """
        if search_path_prefix and not file_path.startswith(search_path_prefix):
            return False

        if title_contains:
            filename = os.path.basename(file_path).lower()
            if isinstance(title_contains, str):
                if title_contains.lower() not in filename:
                    return False
            else:
                terms = [term.lower() for term in title_contains]
                if title_match_mode == "all":
                    if not all(term in filename for term in terms):
                        return False
                else:
                    if not any(term in filename for term in terms):
                        return False

        # Check tag filter - tags are stored in frontmatter or content
        if tag:
            # Copy so we don't mutate the list stored in the metadata dict
            tags_found = list(metadata.get("tags", []))
            # Also check for inline tags in content if available
            content = metadata.get("content", "")
            if content:
                # Look for #tag format in content
                inline_tags = re.findall(r'#(\w+)', content)
                tags_found.extend(inline_tags)

            # Convert to lowercase for case-insensitive matching;
            # coerce via str() since frontmatter tags may not all be strings
            tags_found = [str(t).lower() for t in tags_found]

            # Handle multiple tags with AND/OR logic
            if isinstance(tag, str):
                # Single tag
                if tag.lower() not in tags_found:
                    return False
            else:
                # Multiple tags - apply OR/AND logic
                tags_to_match = [t.lower() for t in tag]
                if tag_match_mode == "all":
                    # ALL tags must be present (AND logic)
                    if not all(tag_term in tags_found for tag_term in tags_to_match):
                        return False
                else:  # tag_match_mode == "any" (default)
                    # ANY tag must be present (OR logic)
                    if not any(tag_term in tags_found for tag_term in tags_to_match):
                        return False

        file_mod_time = datetime.fromtimestamp(metadata["mtime"] / 1000)
        if since_date and file_mod_time < since_date:
            return False
        if until_date and file_mod_time > until_date:
            return False

        file_created_time = datetime.fromtimestamp(metadata["ctime"] / 1000)
        if created_since_date and file_created_time < created_since_date:
            return False
        if created_until_date and file_created_time > created_until_date:
            return False

        return True

    def _process_matches(self, api_result: Dict[str, Any],
                         max_matches_per_file: int) -> List[Dict[str, Any]]:
        matches = []
        for match in api_result.get("matches", []):
            matches.append({
                "context": match.get("context", ""),
                "match_start": match.get("match", {}).get("start", 0),
                "match_end": match.get("match", {}).get("end", 0)
            })
        return matches[:max_matches_per_file]

    def _create_result_item(self, file_path: str, matches: List[Dict[str, Any]],
                            metadata: Dict[str, Any], score: int) -> Dict[str, Any]:
        return {
            "path": file_path,
            "filename": os.path.basename(file_path),
            "matches": matches,
            "modified_time": format_timestamp(metadata["mtime"]),
            "created_time": format_timestamp(metadata["ctime"]),
            "score": score
        }

    def _paginate_results(self, results: List[Dict[str, Any]], page: int,
                          page_size: int) -> tuple:
        total_files_found = len(results)
        total_pages = math.ceil(total_files_found / page_size)
        start_index = (page - 1) * page_size
        end_index = start_index + page_size
        paginated_results = results[start_index:end_index]

        also_found_in_files = None
        if total_pages > 1:
            # Collect filenames from other pages if pagination is active
            paginated_paths = {result["path"] for result in paginated_results}
            also_found_in_files = [
                result["filename"] for result in results
                if result["path"] not in paginated_paths
            ]

        return paginated_results, total_files_found, total_pages, also_found_in_files

    async def search(self,
                     query: Optional[str] = None,
                     query_type: str = "text",
                     search_in_path: Optional[str] = None,
                     title_contains: Optional[Union[str, List[str]]] = None,
                     title_match_mode: str = "any",
                     tag: Optional[Union[str, List[str]]] = None,
                     tag_match_mode: str = "any",
                     context_length: int = 100,
                     include_content: bool = False,
                     modified_since: Optional[str] = None,
                     modified_until: Optional[str] = None,
                     created_since: Optional[str] = None,
                     created_until: Optional[str] = None,
                     page_size: int = 50,
                     page: int = 1,
                     max_matches_per_file: int = 5) -> Dict[str, Any]:
        date_filters = self._parse_date_filters(modified_since, modified_until,
                                                created_since, created_until)
        search_path_prefix = self._normalize_search_path(search_in_path)

        try:
            # Determine the base path for API search if a prefix is provided
            base_search_path = search_path_prefix.rstrip('/') if search_path_prefix else ""
            api_results = await self._get_api_results(query, query_type,
                                                      context_length, base_search_path)

            filtered_results, total_matches_count = await self._process_results(
                api_results, search_path_prefix, title_contains, title_match_mode,
                tag, tag_match_mode, date_filters, max_matches_per_file, query,
                include_content
            )

            # NOTE: assumes format_timestamp returns lexicographically sortable
            # strings (e.g. ISO 8601), so string sort equals chronological sort
            filtered_results.sort(key=lambda x: x["modified_time"], reverse=True)

            paginated_results, total_files_found, total_pages, also_found_in_files = \
                self._paginate_results(filtered_results, page, page_size)

            message = self._create_response_message(
                total_matches_count, total_files_found, page, total_pages,
                len(paginated_results), search_path_prefix
            )

            return {
                "success": True,
                "message": message,
                "results": paginated_results,
                "total_files_found": total_files_found,
                "total_matches_found": total_matches_count,
                "current_page": page,
                "page_size": page_size,
                "total_pages": total_pages,
                "also_found_in_files": also_found_in_files
            }
        except Exception as e:
            return self._create_error_response(str(e), page, page_size)

    def _parse_date_filters(self, modified_since: Optional[str],
                            modified_until: Optional[str],
                            created_since: Optional[str],
                            created_until: Optional[str]) -> Dict[str, Optional[datetime]]:
        return {
            "since_date": parse_date_filter(modified_since) if modified_since else None,
            "until_date": parse_date_filter(modified_until) if modified_until else None,
            "created_since_date": parse_date_filter(created_since) if created_since else None,
            "created_until_date": parse_date_filter(created_until) if created_until else None
        }

    def _normalize_search_path(self, search_in_path: Optional[str]) -> str:
        if not search_in_path:
            return ""
        search_path_prefix = search_in_path.strip("/")
        return search_path_prefix + "/" if search_path_prefix else ""

    async def _get_api_results(self, query: Optional[str], query_type: str,
                               context_length: int,
                               search_path: str = "") -> List[Dict[str, Any]]:
        if query and query.strip():
            if query_type == "regex":
                return await self._execute_regex_search(query, search_path)
            else:
                # Default to simple text search if query type is not regex or not specified
                return await self.client.search_simple(query, context_length)
        else:
            # If no query is provided, list all markdown files in the specified path
            all_files = await self.client.list_all_files(search_path, max_depth=8, max_files=1000)
            return [
                {"filename": file_path, "score": 0, "matches": []}
                for file_path in all_files if file_path.endswith('.md')
            ]

    async def _execute_regex_search(self, regex_pattern: str,
                                    search_path: str = "") -> List[Dict[str, Any]]:
        try:
            if not regex_pattern.startswith('(?'):
                # Default to case-insensitive regex search if no flags are provided
                case_insensitive_pattern = f"(?i){regex_pattern}"
            else:
                case_insensitive_pattern = regex_pattern

            regex = re.compile(case_insensitive_pattern)
            all_files = await self.client.list_all_files(search_path, max_depth=8, max_files=1000)
            md_files = [f for f in all_files if f.endswith('.md')]

            formatted_results = []
            for file_path in md_files:
                try:
                    note_data = await self.client.get_note_metadata(file_path)
                    content = note_data.get("content", "")
                    matches = list(regex.finditer(content))
                    if matches:
                        match_data = []
                        for match in matches[:5]:
                            # Create a context window around each match
                            start = max(0, match.start() - 50)
                            end = min(len(content), match.end() + 50)
                            context = content[start:end]
                            match_data.append({
                                "context": context,
                                "match": {
                                    "start": match.start() - start,
                                    "end": match.end() - start
                                }
                            })
                        formatted_results.append({
                            "filename": file_path,
                            "score": len(matches),
                            "matches": match_data
                        })
                except Exception:
                    continue

            return formatted_results
        except Exception as e:
            # Log to stderr: stdout may carry the MCP protocol stream
            print(f"Regex search failed: {e}, falling back to simple search", file=sys.stderr)
            return await self.client.search_simple(regex_pattern, 100)

    async def _process_results(self, api_results: List[Dict[str, Any]],
                               search_path_prefix: str,
                               title_contains: Optional[Union[str, List[str]]],
                               title_match_mode: str,
                               tag: Optional[Union[str, List[str]]],
                               tag_match_mode: str,
                               date_filters: Dict[str, Optional[datetime]],
                               max_matches_per_file: int,
                               query: Optional[str],
                               include_content: bool = False) -> tuple:
        filtered_results = []
        total_matches_count = 0

        for api_result in api_results:
            file_path = api_result["filename"]
            # Include content if we need to filter by tags
            metadata = await self._get_file_metadata(file_path,
                                                     include_content_for_tags=bool(tag))
            if not metadata:
                continue

            if not self._apply_filters(
                file_path, metadata, search_path_prefix, title_contains,
                title_match_mode, tag, tag_match_mode,
                date_filters["since_date"], date_filters["until_date"],
                date_filters["created_since_date"], date_filters["created_until_date"]
            ):
                continue

            all_matches = api_result.get("matches", [])
            matches = self._process_matches(api_result, max_matches_per_file)
            total_matches_count += len(all_matches)

            if include_content or (query is None or query.strip() == ""):
                # If include_content is true, or if there's no search query
                # (listing all files), attempt to fetch and include the full
                # note content.
                try:
                    full_content = await self.client.get_note_metadata(file_path)
                    content_text = full_content.get("content", "")
                    if content_text:
                        matches = [{
                            "context": content_text,
                            "match_start": 0,
                            "match_end": len(content_text)
                        }]
                except Exception:
                    pass

            if matches or (query is None or query.strip() == ""):
                result_item = self._create_result_item(
                    file_path, matches, metadata, api_result.get("score", 0)
                )
                filtered_results.append(result_item)

        return filtered_results, total_matches_count

    def _create_response_message(self, total_matches_count: int,
                                 total_files_found: int, page: int,
                                 total_pages: int, current_page_files: int,
                                 search_path_prefix: str) -> str:
        message = (f"Found {total_matches_count} matches across {total_files_found} files. "
                   f"Showing page {page} of {total_pages} ({current_page_files} files on this page).")
        if search_path_prefix:
            message += f" Searched in path: {search_path_prefix}"
        return message

    def _create_error_response(self, error_msg: str, page: int,
                               page_size: int) -> Dict[str, Any]:
        return {
            "success": False,
            "message": f"Search failed: {error_msg}",
            "results": [],
            "total_files_found": 0,
            "total_matches_found": 0,
            "current_page": page,
            "page_size": page_size,
            "total_pages": 0
        }
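A minimal usage sketch follows. The ObsidianClient constructor arguments and the `search` module name are assumptions for illustration (the real client lives in obsidian_mcp/client.py and may take different parameters), and the accepted date-string format depends on parse_date_filter:

import asyncio

from obsidian_mcp.client import ObsidianClient
from search import SearchProcessor  # assumes this file is importable as `search`


async def main():
    # Hypothetical constructor arguments; check obsidian_mcp/client.py
    # for the actual signature.
    client = ObsidianClient(base_url="http://localhost:27124", api_key="...")
    processor = SearchProcessor(client)

    # Text search scoped to one folder, keeping notes tagged either
    # "project" or "work", newest first, 10 results per page.
    result = await processor.search(
        query="quarterly review",
        search_in_path="Work/Meetings",
        tag=["project", "work"],
        tag_match_mode="any",
        modified_since="2024-01-01",  # format depends on parse_date_filter
        page_size=10,
        page=1,
    )
    print(result["message"])
    for item in result["results"]:
        print(item["path"], item["score"])


asyncio.run(main())

Calling search() with no query switches to listing mode: the processor enumerates all .md files under search_in_path (up to 1000 files, 8 levels deep) and returns each file's full content as its single match.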

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pmmvr/obsidian-api-mcp-server'
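The same endpoint can be queried programmatically; a sketch in Python, assuming the third-party requests package is installed (the response schema is whatever the API returns and is not documented on this page):

import requests

# Fetch the directory entry for this server and print the raw JSON
resp = requests.get("https://glama.ai/api/mcp/v1/servers/pmmvr/obsidian-api-mcp-server")
resp.raise_for_status()
print(resp.json())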

If you have feedback or need assistance with the MCP directory API, please join our Discord server.