"""Search engine implementation for advanced search capabilities."""
import asyncio
import re
from datetime import datetime
from typing import Any
from ..logging import logger
from .date_parser import parse_natural_date
from .fuzzy import FuzzyMatcher
from .parser import QueryParser, QueryToken, TokenType
class SearchEngine:
"""Implementation of advanced search capabilities for Simplenote notes.
Supports:
- Boolean operators (AND, OR, NOT)
- Phrase matching
- Tag filtering
- Date range filtering
"""
def __init__(self, fuzzy: bool = False) -> None:
"""Initialize the search engine.
Args:
fuzzy: Enable fuzzy matching for typo-tolerant search.
"""
self._lock = asyncio.Lock()
self._fuzzy = fuzzy
self._fuzzy_matcher = FuzzyMatcher() if fuzzy else None
def search(
self,
notes: dict[str, dict[str, Any]],
query: str,
tag_filters: list[str] | None = None,
date_range: tuple[datetime | None, datetime | None] | None = None,
) -> list[dict[str, Any]]:
"""Search notes using advanced query capabilities.
Args:
notes: Dictionary of notes to search through (key -> note)
query: The search query (supports boolean operators)
tag_filters: Optional list of tags to filter by
date_range: Optional tuple of (from_date, to_date)
Returns:
List of matching notes sorted by relevance. Pagination should be handled
by the caller.
"""
logger.debug(f"Performing advanced search with query: '{query}'")
# Parse the query into tokens
parser = QueryParser(query)
tokens = parser.tokens
# Extract standalone special tokens (tags, dates) that should be applied globally
global_tag_filters = set(tag_filters) if tag_filters else set()
global_from_date = None
global_to_date = None
if date_range:
global_from_date, global_to_date = date_range
# Find and process special tokens
remaining_tokens = []
for token in tokens:
if token.type == TokenType.TAG:
global_tag_filters.add(token.value)
elif token.type == TokenType.DATE_FROM:
try:
global_from_date = datetime.fromisoformat(token.value)
except ValueError:
# Fallback to natural language date parsing
parsed = parse_natural_date(token.value)
if parsed is not None:
global_from_date = parsed
else:
logger.warning(f"Invalid from date format: {token.value}")
elif token.type == TokenType.DATE_TO:
try:
global_to_date = datetime.fromisoformat(token.value)
except ValueError:
parsed = parse_natural_date(token.value)
if parsed is not None:
global_to_date = parsed
else:
logger.warning(f"Invalid to date format: {token.value}")
else:
remaining_tokens.append(token)
# Update date_range with parsed dates from the query
date_range = (global_from_date, global_to_date)
# If no query text and we have filters, we should still return filtered results
if not query and (
global_tag_filters or date_range[0] is not None or date_range[1] is not None
):
logger.debug(
f"Processing empty query with filters: tags={global_tag_filters}, date_range={date_range}"
)
# Collect results with scores
results = []
for _, note in notes.items():
# Apply tag filters
if global_tag_filters and not self._matches_tags(
note, global_tag_filters
):
continue
# Apply date range filters
if (
date_range[0] is not None or date_range[1] is not None
) and not self._is_in_date_range(note, date_range):
continue
# Add matching note to results with a default score of 1
results.append((note, 1))
# Sort by modification date (most recent first) as a default
results.sort(key=lambda x: self._get_modify_date(x[0]), reverse=True)
# Return just the notes, not the scores
return [note for note, _ in results]
# For regular searches with actual query tokens
elif tokens:
# Collect results with scores
results = []
for _, note in notes.items():
# Apply tag filters
if global_tag_filters and not self._matches_tags(
note, global_tag_filters
):
continue
# Apply date range filters
if (
date_range[0] is not None or date_range[1] is not None
) and not self._is_in_date_range(note, date_range):
continue
# Evaluate the boolean expression
if remaining_tokens and not self._evaluate_expression(
note, remaining_tokens
):
continue
# Calculate relevance score
score = self._calculate_relevance(note, query)
# Add matching note to results
results.append((note, score))
# Sort by relevance score (descending)
results.sort(key=lambda x: x[1], reverse=True)
# Return just the notes, not the scores
return [note for note, _ in results]
# Empty query with no filters returns empty result
else:
return []
def _matches_tags(self, note: dict[str, Any], tags: set[str]) -> bool:
"""Check if a note matches the specified tags.
Args:
note: The note to check
tags: Set of tags to match
Returns:
True if note has all the specified tags, False otherwise
"""
# Special case for untagged notes
if any(tag.lower() == "untagged" for tag in tags):
note_tags = note.get("tags", [])
return not note_tags or len(note_tags) == 0
note_tags = set(note.get("tags", []))
# Case insensitive tag matching
note_tags_lower = {tag.lower() for tag in note_tags}
tags_lower = {tag.lower() for tag in tags}
logger.debug(f"Checking if tags {tags} are in note tags {note_tags}")
return tags_lower.issubset(note_tags_lower)
def _is_in_date_range(
self,
note: dict[str, Any],
date_range: tuple[datetime | None, datetime | None],
) -> bool:
"""Check if a note's modification date is within the specified range.
Args:
note: The note to check
date_range: Tuple of (from_date, to_date), either may be None
Returns:
True if note is in the date range, False otherwise
"""
from_date, to_date = date_range
# If no date range specified, note is in range
if not from_date and not to_date:
return True
# Get note modification date
modify_date = note.get("modifydate", 0)
if not modify_date:
return False
# Parse date if it's a string
if isinstance(modify_date, str):
try:
note_date = datetime.fromisoformat(modify_date)
except ValueError:
logger.warning(f"Invalid note modification date format: {modify_date}")
return False
else:
# Assume it's a timestamp
try:
note_date = datetime.fromtimestamp(modify_date)
except (ValueError, TypeError):
logger.warning(f"Invalid note modification timestamp: {modify_date}")
return False
# Check if note date is in range
if from_date and note_date < from_date:
return False
return not (to_date and note_date > to_date)
def _evaluate_expression(
self, note: dict[str, Any], tokens: list[QueryToken]
) -> bool:
"""Evaluate a boolean expression against a note.
Uses a simple recursive descent parser to evaluate the expression.
Args:
note: The note to evaluate against
tokens: List of query tokens
Returns:
True if note matches the expression, False otherwise
"""
if not tokens:
return True
# Create a parser state with current position
pos = [0]
# Parse the boolean expression
result = self._parse_or_expression(note, tokens, pos)
return result
def _parse_or_expression(
self, note: dict[str, Any], tokens: list[QueryToken], pos: list[int]
) -> bool:
"""Parse an OR expression (term OR term OR ...).
Args:
note: The note to check
tokens: List of query tokens
pos: Current position in the token list
Returns:
Result of evaluating the expression
"""
# Parse the first term
result = self._parse_and_expression(note, tokens, pos)
# Continue parsing OR terms
while pos[0] < len(tokens) and tokens[pos[0]].type == TokenType.OR:
# Skip the OR token
pos[0] += 1
# Parse the next term
next_result = self._parse_and_expression(note, tokens, pos)
# Combine with OR
result = result or next_result
return result
def _parse_and_expression(
self, note: dict[str, Any], tokens: list[QueryToken], pos: list[int]
) -> bool:
"""Parse an AND expression (term AND term AND ...).
Args:
note: The note to check
tokens: List of query tokens
pos: Current position in the token list
Returns:
Result of evaluating the expression
"""
# Parse the first term
result = self._parse_not_expression(note, tokens, pos)
# Continue parsing AND terms
while pos[0] < len(tokens) and tokens[pos[0]].type == TokenType.AND:
# Skip the AND token
pos[0] += 1
# Parse the next term
next_result = self._parse_not_expression(note, tokens, pos)
# Combine with AND
result = result and next_result
return result
def _parse_not_expression(
self, note: dict[str, Any], tokens: list[QueryToken], pos: list[int]
) -> bool:
"""Parse a NOT expression (NOT term).
Args:
note: The note to check
tokens: List of query tokens
pos: Current position in the token list
Returns:
Result of evaluating the expression
"""
# Check for NOT operator
if pos[0] < len(tokens) and tokens[pos[0]].type == TokenType.NOT:
# Skip the NOT token
pos[0] += 1
# Parse the term and negate it
result = not self._parse_primary(note, tokens, pos)
else:
# Parse a regular term
result = self._parse_primary(note, tokens, pos)
return result
def _parse_primary(
self, note: dict[str, Any], tokens: list[QueryToken], pos: list[int]
) -> bool:
"""Parse a primary expression (term, phrase, or grouped expression).
Args:
note: The note to check
tokens: List of query tokens
pos: Current position in the token list
Returns:
Result of evaluating the expression
"""
if pos[0] >= len(tokens):
# End of input
return False
token = tokens[pos[0]]
if token.type == TokenType.GROUP_START:
# Parse a grouped expression (...)
pos[0] += 1
# Parse the expression inside the group
result = self._parse_or_expression(note, tokens, pos)
# Ensure we have a closing parenthesis
if pos[0] < len(tokens) and tokens[pos[0]].type == TokenType.GROUP_END:
pos[0] += 1
else:
logger.warning("Missing closing parenthesis in search query")
return result
elif token.type == TokenType.TERM:
# Check if the term matches the note content
pos[0] += 1
return self._content_contains(note, token.value)
elif token.type == TokenType.PHRASE:
# Check if the phrase matches the note content
pos[0] += 1
return self._content_contains(note, token.value, exact=True)
else:
# Skip unexpected tokens
pos[0] += 1
return False
def _content_contains(
self, note: dict[str, Any], search_term: str, exact: bool = False
) -> bool:
"""Check if note content contains the search term.
When fuzzy mode is enabled, falls back to fuzzy matching if
exact/substring matching fails.
Args:
note: The note to check
search_term: The term to search for
exact: Whether to perform exact matching (case sensitive)
Returns:
True if note contains the term, False otherwise
"""
content = note.get("content", "")
if not content:
return False
if exact:
# For exact phrase matching, we need to check for the exact sequence of words
# This requires whole word matching, not just substring matching
# Convert search term and content to lowercase for case-insensitive matching
search_lower = search_term.lower()
content_lower = content.lower()
# Split into words and join with a word boundary pattern
search_words = search_lower.split()
if len(search_words) <= 1:
# Single word or empty - just do direct matching
found = search_lower in content_lower
if not found and self._fuzzy and self._fuzzy_matcher:
return self._fuzzy_matcher.fuzzy_contains(content, search_term)
return found
# For multiple words, use a regex pattern that matches the exact sequence
# with word boundaries
# Escape regex special characters
escaped_words = [re.escape(word) for word in search_words]
# Join with whitespace pattern
pattern = r"\b" + r"\s+".join(escaped_words) + r"\b"
# Check if the pattern matches
found = bool(re.search(pattern, content_lower))
if not found and self._fuzzy and self._fuzzy_matcher:
return self._fuzzy_matcher.fuzzy_contains_phrase(content, search_term)
return found
else:
# Case-insensitive match for regular terms
found = search_term.lower() in content.lower()
if not found and self._fuzzy and self._fuzzy_matcher:
return self._fuzzy_matcher.fuzzy_contains(content, search_term)
return found
def _get_modify_date(self, note: dict[str, Any]) -> datetime:
"""Extract the modification date from a note.
Args:
note: The note to extract date from
Returns:
A datetime object representing the modification date,
or epoch start if not available
"""
modify_date = note.get("modifydate", 0)
if not modify_date:
return datetime.fromtimestamp(0)
try:
# Parse date if it's a string
if isinstance(modify_date, str):
return datetime.fromisoformat(modify_date)
else:
# Assume it's a timestamp
return datetime.fromtimestamp(modify_date)
except (ValueError, TypeError):
logger.warning(f"Invalid note modification date format: {modify_date}")
return datetime.fromtimestamp(0)
def _calculate_relevance(self, note: dict[str, Any], query: str) -> int:
"""Calculate relevance score for a note.
Args:
note: The note to score
query: The original search query
Returns:
Relevance score (higher is more relevant)
"""
content = note.get("content", "").lower()
title_line = content.split("\n", 1)[0].lower() if content else ""
# Get all search terms (excluding operators)
search_terms = re.findall(r"\b\w+\b", query.lower())
search_terms = [
term for term in search_terms if term not in ("and", "or", "not")
]
if not search_terms:
return 0
# Calculate base score from term occurrences
score = 0
for term in search_terms:
# Count occurrences in content
term_score = content.count(term)
# Bonus for title matches
if term in title_line:
term_score += 10
elif self._fuzzy and self._fuzzy_matcher:
# Reduced bonus for fuzzy title matches
fuzzy_title_score = self._fuzzy_matcher.fuzzy_score(title_line, term)
if fuzzy_title_score >= self._fuzzy_matcher.threshold:
term_score += int(3 * fuzzy_title_score)
# If no exact matches but fuzzy mode is on, add partial score
if term_score == 0 and self._fuzzy and self._fuzzy_matcher:
fuzzy_content_score = self._fuzzy_matcher.fuzzy_score(content, term)
if fuzzy_content_score >= self._fuzzy_matcher.threshold:
term_score += max(1, int(3 * fuzzy_content_score))
score += term_score
# Bonus for tags matching the search terms
note_tags = [tag.lower() for tag in note.get("tags", [])]
for term in search_terms:
if term in note_tags:
score += 5
# Bonus for recency
try:
modify_date = note.get("modifydate", 0)
if modify_date:
# Simple recency bonus: 0-5 points based on age
# (more recent notes get higher scores)
if isinstance(modify_date, str):
# Parse ISO format date
note_date = datetime.fromisoformat(modify_date)
else:
# Unix timestamp
note_date = datetime.fromtimestamp(modify_date)
days_old = (datetime.now() - note_date).days
recency_bonus = max(0, 5 - min(5, days_old // 30))
score += recency_bonus
except (ValueError, TypeError):
# Ignore date parsing errors
pass
return score