"""Whoosh-based search index for Obsidian notes."""
import logging
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
from whoosh import fields, index
from whoosh.index import EmptyIndexError, IndexError, LockError
from whoosh.qparser import MultifieldParser, QueryParser
from .config import ServerConfig
from .parser import ObsidianNote
from .vector_search import VectorSearchEngine
logger = logging.getLogger(__name__)
class SearchResult:
"""Represents a search result."""
def __init__(self, note: ObsidianNote, score: float, highlights: Dict[str, str]):
self.note = note
self.score = score
self.highlights = highlights
class HybridSearchEngine:
"""Hybrid search engine combining Whoosh text search and vector search."""
def __init__(self, config: ServerConfig):
"""Initialize the hybrid search engine.
Args:
config: Server configuration
"""
self.config = config
# Initialize Whoosh text search
if config.index_path is None:
raise ValueError("index_path cannot be None")
self.text_search = ObsidianSearchIndex(config.index_path)
# Initialize vector search
if config.vector_index_path is None:
raise ValueError("vector_index_path cannot be None")
self.vector_search = VectorSearchEngine(
index_path=config.vector_index_path, embedding_model=config.embedding_model
)
logger.info(f"Initialized hybrid search with alpha={config.hybrid_alpha}")
def add_note(self, note: ObsidianNote) -> None:
"""Add a note to both text and vector indices."""
self.text_search.add_note(note)
self.vector_search.add_note(note)
def remove_note(self, file_path: Path) -> None:
"""Remove a note from both indices."""
self.text_search.remove_note(file_path)
self.vector_search.remove_note(str(file_path))
def bulk_add_notes(self, notes: List[ObsidianNote]) -> None:
"""Add multiple notes to both indices efficiently."""
self.text_search.bulk_add_notes(notes)
# Vector search handles batching internally
for note in notes:
self.vector_search.add_note(note)
def search(
self,
query: str,
limit: int = 50,
tags: Optional[Set[str]] = None,
search_fields: Optional[List[str]] = None,
search_mode: str = "hybrid",
) -> List[Dict[str, Any]]:
"""Search using hybrid approach combining text and vector search.
Args:
query: Search query
limit: Maximum number of results
tags: Optional tag filter
search_fields: Fields to search in (for text search)
search_mode: "text", "vector", or "hybrid"
Returns:
List of search results with combined ranking
"""
if not query.strip():
return []
if search_mode == "text":
return self.text_search.search(query, limit, tags, search_fields)
elif search_mode == "vector":
return self._format_vector_results(
self.vector_search.search(query, limit, list(tags) if tags else None)
)
else: # hybrid
return self._hybrid_search(query, limit, tags, search_fields)
def _hybrid_search(
self,
query: str,
limit: int,
tags: Optional[Set[str]] = None,
search_fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Perform hybrid search with Reciprocal Rank Fusion."""
# Get results from both search methods
# Use higher limits to ensure good fusion results
search_limit = min(limit * 3, 150)
text_results = self.text_search.search(query, search_limit, tags, search_fields)
vector_results = self.vector_search.search(
query, search_limit, list(tags) if tags else None
)
# Apply Reciprocal Rank Fusion (RRF)
fused_results = self._reciprocal_rank_fusion(
text_results, vector_results, limit
)
return fused_results
def _reciprocal_rank_fusion(
self,
text_results: List[Dict[str, Any]],
vector_results: List[Dict[str, Any]],
limit: int,
k: int = 60,
) -> List[Dict[str, Any]]:
"""Apply Reciprocal Rank Fusion to combine text and vector results.
Args:
text_results: Results from text search
vector_results: Results from vector search
limit: Maximum number of results to return
k: RRF parameter (typical value: 60)
Returns:
Fused and ranked results
"""
# Create a mapping of document paths to combined scores
doc_scores: dict[str, float] = {}
doc_info: dict[str, dict[str, Any]] = {}
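        # RRF gives each document a score of 1 / (k + rank) per result list and sums
        # the contributions, so a document ranked highly by either engine floats to
        # the top. The text contribution is weighted by (1 - hybrid_alpha) and the
        # vector contribution by hybrid_alpha:
        #     score(d) = (1 - alpha) / (k + rank_text(d)) + alpha / (k + rank_vector(d))
        # A document missing from one list simply contributes nothing for that list.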
# Process text search results
for rank, result in enumerate(text_results):
path = result["path"]
rrf_score = 1.0 / (k + rank + 1)
doc_scores[path] = doc_scores.get(path, 0) + rrf_score * (
1 - self.config.hybrid_alpha
)
doc_info[path] = {
**result,
"text_rank": rank + 1,
"text_score": result.get("score", 0),
"vector_rank": None,
"vector_score": None,
}
# Process vector search results
formatted_vector_results = self._format_vector_results(vector_results)
for rank, result in enumerate(formatted_vector_results):
path = result["path"]
rrf_score = 1.0 / (k + rank + 1)
doc_scores[path] = (
doc_scores.get(path, 0) + rrf_score * self.config.hybrid_alpha
)
if path in doc_info:
# Update existing entry
doc_info[path]["vector_rank"] = rank + 1
doc_info[path]["vector_score"] = result.get("similarity_score", 0)
else:
# New entry from vector search only
doc_info[path] = {
**result,
"text_rank": None,
"text_score": None,
"vector_rank": rank + 1,
"vector_score": result.get("similarity_score", 0),
}
# Sort by combined RRF score and take top results
sorted_results = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)[
:limit
]
# Format final results
final_results = []
for path, combined_score in sorted_results:
result = doc_info[path].copy()
result["combined_score"] = combined_score
result["search_mode"] = "hybrid"
final_results.append(result)
return final_results
def _format_vector_results(
self, vector_results: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Format vector search results to match text search result format."""
formatted_results = []
for result in vector_results:
# Get full note content from text index if available
note_data = self.text_search.get_note_by_path(Path(result["path"]))
formatted_result = {
"path": result["path"],
"title": result["title"],
"content": (
note_data["content"][:500] + "..."
if note_data and len(note_data["content"]) > 500
else (note_data["content"] if note_data else "")
),
"tags": result["tags"],
"score": result["similarity_score"],
"similarity_score": result["similarity_score"],
"highlights": {}, # Vector search doesn't provide highlights
"created_date": result.get("created_date"),
"modified_date": result.get("modified_date"),
"search_mode": "vector",
}
formatted_results.append(formatted_result)
return formatted_results
def semantic_search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Perform pure semantic/vector search."""
vector_results = self.vector_search.search(query, limit)
return self._format_vector_results(vector_results)
def find_similar_notes(
self, note_path: str, limit: int = 10
) -> List[Dict[str, Any]]:
"""Find notes similar to a given note."""
similar_results = self.vector_search.get_similar_notes(note_path, limit)
return self._format_vector_results(similar_results)
def get_note_by_path(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""Get a specific note by its path."""
return self.text_search.get_note_by_path(file_path)
def list_all_tags(self) -> List[str]:
"""Get all unique tags in the index."""
return self.text_search.list_all_tags()
def get_recent_notes(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recently modified notes."""
return self.text_search.get_recent_notes(limit)
def rebuild_index(self, notes: List[ObsidianNote]) -> None:
"""Completely rebuild both search indices."""
logger.info("Rebuilding hybrid search indices...")
self.text_search.rebuild_index(notes)
self.vector_search.rebuild_index(notes)
logger.info("Hybrid search indices rebuild complete")
def optimize_index(self) -> None:
"""Optimize the text search index."""
self.text_search.optimize_index()
def get_stats(self) -> Dict[str, Any]:
"""Get statistics for both search indices."""
text_stats = self.text_search.get_stats()
vector_stats = self.vector_search.get_stats()
return {
"text_search": text_stats,
"vector_search": vector_stats,
"hybrid_alpha": self.config.hybrid_alpha,
"embedding_model": self.config.embedding_model,
}
def needs_update(self, vault_path: Path) -> bool:
"""Check if indices need updating."""
return self.text_search.needs_update(vault_path)
def is_properly_initialized(self, expected_note_count: int = 0) -> bool:
"""Fast check if both text and vector indices are properly initialized.
Args:
expected_note_count: Expected number of notes (0 to skip count check)
Returns:
True if both indices appear to be properly initialized, False otherwise.
"""
# Check text search index
if not self.text_search.is_properly_initialized(expected_note_count):
return False
# Check vector search index (more lenient check)
# Vector index can be rebuilt separately if needed, so don't block on it
try:
vector_stats = self.vector_search.get_stats()
vector_count = vector_stats.get("document_count", 0)
# Only log if vector index appears empty - don't block initialization
if expected_note_count > 0 and vector_count == 0:
logger.info(
f"Vector index appears empty ({vector_count} docs) while text index has content"
)
logger.info(
"Allowing initialization to proceed; vector index can be rebuilt separately"
)
except Exception as e:
logger.warning(f"Error checking vector index status: {e}")
# Don't block initialization due to vector index issues
return True
def quick_health_check(self) -> Dict[str, Any]:
"""Perform a quick health check of both text and vector indices.
Returns:
Dictionary with health check results for both indices.
"""
health: Dict[str, Any] = {
"text_index": self.text_search.quick_health_check(),
"vector_index": {"status": "unknown", "errors": []},
"overall_healthy": False,
}
# Check vector index health
try:
vector_stats = self.vector_search.get_stats()
health["vector_index"] = {
"status": "healthy",
"document_count": vector_stats.get("document_count", 0),
"errors": [],
}
except Exception as e:
health["vector_index"] = {
"status": "error",
"errors": [f"Vector index error: {str(e)}"],
}
# Determine overall health
text_errors = health["text_index"].get("errors", [])
vector_errors = health["vector_index"].get("errors", [])
text_healthy = len(text_errors) == 0 if isinstance(text_errors, list) else False
vector_healthy = (
len(vector_errors) == 0 if isinstance(vector_errors, list) else False
)
health["overall_healthy"] = text_healthy and vector_healthy
return health
def incremental_update(self, vault_path: Path, parser: Any) -> Dict[str, int]:
"""Perform incremental update of both indices."""
stats = self.text_search.incremental_update(vault_path, parser)
# Update vector index for the same files
        # This is a simplified approach; ideally we would track exactly which
        # files changed and update only those in the vector index.
try:
index_mtime_str = self.text_search._get_index_last_modified()
if index_mtime_str:
index_dt = datetime.fromisoformat(index_mtime_str)
for md_file in vault_path.rglob("*.md"):
if ".obsidian" in md_file.parts:
continue
file_mtime = datetime.fromtimestamp(md_file.stat().st_mtime)
if file_mtime > index_dt:
note = parser.parse_note(md_file)
if note:
self.vector_search.add_note(note)
except Exception as e:
logger.error(f"Error during vector index incremental update: {e}")
return stats
class ObsidianSearchIndex:
"""Whoosh-based search index for Obsidian notes."""
# Index schema
SCHEMA = fields.Schema(
path=fields.ID(stored=True, unique=True),
title=fields.TEXT(stored=True, phrase=True),
content=fields.TEXT(stored=True),
tags=fields.KEYWORD(stored=True, commas=True),
wikilinks=fields.KEYWORD(stored=True, commas=True),
created_date=fields.DATETIME(stored=True),
modified_date=fields.DATETIME(stored=True),
frontmatter=fields.TEXT(stored=True),
)
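    # Standalone usage sketch (illustrative; assumes `note` is a parsed ObsidianNote
    # and the index directory is writable):
    #     search_index = ObsidianSearchIndex(Path("/tmp/obsidian-index"))
    #     search_index.add_note(note)
    #     hits = search_index.search("meeting notes", limit=5, tags={"work"})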
def __init__(self, index_path: Path):
"""Initialize search index."""
self.index_path = index_path
self.index_path.mkdir(parents=True, exist_ok=True)
self._index: Optional[index.Index] = None
self._ensure_index()
def _ensure_index(self) -> None:
"""Ensure the index exists and is properly initialized."""
max_retries = 2
for attempt in range(max_retries):
try:
if index.exists_in(str(self.index_path)):
# Try to open existing index
self._index = index.open_dir(str(self.index_path))
# Validate the opened index
self._validate_index()
logger.info(
f"Successfully opened existing index at {self.index_path}"
)
return
else:
# Create new index
self._index = index.create_in(str(self.index_path), self.SCHEMA)
logger.info(f"Created new index at {self.index_path}")
return
except (IndexError, EmptyIndexError, LockError, TypeError, ValueError) as e:
logger.warning(
f"Index corruption detected (attempt {attempt + 1}/{max_retries}): {e}"
)
if attempt < max_retries - 1:
# Try to recover by rebuilding
if self._recover_from_corruption():
continue
else:
# Final attempt failed
logger.error(
f"Failed to initialize index after {max_retries} attempts"
)
raise RuntimeError(f"Unable to initialize search index: {e}")
except Exception as e:
logger.error(f"Unexpected error during index initialization: {e}")
if attempt < max_retries - 1 and self._recover_from_corruption():
continue
raise
def _validate_index(self) -> None:
"""Validate that the index can be used for basic operations."""
try:
with self._index.searcher() as searcher: # type: ignore[union-attr]
# Try to get document count - this will fail if index is corrupted
searcher.doc_count()
except Exception as e:
logger.warning(f"Index validation failed: {e}")
raise IndexError(f"Index validation failed: {e}")
def _recover_from_corruption(self) -> bool:
"""Attempt to recover from index corruption by removing corrupted files."""
try:
logger.info(
f"Attempting to recover from index corruption at {self.index_path}"
)
# Remove corrupted index directory
if self.index_path.exists():
shutil.rmtree(self.index_path)
logger.info(f"Removed corrupted index directory: {self.index_path}")
# Recreate directory
self.index_path.mkdir(parents=True, exist_ok=True)
# Create fresh index
self._index = index.create_in(str(self.index_path), self.SCHEMA)
logger.info(f"Created fresh index after corruption recovery")
return True
except Exception as e:
logger.error(f"Failed to recover from index corruption: {e}")
return False
def _retry_with_exponential_backoff(
self, func: Callable[[], Any], max_retries: int = 5, initial_delay: float = 0.1
) -> Any:
"""Retry a function with exponential backoff on LockError.
Args:
func: Function to retry (should be a callable that may raise LockError)
max_retries: Maximum number of retry attempts
initial_delay: Initial delay in seconds (doubles each retry)
Returns:
Result of successful function call
Raises:
LockError: If all retries fail
"""
delay = initial_delay
last_exception = None
for attempt in range(max_retries + 1): # +1 for initial attempt
try:
return func()
except LockError as e:
last_exception = e
if attempt == max_retries:
logger.error(
f"Failed to acquire index lock after {max_retries + 1} attempts. "
f"This may indicate multiple server instances are running concurrently."
)
break
logger.warning(
f"Index lock contention detected (attempt {attempt + 1}/{max_retries + 1}). "
f"Retrying in {delay:.2f}s..."
)
time.sleep(delay)
delay *= 2 # Exponential backoff
            except Exception:
                # Don't retry on non-lock errors
                raise
# If we get here, all retries failed
raise last_exception or LockError("Failed to acquire lock after retries")
def add_note(self, note: ObsidianNote) -> None:
"""Add or update a note in the index."""
def add_note_operation() -> None:
with self._index.writer() as writer: # type: ignore[union-attr]
writer.update_document(
path=str(note.path),
title=note.title,
content=note.content,
tags=",".join(note.tags),
wikilinks=",".join(note.wikilinks),
created_date=note.created_date,
modified_date=note.modified_date,
frontmatter=str(note.frontmatter),
)
try:
# Use retry mechanism for lock errors
self._retry_with_exponential_backoff(add_note_operation)
except (IndexError, LockError) as e:
logger.error(f"Failed to add note {note.path} to index: {e}")
# Try to recover from corruption and retry once more
if isinstance(e, IndexError) and self._recover_from_corruption():
logger.info(
f"Retrying add_note for {note.path} after corruption recovery"
)
self._retry_with_exponential_backoff(add_note_operation)
else:
raise
def remove_note(self, file_path: Path) -> None:
"""Remove a note from the index."""
def remove_operation() -> None:
with self._index.writer() as writer: # type: ignore[union-attr]
writer.delete_by_term("path", str(file_path))
# Use retry mechanism for lock errors
self._retry_with_exponential_backoff(remove_operation)
def bulk_add_notes(self, notes: List[ObsidianNote]) -> None:
"""Add multiple notes to the index efficiently."""
def bulk_add_operation() -> None:
with self._index.writer() as writer: # type: ignore[union-attr]
for note in notes:
writer.update_document(
path=str(note.path),
title=note.title,
content=note.content,
tags=",".join(note.tags),
wikilinks=",".join(note.wikilinks),
created_date=note.created_date,
modified_date=note.modified_date,
frontmatter=str(note.frontmatter),
)
# Use retry mechanism for lock errors
self._retry_with_exponential_backoff(bulk_add_operation)
def search(
self,
query: str,
limit: int = 50,
tags: Optional[Set[str]] = None,
search_fields: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Search the index and return results."""
if not query.strip():
return []
# Default search fields
if search_fields is None:
search_fields = ["title", "content", "tags"]
try:
with self._index.searcher() as searcher: # type: ignore[union-attr]
# Create parser for multi-field search
parser = MultifieldParser(search_fields, self._index.schema) # type: ignore[union-attr]
# Parse the query
try:
parsed_query = parser.parse(query)
except Exception:
# Fall back to simple content search if parsing fails
parser = QueryParser("content", self._index.schema) # type: ignore[union-attr]
parsed_query = parser.parse(query)
# Add tag filter if specified
if tags:
tag_queries = []
for tag in tags:
tag_parser = QueryParser("tags", self._index.schema) # type: ignore[union-attr]
tag_queries.append(tag_parser.parse(tag))
if tag_queries:
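                        # OR the requested tags together, then AND the result with the
                        # main query: a hit must match the query and at least one tag.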
from whoosh.query import And, Or
tag_query = (
Or(tag_queries) if len(tag_queries) > 1 else tag_queries[0]
)
parsed_query = And([parsed_query, tag_query])
# Execute search
results = searcher.search(parsed_query, limit=limit)
# Convert to our format
search_results = []
for result in results:
                    # Collect highlighted snippets for each searched field; Whoosh's
                    # Hit.highlights() requires a field name and returns a snippet string.
                    highlights = {}
                    for field_name in search_fields:
                        try:
                            snippet = result.highlights(field_name)
                        except Exception:
                            continue
                        if snippet:
                            highlights[field_name] = snippet
search_results.append(
{
"path": result["path"],
"title": result["title"],
"content": (
result["content"][:500] + "..."
if len(result["content"]) > 500
else result["content"]
),
"tags": result["tags"].split(",") if result["tags"] else [],
"score": result.score,
"highlights": highlights,
"created_date": (
result["created_date"].isoformat()
if result["created_date"]
else None
),
"modified_date": (
result["modified_date"].isoformat()
if result["modified_date"]
else None
),
}
)
return search_results
except (IndexError, LockError) as e:
if isinstance(e, LockError):
logger.error(f"Search failed due to index lock contention: {e}")
logger.info(
"This may indicate multiple server instances are accessing the index concurrently."
)
else:
logger.error(f"Search failed due to index corruption: {e}")
# Try to recover and return empty results rather than crashing
if self._recover_from_corruption():
logger.info(
"Index recovered, but returning empty search results. Re-index needed."
)
return []
def get_note_by_path(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""Get a specific note by its path."""
with self._index.searcher() as searcher: # type: ignore[union-attr]
results = searcher.documents(path=str(file_path))
for result in results:
return {
"path": result["path"],
"title": result["title"],
"content": result["content"],
"tags": result["tags"].split(",") if result["tags"] else [],
"wikilinks": (
result["wikilinks"].split(",") if result["wikilinks"] else []
),
"created_date": (
result["created_date"].isoformat()
if result["created_date"]
else None
),
"modified_date": (
result["modified_date"].isoformat()
if result["modified_date"]
else None
),
}
return None
def list_all_tags(self) -> List[str]:
"""Get all unique tags in the index."""
tags: set[str] = set()
with self._index.searcher() as searcher: # type: ignore[union-attr]
            for stored_fields in searcher.all_stored_fields():
                if stored_fields.get("tags"):
                    note_tags = stored_fields["tags"].split(",")
                    tags.update(tag.strip() for tag in note_tags if tag.strip())
return sorted(list(tags))
def get_recent_notes(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recently modified notes."""
with self._index.searcher() as searcher: # type: ignore[union-attr]
results = searcher.documents()
# Sort by modified date
sorted_results = sorted(
results,
key=lambda x: x.get("modified_date", datetime.min),
reverse=True,
)
recent_notes = []
for result in sorted_results[:limit]:
recent_notes.append(
{
"path": result["path"],
"title": result["title"],
"content": (
result["content"][:200] + "..."
if len(result["content"]) > 200
else result["content"]
),
"tags": result["tags"].split(",") if result["tags"] else [],
"modified_date": (
result["modified_date"].isoformat()
if result["modified_date"]
else None
),
}
)
return recent_notes
def rebuild_index(self, notes: List[ObsidianNote]) -> None:
"""Completely rebuild the search index."""
# Clear existing index by removing all documents
def clear_index() -> None:
with self._index.writer() as writer: # type: ignore[union-attr]
# Truncate the index (remove all documents)
from whoosh.query import Every
writer.delete_by_query(Every())
# Use retry mechanism for clearing the index
self._retry_with_exponential_backoff(clear_index)
# Add all notes
self.bulk_add_notes(notes)
def optimize_index(self) -> None:
"""Optimize the index for better performance."""
        def optimize_operation() -> None:
            # Don't use the writer as a context manager here: its __exit__ would
            # call commit() a second time on an already-committed writer.
            writer = self._index.writer()  # type: ignore[union-attr]
            writer.commit(optimize=True)
# Use retry mechanism for lock errors
self._retry_with_exponential_backoff(optimize_operation)
def get_stats(self) -> Dict[str, Any]:
"""Get index statistics."""
with self._index.searcher() as searcher: # type: ignore[union-attr]
return {
"doc_count": searcher.doc_count(),
"field_names": list(self._index.schema.names()), # type: ignore[union-attr]
"index_path": str(self.index_path),
"last_modified": self._get_index_last_modified(),
}
def _get_index_last_modified(self) -> Optional[str]:
"""Get the last modification time of the index."""
try:
# Check modification time of the main index file
index_files = list(self.index_path.glob("*"))
if index_files:
latest_mtime = max(f.stat().st_mtime for f in index_files)
return datetime.fromtimestamp(latest_mtime).isoformat()
except Exception:
pass
return None
def needs_update(self, vault_path: Path) -> bool:
"""Check if index needs updating based on file modification times."""
try:
# Get index last modified time
index_mtime = self._get_index_last_modified()
if not index_mtime:
return True # No index exists
index_dt = datetime.fromisoformat(index_mtime)
# Check if any markdown files are newer than the index
for md_file in vault_path.rglob("*.md"):
if ".obsidian" in md_file.parts:
continue
file_mtime = datetime.fromtimestamp(md_file.stat().st_mtime)
if file_mtime > index_dt:
return True
return False
except Exception:
return True # Err on the side of caution
def is_properly_initialized(self, expected_note_count: int = 0) -> bool:
"""Fast check if index is properly initialized and ready for use.
Args:
expected_note_count: Expected number of notes (0 to skip count check)
Returns:
True if index appears to be properly initialized, False otherwise.
"""
try:
# Check if index directory and files exist
if not self.index_path.exists():
return False
# Check for essential Whoosh index files
toc_files = list(self.index_path.glob("*.toc"))
if not toc_files:
return False
# Try to open index and get basic stats without long operations
with self._index.searcher() as searcher: # type: ignore[union-attr]
doc_count = searcher.doc_count()
# If we have an expected count, verify it roughly matches
if expected_note_count > 0:
# Allow some variance (±10%) for small differences
variance_threshold = max(1, int(expected_note_count * 0.1))
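                    # e.g. with 200 expected notes the threshold is 20, so any doc
                    # count between 180 and 220 still counts as initialized.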
if abs(doc_count - expected_note_count) > variance_threshold:
logger.info(
f"Index doc count ({doc_count}) doesn't match expected ({expected_note_count})"
)
return False
# Check if index has any documents at all
if doc_count == 0 and expected_note_count > 0:
logger.info("Index appears empty but notes are expected")
return False
logger.info(
f"Index appears properly initialized with {doc_count} documents"
)
return True
except Exception as e:
logger.warning(f"Error checking index initialization status: {e}")
return False
def quick_health_check(self) -> Dict[str, Any]:
"""Perform a quick health check of the index.
Returns:
Dictionary with health check results.
"""
health: Dict[str, Any] = {
"index_exists": False,
"has_documents": False,
"doc_count": 0,
"last_modified": None,
"essential_files_present": False,
"errors": [],
}
try:
# Check if index directory exists
health["index_exists"] = self.index_path.exists()
if not health["index_exists"]:
errors = health["errors"]
if isinstance(errors, list):
errors.append("Index directory does not exist")
return health
# Check for essential files
toc_files = list(self.index_path.glob("*.toc"))
health["essential_files_present"] = len(toc_files) > 0
if not health["essential_files_present"]:
errors = health["errors"]
if isinstance(errors, list):
errors.append("Essential index files (.toc) not found")
# Get last modified time
health["last_modified"] = self._get_index_last_modified()
# Try to get document count
if self._index:
with self._index.searcher() as searcher:
health["doc_count"] = searcher.doc_count()
health["has_documents"] = health["doc_count"] > 0
except Exception as e:
errors = health["errors"]
if isinstance(errors, list):
errors.append(f"Error during health check: {str(e)}")
return health
def incremental_update(self, vault_path: Path, parser: Any) -> Dict[str, int]:
"""Perform incremental update of files newer than index."""
stats = {"updated": 0, "added": 0, "removed": 0}
try:
index_mtime_str = self._get_index_last_modified()
if not index_mtime_str:
return stats
index_dt = datetime.fromisoformat(index_mtime_str)
# Find files that need updating
files_to_update = []
for md_file in vault_path.rglob("*.md"):
if ".obsidian" in md_file.parts:
continue
file_mtime = datetime.fromtimestamp(md_file.stat().st_mtime)
if file_mtime > index_dt:
files_to_update.append(md_file)
# Update modified files
for file_path in files_to_update:
note = parser.parse_note(file_path)
if note:
# Check if note already exists in index
existing = self.get_note_by_path(file_path)
if existing:
stats["updated"] += 1
else:
stats["added"] += 1
self.add_note(note)
# TODO: Handle removed files (would need to track indexed files)
except Exception as e:
logger.error(f"Error during incremental update: {e}")
return stats