Convex MCP server (official, by get-convex)

query.rs (43.8 kB)
use std::{
    collections::{
        BTreeMap,
        BTreeSet,
        HashSet,
    },
    ops::Deref,
};

use anyhow::Context;
use bitvec::vec::BitVec;
use common::{
    document::{
        CreationTime,
        PackedDocument,
    },
    document_index_keys::{
        DocumentIndexKeyValue,
        DocumentIndexKeys,
        SearchIndexKeyValue,
        SearchValueTokens,
    },
    index::IndexKeyBytes,
    query::FilterValue,
    types::{
        SubscriberId,
        TabletIndexName,
        WriteTimestamp,
    },
};
use compact_str::CompactString;
use itertools::{
    Either,
    Itertools,
};
use maplit::btreemap;
#[cfg(any(test, feature = "testing"))]
use proptest::arbitrary::{
    any,
    Arbitrary,
};
#[cfg(any(test, feature = "testing"))]
use proptest::strategy::Strategy;
use tantivy::{
    schema::Field,
    Score,
    Term,
};
use value::{
    heap_size::{
        HeapSize,
        WithHeapSize,
    },
    ConvexString,
    ConvexValue,
    FieldPath,
    InternalId,
    ResolvedDocumentId,
};

use crate::{
    convex_en,
    memory_index::{
        art::ART,
        TermId,
    },
    metrics,
    scoring::term_from_str,
    EditDistance,
};

/// A search query compiled against a particular `SearchIndexSchema`.
#[derive(Debug, Clone)]
pub struct CompiledQuery {
    pub text_query: Vec<QueryTerm>,
    pub filter_conditions: Vec<CompiledFilterCondition>,
}

impl CompiledQuery {
    /// If true, this query can't match anything.
    pub fn is_empty(&self) -> bool {
        self.text_query.is_empty()
    }

    pub fn num_terms(&self) -> usize {
        self.text_query.len() + self.filter_conditions.len()
    }

    pub fn try_from_text_query_proto(
        value: pb::searchlight::TextQuery,
        search_field: Field,
    ) -> anyhow::Result<CompiledQuery> {
        Ok(Self {
            text_query: value
                .search_terms
                .into_iter()
                .map(|t| QueryTerm::try_from_text_query_term_proto(t, search_field))
                .collect::<anyhow::Result<Vec<_>>>()?,
            filter_conditions: value
                .filter_conditions
                .into_iter()
                // TODO(CX-5481): get rid of this `Term::wrap` call. Need to propagate the
                // Field for these.
                .map(|bytes| CompiledFilterCondition::Must(Term::wrap(bytes)))
                .collect_vec(),
        })
    }
}

impl From<CompiledQuery> for pb::searchlight::TextQuery {
    fn from(value: CompiledQuery) -> Self {
        Self {
            search_terms: value
                .text_query
                .into_iter()
                .map(pb::searchlight::TextQueryTerm::from)
                .collect_vec(),
            filter_conditions: value
                .filter_conditions
                .into_iter()
                .map(|CompiledFilterCondition::Must(term)| term.as_slice().to_vec())
                .collect_vec(),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct QueryTerm {
    term: Term,
    /// If the term is the last in a search query, it can be a prefix match for
    /// typeahead suggestions.
    prefix: bool,
}

impl QueryTerm {
    pub fn new(term: Term, prefix: bool) -> Self {
        QueryTerm { term, prefix }
    }

    pub fn term(&self) -> &Term {
        &self.term
    }

    pub fn into_term(self) -> Term {
        self.term
    }

    pub fn max_distance(&self) -> u32 {
        0
    }

    pub fn prefix(&self) -> bool {
        self.prefix
    }

    pub fn try_from_text_query_term_proto(
        value: pb::searchlight::TextQueryTerm,
        search_field: Field,
    ) -> anyhow::Result<QueryTerm> {
        let qterm = match value.term_type {
            None => anyhow::bail!("No TermType in QueryTerm"),
            Some(pb::searchlight::text_query_term::TermType::Exact(exact)) => QueryTerm {
                term: Term::from_field_text(search_field, &exact.token),
                prefix: false,
            },
            Some(pb::searchlight::text_query_term::TermType::Fuzzy(fuzzy)) => QueryTerm {
                term: Term::from_field_text(search_field, &fuzzy.token),
                prefix: fuzzy.prefix,
            },
        };
        Ok(qterm)
    }
}

impl TryFrom<QueryTerm> for TextQueryTerm {
    type Error = anyhow::Error;

    fn try_from(value: QueryTerm) -> Result<Self, Self::Error> {
        let term = value
            .term
            .as_str()
            .context("Term was not a string")?
            .to_string();
        let text_query_term = if value.prefix {
            TextQueryTerm::Fuzzy {
                token: term,
                max_distance: 0.try_into()?,
                prefix: value.prefix,
            }
        } else {
            TextQueryTerm::Exact(term)
        };
        Ok(text_query_term)
    }
}

impl From<QueryTerm> for pb::searchlight::TextQueryTerm {
    fn from(value: QueryTerm) -> Self {
        let term = value.term();
        let term_str = term.as_str().expect("QueryTerm not a string").to_string();
        let term_type = if value.prefix {
            pb::searchlight::text_query_term::TermType::Fuzzy(pb::searchlight::FuzzyTextTerm {
                token: term_str,
                max_distance: 0,
                prefix: value.prefix,
            })
        } else {
            pb::searchlight::text_query_term::TermType::Exact(pb::searchlight::ExactTextTerm {
                token: term_str,
            })
        };
        Self {
            term_type: Some(term_type),
        }
    }
}
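
// A minimal sketch of the conversion above: a non-prefix `QueryTerm`
// round-trips to `TextQueryTerm::Exact`, while a prefix term becomes a
// distance-0 fuzzy term. (`term_from_str` is this crate's helper for building
// a text `Term`; the tokens below are arbitrary.)
#[cfg(test)]
mod query_term_conversion_example {
    use super::*;

    #[test]
    fn non_prefix_terms_convert_to_exact() -> anyhow::Result<()> {
        let exact = TextQueryTerm::try_from(QueryTerm::new(term_from_str("hello"), false))?;
        assert_eq!(exact, TextQueryTerm::Exact("hello".to_string()));

        let prefix = TextQueryTerm::try_from(QueryTerm::new(term_from_str("hel"), true))?;
        assert_eq!(
            prefix,
            TextQueryTerm::Fuzzy {
                token: "hel".to_string(),
                max_distance: FuzzyDistance::Zero,
                prefix: true,
            }
        );
        Ok(())
    }
}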

/// An expanded version of CompiledQuery's search terms which expands fuzzy
/// queries. Maps each QueryTerm from a CompiledQuery to a vector of term
/// matches and their distance.
///
/// TermShortlist is the list of terms that will be considered for a search
/// query. `shortlist` and `query_term_shortlist_items` are normalized (with
/// the latter storing indices into the former) to deduplicate terms. This
/// allows `CandidateRevision` to include a list of positions for the document
/// represented as a `ShortlistId`.
///
/// Without this deduplicated structure, `CandidateRevision` would need to
/// store the terms themselves, which is potentially a lot of unneeded space
/// in the searchlight RPC.
#[derive(Debug, Clone, PartialEq)]
pub struct TermShortlist {
    shortlist: Vec<Term>,
    pub query_term_shortlist_items: BTreeMap<QueryTerm, Vec<(EditDistance, ShortlistId)>>,
}

pub struct TermShortlistBuilder {
    shortlist: Vec<Term>,
    query_term_shortlist_items: BTreeMap<QueryTerm, Vec<(EditDistance, ShortlistId)>>,
    term_to_shortlist: BTreeMap<Term, ShortlistId>,
}

impl TermShortlistBuilder {
    fn new() -> Self {
        Self {
            shortlist: vec![],
            query_term_shortlist_items: Default::default(),
            term_to_shortlist: Default::default(),
        }
    }

    fn build(self) -> TermShortlist {
        TermShortlist {
            shortlist: self.shortlist,
            query_term_shortlist_items: self.query_term_shortlist_items,
        }
    }

    /// Adds the given set of matches for the given term to the shortlist.
    /// Returns a Vec with a 1:1 mapping between each term in `matches` and
    /// the corresponding shortlist id: Some at each position where the term
    /// was newly added to the shortlist, and None at each position where the
    /// term already existed in the shortlist.
    fn add_matches(
        &mut self,
        term: QueryTerm,
        matches: BTreeSet<(EditDistance, Term)>,
    ) -> Vec<Option<ShortlistId>> {
        let shortlist_items = self.query_term_shortlist_items.entry(term).or_default();
        let mut shortlist_ids = vec![];
        for (distance, term) in matches {
            let maybe_new_shortlist_id = if !self.term_to_shortlist.contains_key(&term) {
                let shortlist_id = ShortlistId(self.shortlist.len() as u16);
                self.term_to_shortlist.insert(term.clone(), shortlist_id);
                self.shortlist.push(term);
                shortlist_items.push((distance, shortlist_id));
                Some(shortlist_id)
            } else {
                None
            };
            shortlist_ids.push(maybe_new_shortlist_id);
        }
        shortlist_ids
    }
}

/// A pointer to a term in the shortlist.
///
/// For now, ShortlistId fits in a u8 since we will never consider more than
/// 128 terms, but we use u16 to be generous.
///
/// As an implementation detail that may change in the future, these are
/// currently just the index of the term in the shortlist.
#[derive(PartialOrd, Ord, Clone, Debug, Eq, PartialEq, Copy)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct ShortlistId(u16);

impl TryFrom<u32> for ShortlistId {
    type Error = anyhow::Error;

    fn try_from(value: u32) -> Result<Self, Self::Error> {
        Ok(ShortlistId(value.try_into()?))
    }
}

pub(crate) fn shortlist_and_id_mapping(
    term_matches: BTreeMap<QueryTerm, Vec<(EditDistance, Term, TermId)>>,
) -> (TermShortlist, BTreeMap<ShortlistId, TermId>) {
    let mut shortlist_id_to_term_id = BTreeMap::new();
    let mut builder = TermShortlistBuilder::new();
    for (query_term, matches) in term_matches {
        let (matches, term_ids): (BTreeSet<_>, Vec<TermId>) = matches
            .into_iter()
            .map(|(distance, match_term, term_id)| ((distance, match_term), term_id))
            .unzip();
        let shortlist_ids = builder.add_matches(query_term, matches);
        shortlist_id_to_term_id.extend(shortlist_ids.into_iter().zip(term_ids).filter_map(
            |(shortlist_id, term_id)| shortlist_id.map(|shortlist_id| (shortlist_id, term_id)),
        ));
    }
    (builder.build(), shortlist_id_to_term_id)
}

impl TermShortlist {
    pub fn new(term_matches: BTreeMap<QueryTerm, BTreeSet<(EditDistance, Term)>>) -> Self {
        let mut builder = TermShortlistBuilder::new();
        for (query_term, matches) in term_matches {
            builder.add_matches(query_term, matches);
        }
        builder.build()
    }

    pub fn terms(&self) -> Vec<Term> {
        self.shortlist.clone()
    }

    pub fn ids_and_terms(&self) -> impl Iterator<Item = (ShortlistId, &Term)> {
        self.shortlist
            .iter()
            .enumerate()
            .map(|(idx, term)| (ShortlistId(idx as u16), term))
    }

    pub fn get_term(&self, id: ShortlistId) -> anyhow::Result<&Term> {
        self.shortlist
            .get(id.0 as usize)
            .context("Invalid shortlist id, did we mix up ids and shortlists?")
    }

    pub fn get_shortlisted_terms_for_query_term(
        &self,
        query_term: &QueryTerm,
    ) -> impl Iterator<Item = &(EditDistance, ShortlistId)> {
        if let Some(vec) = self.query_term_shortlist_items.get(query_term) {
            Either::Left(vec.iter())
        } else {
            Either::Right(vec![].into_iter())
        }
    }

    pub fn try_from_proto(
        value: pb::searchlight::TermShortlist,
        search_field: Field,
    ) -> anyhow::Result<TermShortlist> {
        Ok(TermShortlist {
            shortlist: value
                .shortlist
                .into_iter()
                .map(|term_str| term_from_str(term_str.as_str()))
                .collect_vec(),
            query_term_shortlist_items: value
                .query_term_shortlist_items
                .into_iter()
                .map(|query_term_shortlist| {
                    anyhow::Ok((
                        QueryTerm::try_from_text_query_term_proto(
                            query_term_shortlist
                                .query_term
                                .context("QueryTerm missing from TermShortlist proto")?,
                            search_field,
                        )?,
                        query_term_shortlist
                            .items
                            .into_iter()
                            .map(|item| {
                                Ok((
                                    item.distance as EditDistance,
                                    ShortlistId::try_from(item.shortlist_id)?,
                                ))
                            })
                            .collect::<anyhow::Result<Vec<_>>>()?,
                    ))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?,
        })
    }
}

impl From<TermShortlist> for pb::searchlight::TermShortlist {
    fn from(value: TermShortlist) -> Self {
        pb::searchlight::TermShortlist {
            shortlist: value
                .shortlist
                .into_iter()
                .map(|term| {
                    term.as_str()
                        .expect("shortlisted term not a string")
                        .to_string()
                })
                .collect_vec(),
            query_term_shortlist_items: value
                .query_term_shortlist_items
                .into_iter()
                .map(
                    |(qterm, matches)| pb::searchlight::QueryTermShortlistItems {
                        query_term: Some(pb::searchlight::TextQueryTerm::from(qterm)),
                        items: matches
                            .into_iter()
                            .map(|(dist, id)| pb::searchlight::ShortlistItem {
                                distance: dist as u32,
                                shortlist_id: id.0 as u32,
                            })
                            .collect_vec(),
                    },
                )
                .collect_vec(),
        }
    }
}
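
// A minimal sketch of the deduplication described above, assuming
// `EditDistance` is this crate's numeric distance alias (the casts elsewhere
// in this file suggest it is): two fuzzy query terms that both match the
// token "hello" share a single shortlist entry.
#[cfg(test)]
mod term_shortlist_example {
    use super::*;

    #[test]
    fn matches_shared_across_query_terms_are_deduplicated() {
        let hello = term_from_str("hello");
        let matches = btreemap! {
            QueryTerm::new(term_from_str("helo"), false) =>
                BTreeSet::from([(1 as EditDistance, hello.clone())]),
            QueryTerm::new(term_from_str("hallo"), false) =>
                BTreeSet::from([(1 as EditDistance, hello.clone())]),
        };
        let shortlist = TermShortlist::new(matches);
        // "hello" appears once in the shortlist even though two query terms
        // matched it.
        assert_eq!(shortlist.terms().len(), 1);
    }
}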

/// A memory-index specific query that is useful for scoring.
#[derive(Debug)]
pub struct TermListBitsetQuery {
    /// Stores a sorted list of term IDs in this query.
    pub sorted_terms: Vec<TermId>,
    /// Is `sorted_terms[i]` a filter term?
    pub intersection_terms: BitVec,
    /// Is `sorted_terms[i]` a search term?
    pub union_terms: BitVec,
    /// Score multiplier for a match of this union term.
    pub union_id_boosts: Vec<Score>,
}

impl TermListBitsetQuery {
    pub const NEVER_MATCH: TermListBitsetQuery = TermListBitsetQuery {
        sorted_terms: vec![],
        intersection_terms: BitVec::EMPTY,
        union_terms: BitVec::EMPTY,
        union_id_boosts: vec![],
    };

    pub fn new(
        term_ids: BTreeSet<TermId>,
        intersection_term_ids: BTreeSet<TermId>,
        boosts_by_union_id: BTreeMap<TermId, Score>,
    ) -> Self {
        let sorted_terms = term_ids.into_iter().collect_vec();
        let mut intersection_terms = BitVec::repeat(false, sorted_terms.len());
        let mut union_terms = BitVec::repeat(false, sorted_terms.len());
        let mut union_id_boosts = Vec::with_capacity(sorted_terms.len());
        for (i, term) in sorted_terms.iter().enumerate() {
            intersection_terms.set(i, intersection_term_ids.contains(term));
            if let Some(boost) = boosts_by_union_id.get(term) {
                union_terms.set(i, true);
                union_id_boosts.push(*boost);
            }
        }
        Self {
            sorted_terms,
            intersection_terms,
            union_terms,
            union_id_boosts,
        }
    }

    /// Empty `sorted_terms` indicates that the query cannot match any
    /// documents, either from an empty user query or from a query that is
    /// instantiated with TermListBitsetQuery::NEVER_MATCH.
    pub fn never_match(&self) -> bool {
        self.sorted_terms.is_empty()
    }
}
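
// A small sketch of the `never_match` contract: a query built from no term
// ids behaves the same as `TermListBitsetQuery::NEVER_MATCH`.
#[cfg(test)]
mod term_list_bitset_query_example {
    use super::*;

    #[test]
    fn empty_term_set_never_matches() {
        let query = TermListBitsetQuery::new(BTreeSet::new(), BTreeSet::new(), BTreeMap::new());
        assert!(query.never_match());
        assert!(TermListBitsetQuery::NEVER_MATCH.never_match());
    }
}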

#[derive(Debug, Clone)]
pub enum CompiledFilterCondition {
    Must(Term),
}

#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct CandidateRevision {
    pub score: f32,
    pub id: InternalId,
    pub ts: WriteTimestamp,
    pub creation_time: CreationTime,
}

impl From<CandidateRevision> for pb::searchlight::CandidateRevision {
    fn from(revision: CandidateRevision) -> Self {
        let ts: Option<u64> = match revision.ts {
            WriteTimestamp::Committed(ts) => Some(ts.into()),
            WriteTimestamp::Pending => None,
        };
        let internal_id_bytes = &*revision.id;
        pb::searchlight::CandidateRevision {
            score: revision.score,
            internal_id: internal_id_bytes.to_vec(),
            ts,
            creation_time: revision.creation_time.into(),
        }
    }
}

impl TryFrom<pb::searchlight::CandidateRevision> for CandidateRevision {
    type Error = anyhow::Error;

    fn try_from(proto: pb::searchlight::CandidateRevision) -> Result<Self, Self::Error> {
        let ts = match proto.ts {
            Some(ts) => WriteTimestamp::Committed(ts.try_into()?),
            None => WriteTimestamp::Pending,
        };
        Ok(CandidateRevision {
            score: proto.score,
            id: proto.internal_id.try_into()?,
            ts,
            creation_time: proto.creation_time.try_into()?,
        })
    }
}

#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct CandidateRevisionPositions {
    pub revision: CandidateRevision,
    pub positions: BTreeMap<ShortlistId, Vec<u32>>,
}

impl From<CandidateRevisionPositions> for CandidateRevision {
    fn from(value: CandidateRevisionPositions) -> Self {
        value.revision
    }
}

impl TryFrom<pb::searchlight::CandidateRevisionPositions> for CandidateRevisionPositions {
    type Error = anyhow::Error;

    fn try_from(value: pb::searchlight::CandidateRevisionPositions) -> Result<Self, Self::Error> {
        Ok(CandidateRevisionPositions {
            revision: CandidateRevision::try_from(
                value.revision.context("candidate revision missing")?,
            )?,
            positions: value
                .positions
                .into_iter()
                .map(|pos| Ok((ShortlistId::try_from(pos.shortlist_id)?, pos.positions)))
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?,
        })
    }
}

impl From<CandidateRevisionPositions> for pb::searchlight::CandidateRevisionPositions {
    fn from(value: CandidateRevisionPositions) -> Self {
        pb::searchlight::CandidateRevisionPositions {
            revision: Some(pb::searchlight::CandidateRevision::from(value.revision)),
            positions: value
                .positions
                .into_iter()
                .map(|(id, positions)| pb::searchlight::ShortlistPositions {
                    shortlist_id: id.0 as u32,
                    positions,
                })
                .collect_vec(),
        }
    }
}

pub type RevisionWithKeys = Vec<(CandidateRevision, IndexKeyBytes)>;

pub struct QueryResults {
    pub revisions_with_keys: RevisionWithKeys,
    pub reads: QueryReads,
}

impl QueryResults {
    pub fn empty() -> Self {
        Self {
            revisions_with_keys: vec![],
            reads: QueryReads::empty(),
        }
    }
}

/// A read based on a single token extracted from a text query search.
///
/// A single text query will be split into many parts (tokenized), each part
/// will be combined with the constant metadata (path, distance, prefix, etc.)
/// into a term, and then we track reads based on individual terms.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct TextQueryTermRead {
    pub field_path: FieldPath,
    pub term: TextQueryTerm,
}

impl TextQueryTermRead {
    pub fn new(field_path: FieldPath, term: TextQueryTerm) -> Self {
        Self { field_path, term }
    }
}

// For proptest we're using lowercase ASCII and a filter to generate tokens so
// that we approximately match what the tokenizer we're using allows. In
// production code, the tokenizer would already have run on these terms prior
// to this point.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum TextQueryTerm {
    Exact(
        #[cfg_attr(
            any(test, feature = "testing"),
            proptest(
                regex = "[a-z]+",
                filter = "|token| token.len() > 1 && token.len() < 32"
            )
        )]
        String,
    ),
    Fuzzy {
        #[cfg_attr(
            any(test, feature = "testing"),
            proptest(
                regex = "[a-z]+",
                filter = "|token| token.len() > 1 && token.len() < 32"
            )
        )]
        token: String,
        max_distance: FuzzyDistance,
        prefix: bool,
    },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum FuzzyDistance {
    Zero,
    One,
    Two,
}

impl TryFrom<u8> for FuzzyDistance {
    type Error = anyhow::Error;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Zero),
            1 => Ok(Self::One),
            2 => Ok(Self::Two),
            _ => Err(anyhow::anyhow!("Invalid distance: {value}")),
        }
    }
}

impl From<FuzzyDistance> for u8 {
    fn from(value: FuzzyDistance) -> Self {
        *value
    }
}

impl Deref for FuzzyDistance {
    type Target = u8;

    fn deref(&self) -> &Self::Target {
        match self {
            FuzzyDistance::Zero => &0u8,
            FuzzyDistance::One => &1u8,
            FuzzyDistance::Two => &2u8,
        }
    }
}
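
// A minimal sketch of the `FuzzyDistance` conversions defined above: values
// 0..=2 round-trip through `u8`, and anything larger is rejected.
#[cfg(test)]
mod fuzzy_distance_example {
    use super::*;

    #[test]
    fn u8_round_trip() -> anyhow::Result<()> {
        let distance = FuzzyDistance::try_from(2u8)?;
        assert_eq!(u8::from(distance), 2);
        assert!(FuzzyDistance::try_from(3u8).is_err());
        Ok(())
    }
}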

impl TextQueryTerm {
    /// Convert a term into the parameters necessary to perform a "fuzzy"
    /// search.
    ///
    /// Since exact text search is equivalent to a non-prefixed fuzzy search
    /// with a distance of 0, we can hard-code those values.
    fn fuzzy_params(&self) -> (&String, u8, bool) {
        match self {
            Self::Fuzzy {
                token,
                max_distance,
                prefix,
            } => (token, **max_distance, *prefix),
            Self::Exact(token) => (token, 0u8, false),
        }
    }
}

impl HeapSize for TextQueryTerm {
    fn heap_size(&self) -> usize {
        match self {
            TextQueryTerm::Exact(token) => token.heap_size(),
            TextQueryTerm::Fuzzy {
                token,
                max_distance,
                prefix,
            } => token.heap_size() + max_distance.heap_size() + prefix.heap_size(),
        }
    }
}

impl HeapSize for TextQueryTermRead {
    fn heap_size(&self) -> usize {
        self.field_path.heap_size() + self.term.heap_size()
    }
}
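
// A small sketch of the equivalence `fuzzy_params` relies on: an exact term
// is reported as a distance-0, non-prefix fuzzy search.
#[cfg(test)]
mod fuzzy_params_example {
    use super::*;

    #[test]
    fn exact_terms_are_distance_zero_non_prefix() {
        let term = TextQueryTerm::Exact("hello".to_string());
        let (token, max_distance, prefix) = term.fuzzy_params();
        assert_eq!(token, "hello");
        assert_eq!(max_distance, 0u8);
        assert!(!prefix);
    }
}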

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum FilterConditionRead {
    Must(FieldPath, FilterValue),
}

impl HeapSize for FilterConditionRead {
    fn heap_size(&self) -> usize {
        match self {
            FilterConditionRead::Must(p, v) => p.heap_size() + v.heap_size(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct QueryReads {
    pub text_queries: WithHeapSize<Vec<TextQueryTermRead>>,
    pub filter_conditions: WithHeapSize<Vec<FilterConditionRead>>,
    // State derived from text_queries for more efficient matching with many
    // fuzzy text subscriptions. Because this is strictly derived, it can always
    // be reconstructed from the simpler text_queries / filter_conditions.
    fuzzy_terms: SearchTermTries<()>,
}

impl QueryReads {
    pub fn new(
        text_queries: WithHeapSize<Vec<TextQueryTermRead>>,
        filter_conditions: WithHeapSize<Vec<FilterConditionRead>>,
    ) -> Self {
        let mut fuzzy_terms = SearchTermTries::new();
        fuzzy_terms.extend((), &text_queries);
        Self {
            text_queries,
            filter_conditions,
            fuzzy_terms,
        }
    }
}

#[cfg(any(test, feature = "testing"))]
impl Arbitrary for QueryReads {
    type Parameters = ();
    type Strategy = impl Strategy<Value = QueryReads>;

    fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
        any::<(
            WithHeapSize<Vec<TextQueryTermRead>>,
            WithHeapSize<Vec<FilterConditionRead>>,
        )>()
        .prop_map(|(text_queries, filter_conditions)| {
            QueryReads::new(text_queries, filter_conditions)
        })
    }
}

impl PartialEq for QueryReads {
    fn eq(&self, other: &Self) -> bool {
        self.text_queries == other.text_queries
            && self.filter_conditions == other.filter_conditions
    }
}

impl Eq for QueryReads {}

impl HeapSize for QueryReads {
    // TODO(CX-5459): Include fuzzy_terms in heap size.
    fn heap_size(&self) -> usize {
        self.text_queries.heap_size() + self.filter_conditions.heap_size()
    }
}

#[derive(Debug, Clone)]
struct SearchTermTries<T: Clone + Ord> {
    terms: BTreeMap<FieldPath, Tries<T>>,
}

impl<T: Clone + Ord> SearchTermTries<T> {
    fn new() -> Self {
        Self {
            terms: BTreeMap::new(),
        }
    }

    #[fastrace::trace]
    fn overlaps_document<'a>(&'a self, document: &'a PackedDocument) -> bool {
        for (path, tries) in self.terms.iter() {
            let Some(ConvexValue::String(document_text)) = document.value().get_path(path) else {
                continue;
            };
            let tokens = tokenize(document_text);
            let mut overlaps = false;
            tries.matching_values(&tokens, &mut |_| overlaps = true);
            if overlaps {
                return true;
            }
        }
        false
    }

    #[fastrace::trace]
    fn overlaps_index_key_value(&self, index_key_value: &SearchIndexKeyValue) -> bool {
        let Some(tokens) = &index_key_value.search_field_value else {
            return false;
        };
        let Some(tries) = self.terms.get(&index_key_value.search_field) else {
            return false;
        };
        let mut overlaps = false;
        tries.matching_values(tokens, &mut |_| overlaps = true);
        overlaps
    }

    fn extend(&mut self, value: T, queries: &WithHeapSize<Vec<TextQueryTermRead>>) {
        for text_query in queries {
            let path = &text_query.field_path;
            let (token, max_distance, prefix) = text_query.term.fuzzy_params();
            let art = self
                .terms
                .entry(path.clone())
                .or_insert_with(Tries::new)
                .tries
                .entry((prefix, max_distance))
                .or_insert_with(ART::new);
            if let Some(value_to_count) = art.get_mut(token) {
                *value_to_count.entry(value.clone()).or_default() += 1
            } else {
                art.insert(token.clone(), btreemap! { value.clone() => 1 });
            }
        }
    }

    fn remove(&mut self, value: T, queries: &WithHeapSize<Vec<TextQueryTermRead>>) {
        for text_query in queries {
            let path = &text_query.field_path;
            let (token, max_distance, prefix) = text_query.term.fuzzy_params();
            let value = value.clone();
            let tries = self
                .terms
                .get_mut(path)
                .unwrap_or_else(|| panic!("Missing tries for {path}"));
            let trie = tries
                .tries
                .get_mut(&(prefix, max_distance))
                .unwrap_or_else(|| panic!("Missing trie for ({prefix}, {max_distance})"));
            let value_to_count = trie
                .get_mut(token)
                .unwrap_or_else(|| panic!("Missing values for a token of length {}", token.len()));
            let count = value_to_count
                .entry(value.clone())
                .and_modify(|count| {
                    *count = count
                        .checked_sub(1)
                        .expect("Can't remove more values than were added")
                })
                .or_insert_with(|| panic!("Missing count for value!"));
            if *count == 0 {
                value_to_count.remove(&value);
            }
            if value_to_count.is_empty() {
                trie.remove(token);
            }
        }
    }
}

#[derive(Debug, Clone)]
struct Tries<T: Clone> {
    // TODO: Allow ART to store N values:
    // https://github.com/get-convex/convex/pull/20030/files#r1427222221
    tries: BTreeMap<(bool, u8), ART<String, BTreeMap<T, usize>>>,
}

impl<T: Clone> Tries<T> {
    fn new() -> Self {
        Self {
            tries: BTreeMap::new(),
        }
    }
}

impl<T: Clone + Ord> Tries<T> {
    fn matching_values(&self, tokens: &SearchValueTokens, result: &mut impl FnMut(T)) {
        for ((prefix, _max_distance), trie) in self.tries.iter() {
            // Prefixing is handled by constructing prefix tokens in ValueTokens (see the
            // notes there), so we can get away with a symmetric search where the dfa's
            // prefix is always set to false.
            tokens.for_each_token(*prefix, |token| {
                if let Some(value) = trie.get(token) {
                    for key in value.keys() {
                        result(key.clone());
                    }
                }
            });
        }
    }
}

impl QueryReads {
    pub fn empty() -> Self {
        QueryReads {
            text_queries: WithHeapSize::default(),
            filter_conditions: WithHeapSize::default(),
            fuzzy_terms: SearchTermTries::new(),
        }
    }

    pub fn merge(&mut self, other: Self) {
        self.fuzzy_terms.extend((), &other.text_queries);
        self.text_queries.extend(other.text_queries);
        self.filter_conditions.extend(other.filter_conditions);
    }

    #[fastrace::trace]
    pub fn overlaps_document(&self, document: &PackedDocument) -> bool {
        let _timer = metrics::query_reads_overlaps_timer();
        for filter_condition in &self.filter_conditions {
            let FilterConditionRead::Must(field_path, filter_value) = filter_condition;
            let document_value = document.value().get_path(field_path);
            let document_value = FilterValue::from_search_value(document_value.as_ref());
            // If the document doesn't match the filter condition, we can skip checking
            // fuzzy terms.
            if document_value != *filter_value {
                metrics::log_query_reads_outcome(false);
                return false;
            }
        }

        // If there are no text queries and all filters match, this counts as an
        // overlap.
        if self.text_queries.is_empty() {
            metrics::log_query_reads_outcome(true);
            return true;
        }

        // If all the filter conditions match and there are text queries, we then check
        // for fuzzy matches.
        let is_fuzzy_match = self.fuzzy_terms.overlaps_document(document);
        metrics::log_query_reads_outcome(is_fuzzy_match);
        is_fuzzy_match
    }

    #[fastrace::trace]
    pub fn overlaps_search_index_key_value(&self, index_key_value: &SearchIndexKeyValue) -> bool {
        let _timer = metrics::query_reads_overlaps_search_value_timer();

        // Filter out documents that don't match the filter.
        for filter_condition in &self.filter_conditions {
            let FilterConditionRead::Must(field_path, filter_value) = filter_condition;
            let Some(document_value) = index_key_value.filter_values.get(field_path) else {
                // This shouldn't happen because even if the field doesn't exist in the
                // document, there is a special `FilterValue` value for undefined. This could
                // happen if the write log entry was created concurrently with index definition
                // changes, but it shouldn't be a problem.
                metrics::log_missing_filter_value();
                return false;
            };
            if *document_value != *filter_value {
                return false;
            }
        }

        // If there are no text queries and all filters match, this counts as an
        // overlap.
        if self.text_queries.is_empty() {
            metrics::log_query_reads_outcome(true);
            return true;
        }

        // If all the filter conditions match and there are text queries, we then check
        // for fuzzy matches.
        let is_fuzzy_match = self.fuzzy_terms.overlaps_index_key_value(index_key_value);
        metrics::log_query_reads_outcome(is_fuzzy_match);
        is_fuzzy_match
    }
}
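
// A minimal usage sketch of `QueryReads::overlaps_document`, mirroring the
// document-construction pattern from the tests at the bottom of this file;
// the "body" field and its contents are arbitrary.
#[cfg(test)]
mod query_reads_example {
    use std::str::FromStr;

    use common::document::ResolvedDocument;
    use value::ConvexObject;

    use super::*;

    #[test]
    fn text_query_overlaps_matching_document() -> anyhow::Result<()> {
        let reads = QueryReads::new(
            WithHeapSize::from(vec![TextQueryTermRead::new(
                FieldPath::from_str("body")?,
                TextQueryTerm::Exact("hello".to_string()),
            )]),
            WithHeapSize::default(),
        );

        let mut map = BTreeMap::new();
        map.insert(
            "body".parse()?,
            ConvexValue::String(ConvexString::try_from("hello world")?),
        );
        let doc = PackedDocument::pack(&ResolvedDocument::new(
            ResolvedDocumentId::MIN,
            CreationTime::ONE,
            ConvexObject::try_from(map)?,
        )?);

        assert!(reads.overlaps_document(&doc));
        Ok(())
    }
}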

pub struct TextSearchSubscriptions {
    fuzzy_searches: BTreeMap<TabletIndexName, SearchTermTries<SubscriberId>>,
    // TODO: Filter conditions are inefficiently searched, especially in conjunction with text
    // searches. We should eventually optimize this simpler implementation as well.
    filter_conditions: BTreeMap<TabletIndexName, BTreeMap<SubscriberId, Vec<FilterConditionRead>>>,
}

impl TextSearchSubscriptions {
    pub fn new() -> Self {
        Self {
            fuzzy_searches: BTreeMap::new(),
            filter_conditions: BTreeMap::new(),
        }
    }

    pub fn filter_len(&self) -> usize {
        self.filter_conditions.values().map(|m| m.len()).sum()
    }

    pub fn fuzzy_len(&self) -> usize {
        self.fuzzy_searches.len()
    }

    pub fn insert(&mut self, id: SubscriberId, index: &TabletIndexName, reads: &QueryReads) {
        self.filter_conditions
            .entry(index.clone())
            .or_default()
            .entry(id)
            .or_default()
            .extend(reads.filter_conditions.to_vec());
        self.fuzzy_searches
            .entry(index.clone())
            .or_insert_with(SearchTermTries::new)
            .extend(id, &reads.text_queries)
    }

    pub fn remove(&mut self, id: SubscriberId, index: &TabletIndexName, reads: &QueryReads) {
        let conditions = self
            .filter_conditions
            .get_mut(index)
            .unwrap_or_else(|| panic!("Missing condition index entry for {index}"));
        assert!(conditions.remove(&id).is_some());
        if conditions.is_empty() {
            self.filter_conditions.remove(index);
        }

        let terms = self
            .fuzzy_searches
            .get_mut(index)
            .unwrap_or_else(|| panic!("Missing fuzzy search index entry for {index}"));
        terms.remove(id, &reads.text_queries);
    }

    pub fn add_matches(
        &self,
        document_id: &ResolvedDocumentId,
        document_index_keys: &DocumentIndexKeys,
        notify: &mut impl FnMut(SubscriberId),
    ) {
        self.add_filter_conditions_matches(document_id, document_index_keys, notify);
        self.add_fuzzy_matches(document_id, document_index_keys, notify);
    }

    fn add_filter_conditions_matches(
        &self,
        document_id: &ResolvedDocumentId,
        document_index_keys: &DocumentIndexKeys,
        notify: &mut impl FnMut(SubscriberId),
    ) {
        for (index, filter_conditions_map) in &self.filter_conditions {
            if *index.table() != document_id.tablet_id {
                continue;
            }
            let Some(DocumentIndexKeyValue::Search(SearchIndexKeyValue {
                filter_values, ..
            })) = document_index_keys.get(index)
            else {
                metrics::log_missing_index_key();
                continue;
            };
            for (subscriber_id, filter_conditions) in filter_conditions_map {
                for FilterConditionRead::Must(field_path, filter_value) in filter_conditions {
                    let Some(document_value) = filter_values.get(field_path) else {
                        metrics::log_missing_filter_value();
                        continue;
                    };
                    if document_value == filter_value {
                        metrics::log_query_reads_outcome(true);
                        notify(*subscriber_id);
                    }
                }
            }
        }
    }

    /// An inverse search where we search document tokens against a trie of
    /// read query terms, instead of the more usual search of a trie of
    /// document tokens against a dfa for each search term.
    ///
    /// This inverse-looking search optimizes for cases where the number of
    /// reads/subscriptions is significantly larger than the number of tokens
    /// in the document.
    fn add_fuzzy_matches(
        &self,
        document_id: &ResolvedDocumentId,
        document_index_keys: &DocumentIndexKeys,
        matches: &mut impl FnMut(SubscriberId),
    ) {
        for (index, fuzzy_terms) in self
            .fuzzy_searches
            .iter()
            .filter(|(index, _)| *index.table() == document_id.tablet_id)
        {
            let Some(DocumentIndexKeyValue::Search(index_key_value)) =
                document_index_keys.get(index)
            else {
                continue;
            };
            let Some(tokens) = &index_key_value.search_field_value else {
                continue;
            };
            let Some(tries) = fuzzy_terms.terms.get(&index_key_value.search_field) else {
                continue;
            };
            tries.matching_values(tokens, matches);
        }
    }
}

pub fn tokenize(value: ConvexString) -> SearchValueTokens {
    let analyzer = convex_en();
    // Tokenizing the value is expensive, but so is constructing a prefix for
    // every token. So we always keep track of the list of tokens, but we only
    // construct the prefixes for each token if we have at least one search in
    // the read set that uses prefixes.
    let mut token_stream = analyzer.token_stream(&value);
    let mut tokens: HashSet<CompactString> = HashSet::new();
    while token_stream.advance() {
        let text = &token_stream.token().text;
        tokens.insert(text.into());
    }
    SearchValueTokens::from(tokens)
}

#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        str::FromStr,
    };

    use common::{
        document::ResolvedDocument,
        types::IndexDescriptor,
    };
    use value::{
        ConvexObject,
        ConvexString,
        ConvexValue,
        ResolvedDocumentId,
        TabletId,
    };

    use super::*;

    #[test]
    fn test_search_term_tries_overlaps() -> anyhow::Result<()> {
        let mut tries = SearchTermTries::new();

        // Create a document with a text field.
        let mut map = BTreeMap::new();
        map.insert(
            "title".parse()?,
            ConvexValue::String(ConvexString::try_from("hello world")?),
        );
        let object = ConvexObject::try_from(map)?;
        let doc = PackedDocument::pack(&ResolvedDocument::new(
            ResolvedDocumentId::MIN,
            CreationTime::ONE,
            object,
        )?);

        // Add a search term that matches the document using extend.
        let text_query = TextQueryTermRead::new(
            FieldPath::from_str("title")?,
            TextQueryTerm::Exact("hello".to_string()),
        );
        let text_queries = WithHeapSize::from(vec![text_query]);
        tries.extend((), &text_queries);

        // Test that the document matches.
        assert!(tries.overlaps_document(&doc));

        // Add a non-matching term.
        let text_query = TextQueryTermRead::new(
            FieldPath::from_str("title")?,
            TextQueryTerm::Exact("goodbye".to_string()),
        );
        let text_queries = WithHeapSize::from(vec![text_query]);
        tries.extend((), &text_queries);

        // Document should still match because it matches at least one term.
        assert!(tries.overlaps_document(&doc));

        // Create a document that doesn't match any terms.
        let mut map = BTreeMap::new();
        map.insert(
            "title".parse()?,
            ConvexValue::String(ConvexString::try_from("bonjour")?),
        );
        let object = ConvexObject::try_from(map)?;
        let doc = PackedDocument::pack(&ResolvedDocument::new(
            ResolvedDocumentId::MIN,
            CreationTime::ONE,
            object,
        )?);

        // Document should not match.
        assert!(!tries.overlaps_document(&doc));

        Ok(())
    }

    #[test]
    fn test_search_term_tries_overlaps_returns_false_if_the_field_does_not_exist(
    ) -> anyhow::Result<()> {
        let mut tries = SearchTermTries::new();

        let text_query = TextQueryTermRead::new(
            FieldPath::from_str("title")?,
            TextQueryTerm::Exact("hello".to_string()),
        );
        let text_queries = WithHeapSize::from(vec![text_query]);
        tries.extend((), &text_queries);

        let doc = PackedDocument::pack(&ResolvedDocument::new(
            ResolvedDocumentId::MIN,
            CreationTime::ONE,
            ConvexObject::try_from(btreemap! {})?,
        )?);

        assert!(!tries.overlaps_document(&doc));

        Ok(())
    }

    #[test]
    fn test_add_fuzzy_matches() -> anyhow::Result<()> {
        let mut subscriptions = TextSearchSubscriptions::new();
        let tablet_id = TabletId::MIN;
        let index = TabletIndexName::new(tablet_id, IndexDescriptor::new("test_index")?)?;
        let subscriber_id = SubscriberId::MIN;

        // Query that matches the document.
        let query_reads = QueryReads::new(
            WithHeapSize::from(vec![TextQueryTermRead::new(
                FieldPath::from_str("text")?,
                TextQueryTerm::Exact("hello".to_string()),
            )]),
            WithHeapSize::default(),
        );
        subscriptions.insert(subscriber_id, &index, &query_reads);

        let keys_matching = DocumentIndexKeys::with_search_index_for_test(
            index.clone(),
            FieldPath::from_str("text")?,
            tokenize(ConvexString::try_from("hello world")?),
        );

        // Test matching.
        let mut matches = BTreeSet::new();
        subscriptions.add_fuzzy_matches(&ResolvedDocumentId::MIN, &keys_matching, &mut |id| {
            matches.insert(id);
        });
        assert!(matches.contains(&subscriber_id));

        // Test non-matching.
        let keys_non_matching = DocumentIndexKeys::with_search_index_for_test(
            index.clone(),
            FieldPath::from_str("text")?,
            tokenize(ConvexString::try_from("different text")?),
        );
        let mut matches = BTreeSet::new();
        subscriptions.add_fuzzy_matches(&ResolvedDocumentId::MIN, &keys_non_matching, &mut |id| {
            matches.insert(id);
        });
        assert!(matches.is_empty());

        Ok(())
    }

    #[test]
    fn test_add_fuzzy_matches_returns_false_if_the_field_does_not_exist() -> anyhow::Result<()> {
        let mut subscriptions = TextSearchSubscriptions::new();
        let tablet_id = TabletId::MIN;
        let index = TabletIndexName::new(tablet_id, IndexDescriptor::new("test_index")?)?;
        let subscriber_id = SubscriberId::MIN;

        let query_reads = QueryReads::new(
            WithHeapSize::from(vec![TextQueryTermRead::new(
                FieldPath::from_str("text")?,
                TextQueryTerm::Exact("hello".to_string()),
            )]),
            WithHeapSize::default(),
        );
        subscriptions.insert(subscriber_id, &index, &query_reads);

        let index_keys = DocumentIndexKeys::empty_for_test();

        let mut matches = BTreeSet::new();
        subscriptions.add_fuzzy_matches(&ResolvedDocumentId::MIN, &index_keys, &mut |id| {
            matches.insert(id);
        });
        assert!(matches.is_empty());

        Ok(())
    }

    #[test]
    fn test_tokenize() {
        let tokens = tokenize(ConvexString::try_from("Hello world! Hello again!").unwrap());
        assert!(
            tokens
                == SearchValueTokens::from_iter_for_test(vec![
                    "hello".to_string(),
                    "world".to_string(),
                    "again".to_string(),
                ]),
        );
    }
}
